You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2023/06/15 12:13:30 UTC

[arrow] branch main updated: GH-35749: [C++] Handle run-end encoded filters in compute kernels (#35750)

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 475b5b9463 GH-35749: [C++] Handle run-end encoded filters in compute kernels (#35750)
475b5b9463 is described below

commit 475b5b9463b64bec4e03a47e3277076db246bd35
Author: Felipe Oliveira Carvalho <fe...@gmail.com>
AuthorDate: Thu Jun 15 09:13:17 2023 -0300

    GH-35749: [C++] Handle run-end encoded filters in compute kernels (#35750)
    
    ### Rationale for this change
    
    Boolean arrays (bitmaps) used to represent filters in Arrow take 1 bit per boolean value. If the filter contains long runs, the filter can be run-end encoded and save even more memory.
    
    Using POPCNT, a bitmap can be scanned efficiently for <64 runs of logical values, but a run-end encoded array gives the lengths of the run directly and go beyond word size per run.
    
    These two observations make the case that, for the right dataset, REE filters can be more efficiently processed in compute kernels.
    
    ### What changes are included in this PR?
    
     - [x] `GetFilterOutputSize` can count number of emits from a REE filter
     - [x] `GetTakeIndices` can produce an array of logical indices from a REE filter
     - [x] `"array_filter"` can handle REE filters
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    Yes.
    * Closes: #35749
    
    Lead-authored-by: Felipe Oliveira Carvalho <fe...@gmail.com>
    Co-authored-by: Antoine Pitrou <pi...@free.fr>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/compute/api_vector.h                 |   6 +
 cpp/src/arrow/compute/kernel.cc                    |   4 +
 cpp/src/arrow/compute/kernel.h                     |   6 +
 cpp/src/arrow/compute/kernels/ree_util_internal.h  |  25 +
 cpp/src/arrow/compute/kernels/vector_selection.cc  |   7 +-
 .../kernels/vector_selection_filter_internal.cc    | 543 +++++++++++++--------
 .../compute/kernels/vector_selection_internal.cc   | 158 +++++-
 .../compute/kernels/vector_selection_internal.h    |  27 +-
 .../kernels/vector_selection_take_internal.cc      | 154 +++++-
 .../arrow/compute/kernels/vector_selection_test.cc | 158 ++++--
 10 files changed, 802 insertions(+), 286 deletions(-)

diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h
index d02c505f3e..400d69efc7 100644
--- a/cpp/src/arrow/compute/api_vector.h
+++ b/cpp/src/arrow/compute/api_vector.h
@@ -254,12 +254,18 @@ namespace internal {
 // These internal functions are implemented in kernels/vector_selection.cc
 
 /// \brief Return the number of selected indices in the boolean filter
+///
+/// \param filter a plain or run-end encoded boolean array with or without nulls
+/// \param null_selection how to handle nulls in the filter
 ARROW_EXPORT
 int64_t GetFilterOutputSize(const ArraySpan& filter,
                             FilterOptions::NullSelectionBehavior null_selection);
 
 /// \brief Compute uint64 selection indices for use with Take given a boolean
 /// filter
+///
+/// \param filter a plain or run-end encoded boolean array with or without nulls
+/// \param null_selection how to handle nulls in the filter
 ARROW_EXPORT
 Result<std::shared_ptr<ArrayData>> GetTakeIndices(
     const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection,
diff --git a/cpp/src/arrow/compute/kernel.cc b/cpp/src/arrow/compute/kernel.cc
index 430a8dd8e5..fd554ba3d8 100644
--- a/cpp/src/arrow/compute/kernel.cc
+++ b/cpp/src/arrow/compute/kernel.cc
@@ -338,6 +338,10 @@ std::shared_ptr<TypeMatcher> RunEndEncoded(
                                                 std::move(value_type_matcher));
 }
 
+std::shared_ptr<TypeMatcher> RunEndEncoded(Type::type value_type_id) {
+  return RunEndEncoded(SameTypeId(value_type_id));
+}
+
 std::shared_ptr<TypeMatcher> RunEndEncoded(
     std::shared_ptr<TypeMatcher> run_end_type_matcher,
     std::shared_ptr<TypeMatcher> value_type_matcher) {
diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h
index 7e408625b0..a52636aeb6 100644
--- a/cpp/src/arrow/compute/kernel.h
+++ b/cpp/src/arrow/compute/kernel.h
@@ -152,6 +152,12 @@ ARROW_EXPORT std::shared_ptr<TypeMatcher> RunEndInteger();
 ARROW_EXPORT std::shared_ptr<TypeMatcher> RunEndEncoded(
     std::shared_ptr<TypeMatcher> value_type_matcher);
 
+/// \brief Match run-end encoded types that use any valid run-end type and
+/// encode specific value types
+///
+/// @param[in] value_type_id a type id that the type of the values field should match
+ARROW_EXPORT std::shared_ptr<TypeMatcher> RunEndEncoded(Type::type value_type_id);
+
 /// \brief Match run-end encoded types that encode specific run-end and value types
 ///
 /// @param[in] run_end_type_matcher a matcher that is applied to the run_ends field
diff --git a/cpp/src/arrow/compute/kernels/ree_util_internal.h b/cpp/src/arrow/compute/kernels/ree_util_internal.h
index 7dda702c04..080d23c06a 100644
--- a/cpp/src/arrow/compute/kernels/ree_util_internal.h
+++ b/cpp/src/arrow/compute/kernels/ree_util_internal.h
@@ -83,6 +83,18 @@ class ReadWriteValue<ArrowType, in_has_validity_buffer, out_has_validity_buffer,
     return valid;
   }
 
+  /// Pre-conditions guaranteed by the callers:
+  /// - i and j are valid indices into the values buffer
+  /// - the values in i and j are valid
+  bool CompareValuesAt(int64_t i, int64_t j) const {
+    if constexpr (std::is_same_v<ArrowType, BooleanType>) {
+      return bit_util::GetBit(input_values_, i) == bit_util::GetBit(input_values_, j);
+    } else {
+      return (reinterpret_cast<const ValueRepr*>(input_values_))[i] ==
+             (reinterpret_cast<const ValueRepr*>(input_values_))[j];
+    }
+  }
+
   /// \brief Ensure padding is zeroed in validity bitmap.
   void ZeroValidityPadding(int64_t length) const {
     DCHECK(output_values_);
@@ -166,6 +178,11 @@ class ReadWriteValue<ArrowType, in_has_validity_buffer, out_has_validity_buffer,
     return valid;
   }
 
+  bool CompareValuesAt(int64_t i, int64_t j) const {
+    return 0 == memcmp(input_values_ + (i * byte_width_),
+                       input_values_ + (j * byte_width_), byte_width_);
+  }
+
   /// \brief Ensure padding is zeroed in validity bitmap.
   void ZeroValidityPadding(int64_t length) const {
     DCHECK(output_values_);
@@ -253,6 +270,14 @@ class ReadWriteValue<ArrowType, in_has_validity_buffer, out_has_validity_buffer,
     return valid;
   }
 
+  bool CompareValuesAt(int64_t i, int64_t j) const {
+    const offset_type len_i = input_offsets_[i + 1] - input_offsets_[i];
+    const offset_type len_j = input_offsets_[j + 1] - input_offsets_[j];
+    return len_i == len_j &&
+           memcmp(input_values_ + input_offsets_[i], input_values_ + input_offsets_[j],
+                  static_cast<size_t>(len_i));
+  }
+
   /// \brief Ensure padding is zeroed in validity bitmap.
   void ZeroValidityPadding(int64_t length) const {
     DCHECK(output_values_);
diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc
index 3469744966..64c3db204c 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection.cc
@@ -332,8 +332,8 @@ void RegisterVectorSelection(FunctionRegistry* registry) {
   VectorKernel filter_base;
   filter_base.init = FilterState::Init;
   RegisterSelectionFunction("array_filter", array_filter_doc, filter_base,
-                            /*selection_type=*/boolean(), filter_kernels,
-                            GetDefaultFilterOptions(), registry);
+                            std::move(filter_kernels), GetDefaultFilterOptions(),
+                            registry);
 
   DCHECK_OK(registry->AddFunction(MakeFilterMetaFunction()));
 
@@ -345,8 +345,7 @@ void RegisterVectorSelection(FunctionRegistry* registry) {
   take_base.init = TakeState::Init;
   take_base.can_execute_chunkwise = false;
   RegisterSelectionFunction("array_take", array_take_doc, take_base,
-                            /*selection_type=*/match::Integer(), take_kernels,
-                            GetDefaultTakeOptions(), registry);
+                            std::move(take_kernels), GetDefaultTakeOptions(), registry);
 
   DCHECK_OK(registry->AddFunction(MakeTakeMetaFunction()));
 
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc
index 7a3ced9953..ea9ee8a102 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc
@@ -17,6 +17,7 @@
 
 #include <cstdint>
 #include <cstring>
+#include <functional>
 #include <memory>
 #include <type_traits>
 #include <vector>
@@ -84,13 +85,29 @@ int64_t GetBitmapFilterOutputSize(const ArraySpan& filter,
   return output_size;
 }
 
-// TODO(pr-35750): Handle run-end encoded filters in compute kernels
+int64_t GetREEFilterOutputSize(const ArraySpan& filter,
+                               FilterOptions::NullSelectionBehavior null_selection) {
+  const auto& ree_type = checked_cast<const RunEndEncodedType&>(*filter.type);
+  DCHECK_EQ(ree_type.value_type()->id(), Type::BOOL);
+  int64_t output_size = 0;
+  VisitPlainxREEFilterOutputSegments(
+      filter, /*filter_may_have_nulls=*/true, null_selection,
+      [&output_size](int64_t, int64_t segment_length, bool) {
+        output_size += segment_length;
+        return true;
+      });
+  return output_size;
+}
 
 }  // namespace
 
 int64_t GetFilterOutputSize(const ArraySpan& filter,
                             FilterOptions::NullSelectionBehavior null_selection) {
-  return GetBitmapFilterOutputSize(filter, null_selection);
+  if (filter.type->id() == Type::BOOL) {
+    return GetBitmapFilterOutputSize(filter, null_selection);
+  }
+  DCHECK_EQ(filter.type->id(), Type::RUN_END_ENCODED);
+  return GetREEFilterOutputSize(filter, null_selection);
 }
 
 namespace {
@@ -146,10 +163,7 @@ class PrimitiveFilterImpl {
         values_null_count_(values.null_count),
         values_offset_(values.offset),
         values_length_(values.length),
-        filter_is_valid_(filter.buffers[0].data),
-        filter_data_(filter.buffers[1].data),
-        filter_null_count_(filter.null_count),
-        filter_offset_(filter.offset),
+        filter_(filter),
         null_selection_(null_selection) {
     if (values.type->id() != Type::BOOL) {
       // No offset applied for boolean because it's a bitmap
@@ -166,24 +180,83 @@ class PrimitiveFilterImpl {
     out_position_ = 0;
   }
 
-  void ExecNonNull() {
-    // Fast filter when values and filter are not null
-    ::arrow::internal::VisitSetBitRunsVoid(
-        filter_data_, filter_offset_, values_length_,
-        [&](int64_t position, int64_t length) { WriteValueSegment(position, length); });
+  void ExecREEFilter() {
+    if (filter_.child_data[1].null_count == 0 && values_null_count_ == 0) {
+      DCHECK(!out_is_valid_);
+      // Fastest: no nulls in either filter or values
+      return VisitPlainxREEFilterOutputSegments(
+          filter_, /*filter_may_have_nulls=*/false, null_selection_,
+          [&](int64_t position, int64_t segment_length, bool filter_valid) {
+            // Fastest path: all values in range are included and not null
+            WriteValueSegment(position, segment_length);
+            DCHECK(filter_valid);
+            return true;
+          });
+    }
+    if (values_is_valid_) {
+      DCHECK(out_is_valid_);
+      // Slower path: values can be null, so the validity bitmap should be copied
+      return VisitPlainxREEFilterOutputSegments(
+          filter_, /*filter_may_have_nulls=*/true, null_selection_,
+          [&](int64_t position, int64_t segment_length, bool filter_valid) {
+            if (filter_valid) {
+              CopyBitmap(values_is_valid_, values_offset_ + position, segment_length,
+                         out_is_valid_, out_offset_ + out_position_);
+              WriteValueSegment(position, segment_length);
+            } else {
+              bit_util::SetBitsTo(out_is_valid_, out_offset_ + out_position_,
+                                  segment_length, false);
+              memset(out_data_ + out_offset_ + out_position_, 0,
+                     segment_length * sizeof(T));
+              out_position_ += segment_length;
+            }
+            return true;
+          });
+    }
+    // Faster path: only write to out_is_valid_ if filter contains nulls and
+    // null_selection is EMIT_NULL
+    if (out_is_valid_) {
+      // Set all to valid, so only if nulls are produced by EMIT_NULL, we need
+      // to set out_is_valid[i] to false.
+      bit_util::SetBitsTo(out_is_valid_, out_offset_, out_length_, true);
+    }
+    return VisitPlainxREEFilterOutputSegments(
+        filter_, /*filter_may_have_nulls=*/true, null_selection_,
+        [&](int64_t position, int64_t segment_length, bool filter_valid) {
+          if (filter_valid) {
+            WriteValueSegment(position, segment_length);
+          } else {
+            bit_util::SetBitsTo(out_is_valid_, out_offset_ + out_position_,
+                                segment_length, false);
+            memset(out_data_ + out_offset_ + out_position_, 0,
+                   segment_length * sizeof(T));
+            out_position_ += segment_length;
+          }
+          return true;
+        });
   }
 
   void Exec() {
-    if (filter_null_count_ == 0 && values_null_count_ == 0) {
-      return ExecNonNull();
+    if (filter_.type->id() == Type::RUN_END_ENCODED) {
+      return ExecREEFilter();
+    }
+    const auto* filter_is_valid = filter_.buffers[0].data;
+    const auto* filter_data = filter_.buffers[1].data;
+    const auto filter_offset = filter_.offset;
+    if (filter_.null_count == 0 && values_null_count_ == 0) {
+      // Fast filter when values and filter are not null
+      ::arrow::internal::VisitSetBitRunsVoid(
+          filter_data, filter_.offset, values_length_,
+          [&](int64_t position, int64_t length) { WriteValueSegment(position, length); });
+      return;
     }
 
     // Bit counters used for both null_selection behaviors
-    DropNullCounter drop_null_counter(filter_is_valid_, filter_data_, filter_offset_,
+    DropNullCounter drop_null_counter(filter_is_valid, filter_data, filter_offset,
                                       values_length_);
     OptionalBitBlockCounter data_counter(values_is_valid_, values_offset_,
                                          values_length_);
-    OptionalBitBlockCounter filter_valid_counter(filter_is_valid_, filter_offset_,
+    OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset,
                                                  values_length_);
 
     auto WriteNotNull = [&](int64_t index) {
@@ -228,7 +301,7 @@ class PrimitiveFilterImpl {
           if (filter_valid_block.AllSet()) {
             // Filter is non-null but some values are false
             for (int64_t i = 0; i < filter_block.length; ++i) {
-              if (bit_util::GetBit(filter_data_, filter_offset_ + in_position)) {
+              if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
                 WriteNotNull(in_position);
               }
               ++in_position;
@@ -236,8 +309,8 @@ class PrimitiveFilterImpl {
           } else if (null_selection_ == FilterOptions::DROP) {
             // If any values are selected, they ARE NOT null
             for (int64_t i = 0; i < filter_block.length; ++i) {
-              if (bit_util::GetBit(filter_is_valid_, filter_offset_ + in_position) &&
-                  bit_util::GetBit(filter_data_, filter_offset_ + in_position)) {
+              if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) &&
+                  bit_util::GetBit(filter_data, filter_offset + in_position)) {
                 WriteNotNull(in_position);
               }
               ++in_position;
@@ -246,9 +319,9 @@ class PrimitiveFilterImpl {
             // Data values in this block are not null
             for (int64_t i = 0; i < filter_block.length; ++i) {
               const bool is_valid =
-                  bit_util::GetBit(filter_is_valid_, filter_offset_ + in_position);
+                  bit_util::GetBit(filter_is_valid, filter_offset + in_position);
               if (is_valid &&
-                  bit_util::GetBit(filter_data_, filter_offset_ + in_position)) {
+                  bit_util::GetBit(filter_data, filter_offset + in_position)) {
                 // Filter slot is non-null and set
                 WriteNotNull(in_position);
               } else if (!is_valid) {
@@ -264,7 +337,7 @@ class PrimitiveFilterImpl {
           if (filter_valid_block.AllSet()) {
             // Filter is non-null but some values are false
             for (int64_t i = 0; i < filter_block.length; ++i) {
-              if (bit_util::GetBit(filter_data_, filter_offset_ + in_position)) {
+              if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
                 WriteMaybeNull(in_position);
               }
               ++in_position;
@@ -272,8 +345,8 @@ class PrimitiveFilterImpl {
           } else if (null_selection_ == FilterOptions::DROP) {
             // If any values are selected, they ARE NOT null
             for (int64_t i = 0; i < filter_block.length; ++i) {
-              if (bit_util::GetBit(filter_is_valid_, filter_offset_ + in_position) &&
-                  bit_util::GetBit(filter_data_, filter_offset_ + in_position)) {
+              if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) &&
+                  bit_util::GetBit(filter_data, filter_offset + in_position)) {
                 WriteMaybeNull(in_position);
               }
               ++in_position;
@@ -282,9 +355,9 @@ class PrimitiveFilterImpl {
             // Data values in this block are not null
             for (int64_t i = 0; i < filter_block.length; ++i) {
               const bool is_valid =
-                  bit_util::GetBit(filter_is_valid_, filter_offset_ + in_position);
+                  bit_util::GetBit(filter_is_valid, filter_offset + in_position);
               if (is_valid &&
-                  bit_util::GetBit(filter_data_, filter_offset_ + in_position)) {
+                  bit_util::GetBit(filter_data, filter_offset + in_position)) {
                 // Filter slot is non-null and set
                 WriteMaybeNull(in_position);
               } else if (!is_valid) {
@@ -303,7 +376,7 @@ class PrimitiveFilterImpl {
   // Write the next out_position given the selected in_position for the input
   // data and advance out_position
   void WriteValue(int64_t in_position) {
-    out_data_[out_position_++] = values_data_[in_position];
+    out_data_[out_offset_ + out_position_++] = values_data_[in_position];
   }
 
   void WriteValueSegment(int64_t in_start, int64_t length) {
@@ -313,7 +386,7 @@ class PrimitiveFilterImpl {
 
   void WriteNull() {
     // Zero the memory
-    out_data_[out_position_++] = T{};
+    out_data_[out_offset_ + out_position_++] = T{};
   }
 
  private:
@@ -322,12 +395,9 @@ class PrimitiveFilterImpl {
   int64_t values_null_count_;
   int64_t values_offset_;
   int64_t values_length_;
-  const uint8_t* filter_is_valid_;
-  const uint8_t* filter_data_;
-  int64_t filter_null_count_;
-  int64_t filter_offset_;
+  const ArraySpan& filter_;
   FilterOptions::NullSelectionBehavior null_selection_;
-  uint8_t* out_is_valid_;
+  uint8_t* out_is_valid_ = NULLPTR;
   T* out_data_;
   int64_t out_offset_;
   int64_t out_length_;
@@ -357,6 +427,7 @@ inline void PrimitiveFilterImpl<BooleanType>::WriteNull() {
 Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
   const ArraySpan& values = batch[0].array;
   const ArraySpan& filter = batch[1].array;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
   FilterOptions::NullSelectionBehavior null_selection =
       FilterState::Get(ctx).null_selection_behavior;
 
@@ -364,20 +435,23 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
   ArrayData* out_arr = out->array_data().get();
 
+  const bool filter_null_count_is_zero =
+      is_ree_filter ? filter.child_data[1].null_count == 0 : filter.null_count == 0;
+
   // The output precomputed null count is unknown except in the narrow
   // condition that all the values are non-null and the filter will not cause
   // any new nulls to be created.
   if (values.null_count == 0 &&
-      (null_selection == FilterOptions::DROP || filter.null_count == 0)) {
+      (null_selection == FilterOptions::DROP || filter_null_count_is_zero)) {
     out_arr->null_count = 0;
   } else {
     out_arr->null_count = kUnknownNullCount;
   }
 
   // When neither the values nor filter is known to have any nulls, we will
-  // elect the optimized ExecNonNull path where there is no need to populate a
+  // elect the optimized non-null path where there is no need to populate a
   // validity bitmap.
-  bool allocate_validity = values.null_count != 0 || filter.null_count != 0;
+  const bool allocate_validity = values.null_count != 0 || !filter_null_count_is_zero;
 
   const int bit_width = values.type->bit_width();
   RETURN_NOT_OK(PreallocatePrimitiveArrayData(ctx, output_length, bit_width,
@@ -444,31 +518,44 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - raw_offsets[i + position + 1];
+    }
+    return Status::OK();
+  };
+  if (is_ree_filter) {
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, /*filter_may_have_nulls=*/false, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          DCHECK(filter_valid);
+          status = emit_segment(position, segment_length);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));
+  } else {
+    const auto filter_data = filter.buffers[1].data;
+    RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
+        filter_data, filter.offset, filter.length, std::move(emit_segment)));
+  }
 
   offset_builder.UnsafeAppend(offset);
   out->length = output_length;
@@ -476,164 +563,199 @@ Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
   return data_builder.Finish(&out->buffers[2]);
 }
 
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterImpl(KernelContext* ctx, const ArraySpan& values,
                         const ArraySpan& filter, int64_t output_length,
                         FilterOptions::NullSelectionBehavior null_selection,
                         ArrayData* out) {
-  using offset_type = typename Type::offset_type;
+  using offset_type = typename ArrowType::offset_type;
+
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
-  const auto filter_data = filter.buffers[1].data;
-  const uint8_t* filter_is_valid = filter.buffers[0].data;
-  const int64_t filter_offset = filter.offset;
+  BINARY_FILTER_SETUP_COMMON();
 
   const uint8_t* values_is_valid = values.buffers[0].data;
   const int64_t values_offset = values.offset;
 
+  const int64_t out_offset = out->offset;
   uint8_t* out_is_valid = out->buffers[0]->mutable_data();
   // Zero bits and then only have to set valid values to true
-  bit_util::SetBitsTo(out_is_valid, 0, output_length, false);
-
-  // We use 3 block counters for fast scanning of the filter
-  //
-  // * values_valid_counter: for values null/not-null
-  // * filter_valid_counter: for filter null/not-null
-  // * filter_counter: for filter true/false
-  OptionalBitBlockCounter values_valid_counter(values_is_valid, values_offset,
-                                               values.length);
-  OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset,
-                                               filter.length);
-  BitBlockCounter filter_counter(filter_data, filter_offset, filter.length);
-
-  BINARY_FILTER_SETUP_COMMON();
+  bit_util::SetBitsTo(out_is_valid, out_offset, output_length, false);
 
   int64_t in_position = 0;
   int64_t out_position = 0;
-  while (in_position < filter.length) {
-    BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
-    BitBlockCount values_valid_block = values_valid_counter.NextWord();
-    BitBlockCount filter_block = filter_counter.NextWord();
-    if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) {
-      // For this exceedingly common case in low-selectivity filters we can
-      // skip further analysis of the data and move on to the next block.
-      in_position += filter_block.length;
-    } else if (filter_valid_block.AllSet()) {
-      // Simpler path: no filter values are null
-      if (filter_block.AllSet()) {
-        // Fastest path: filter values are all true and not null
-        if (values_valid_block.AllSet()) {
-          // The values aren't null either
-          bit_util::SetBitsTo(out_is_valid, out_position, filter_block.length, true);
-
-          // Bulk-append raw data
-          offset_type block_data_bytes =
-              (raw_offsets[in_position + filter_block.length] - raw_offsets[in_position]);
-          APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes);
-          // Append offsets
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            offset_builder.UnsafeAppend(offset);
-            offset += raw_offsets[in_position + 1] - raw_offsets[in_position];
-          }
-          out_position += filter_block.length;
-        } else {
-          // Some of the values in this block are null
-          for (int64_t i = 0; i < filter_block.length;
-               ++i, ++in_position, ++out_position) {
-            offset_builder.UnsafeAppend(offset);
-            if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
-              bit_util::SetBit(out_is_valid, out_position);
-              APPEND_SINGLE_VALUE();
-            }
+  if (is_ree_filter) {
+    auto emit_segment = [&](int64_t position, int64_t segment_length, bool filter_valid) {
+      in_position = position;
+      if (filter_valid) {
+        // Filter values are all true and not null
+        // Some of the values in the block may be null
+        for (int64_t i = 0; i < segment_length; ++i, ++in_position, ++out_position) {
+          offset_builder.UnsafeAppend(offset);
+          if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
+            bit_util::SetBit(out_is_valid, out_offset + out_position);
+            APPEND_SINGLE_VALUE();
           }
         }
-      } else {  // !filter_block.AllSet()
-        // Some of the filter values are false, but all not null
-        if (values_valid_block.AllSet()) {
-          // All the values are not-null, so we can skip null checking for
-          // them
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
+      } else {
+        offset_builder.UnsafeAppend(segment_length, offset);
+        out_position += segment_length;
+      }
+      return Status::OK();
+    };
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, /*filter_may_have_nulls=*/true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          status = emit_segment(position, segment_length, filter_valid);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));
+  } else {
+    const auto filter_data = filter.buffers[1].data;
+    const uint8_t* filter_is_valid = filter.buffers[0].data;
+    const int64_t filter_offset = filter.offset;
+
+    // We use 3 block counters for fast scanning of the filter
+    //
+    // * values_valid_counter: for values null/not-null
+    // * filter_valid_counter: for filter null/not-null
+    // * filter_counter: for filter true/false
+    OptionalBitBlockCounter values_valid_counter(values_is_valid, values_offset,
+                                                 values.length);
+    OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset,
+                                                 filter.length);
+    BitBlockCounter filter_counter(filter_data, filter_offset, filter.length);
+
+    while (in_position < filter.length) {
+      BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
+      BitBlockCount values_valid_block = values_valid_counter.NextWord();
+      BitBlockCount filter_block = filter_counter.NextWord();
+      if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) {
+        // For this exceedingly common case in low-selectivity filters we can
+        // skip further analysis of the data and move on to the next block.
+        in_position += filter_block.length;
+      } else if (filter_valid_block.AllSet()) {
+        // Simpler path: no filter values are null
+        if (filter_block.AllSet()) {
+          // Fastest path: filter values are all true and not null
+          if (values_valid_block.AllSet()) {
+            // The values aren't null either
+            bit_util::SetBitsTo(out_is_valid, out_offset + out_position,
+                                filter_block.length, true);
+
+            // Bulk-append raw data
+            offset_type block_data_bytes =
+                (raw_offsets[in_position + filter_block.length] -
+                 raw_offsets[in_position]);
+            APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes);
+            // Append offsets
+            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
               offset_builder.UnsafeAppend(offset);
-              bit_util::SetBit(out_is_valid, out_position++);
-              APPEND_SINGLE_VALUE();
+              offset += raw_offsets[in_position + 1] - raw_offsets[in_position];
             }
-          }
-        } else {
-          // Some of the values in the block are null, so we have to check
-          // each one
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
+            out_position += filter_block.length;
+          } else {
+            // Some of the values in this block are null
+            for (int64_t i = 0; i < filter_block.length;
+                 ++i, ++in_position, ++out_position) {
               offset_builder.UnsafeAppend(offset);
               if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
-                bit_util::SetBit(out_is_valid, out_position);
+                bit_util::SetBit(out_is_valid, out_offset + out_position);
                 APPEND_SINGLE_VALUE();
               }
-              ++out_position;
-            }
-          }
-        }
-      }
-    } else {  // !filter_valid_block.AllSet()
-      // Some of the filter values are null, so we have to handle the DROP
-      // versus EMIT_NULL null selection behavior.
-      if (null_selection == FilterOptions::DROP) {
-        // Filter null values are treated as false.
-        if (values_valid_block.AllSet()) {
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) &&
-                bit_util::GetBit(filter_data, filter_offset + in_position)) {
-              offset_builder.UnsafeAppend(offset);
-              bit_util::SetBit(out_is_valid, out_position++);
-              APPEND_SINGLE_VALUE();
             }
           }
-        } else {
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) &&
-                bit_util::GetBit(filter_data, filter_offset + in_position)) {
-              offset_builder.UnsafeAppend(offset);
-              if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
-                bit_util::SetBit(out_is_valid, out_position);
+        } else {  // !filter_block.AllSet()
+          // Some of the filter values are false, but all not null
+          if (values_valid_block.AllSet()) {
+            // All the values are not-null, so we can skip null checking for
+            // them
+            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
+              if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
+                offset_builder.UnsafeAppend(offset);
+                bit_util::SetBit(out_is_valid, out_offset + out_position++);
                 APPEND_SINGLE_VALUE();
               }
-              ++out_position;
+            }
+          } else {
+            // Some of the values in the block are null, so we have to check
+            // each one
+            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
+              if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
+                offset_builder.UnsafeAppend(offset);
+                if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
+                  bit_util::SetBit(out_is_valid, out_offset + out_position);
+                  APPEND_SINGLE_VALUE();
+                }
+                ++out_position;
+              }
             }
           }
         }
-      } else {
-        // EMIT_NULL
-
-        // Filter null values are appended to output as null whether the
-        // value in the corresponding slot is valid or not
-        if (values_valid_block.AllSet()) {
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            const bool filter_not_null =
-                bit_util::GetBit(filter_is_valid, filter_offset + in_position);
-            if (filter_not_null &&
-                bit_util::GetBit(filter_data, filter_offset + in_position)) {
-              offset_builder.UnsafeAppend(offset);
-              bit_util::SetBit(out_is_valid, out_position++);
-              APPEND_SINGLE_VALUE();
-            } else if (!filter_not_null) {
-              offset_builder.UnsafeAppend(offset);
-              ++out_position;
+      } else {  // !filter_valid_block.AllSet()
+        // Some of the filter values are null, so we have to handle the DROP
+        // versus EMIT_NULL null selection behavior.
+        if (null_selection == FilterOptions::DROP) {
+          // Filter null values are treated as false.
+          if (values_valid_block.AllSet()) {
+            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
+              if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) &&
+                  bit_util::GetBit(filter_data, filter_offset + in_position)) {
+                offset_builder.UnsafeAppend(offset);
+                bit_util::SetBit(out_is_valid, out_offset + out_position++);
+                APPEND_SINGLE_VALUE();
+              }
+            }
+          } else {
+            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
+              if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) &&
+                  bit_util::GetBit(filter_data, filter_offset + in_position)) {
+                offset_builder.UnsafeAppend(offset);
+                if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
+                  bit_util::SetBit(out_is_valid, out_offset + out_position);
+                  APPEND_SINGLE_VALUE();
+                }
+                ++out_position;
+              }
             }
           }
         } else {
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            const bool filter_not_null =
-                bit_util::GetBit(filter_is_valid, filter_offset + in_position);
-            if (filter_not_null &&
-                bit_util::GetBit(filter_data, filter_offset + in_position)) {
-              offset_builder.UnsafeAppend(offset);
-              if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
-                bit_util::SetBit(out_is_valid, out_position);
+          // EMIT_NULL
+
+          // Filter null values are appended to output as null whether the
+          // value in the corresponding slot is valid or not
+          if (values_valid_block.AllSet()) {
+            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
+              const bool filter_not_null =
+                  bit_util::GetBit(filter_is_valid, filter_offset + in_position);
+              if (filter_not_null &&
+                  bit_util::GetBit(filter_data, filter_offset + in_position)) {
+                offset_builder.UnsafeAppend(offset);
+                bit_util::SetBit(out_is_valid, out_offset + out_position++);
                 APPEND_SINGLE_VALUE();
+              } else if (!filter_not_null) {
+                offset_builder.UnsafeAppend(offset);
+                ++out_position;
+              }
+            }
+          } else {
+            for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
+              const bool filter_not_null =
+                  bit_util::GetBit(filter_is_valid, filter_offset + in_position);
+              if (filter_not_null &&
+                  bit_util::GetBit(filter_data, filter_offset + in_position)) {
+                offset_builder.UnsafeAppend(offset);
+                if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
+                  bit_util::SetBit(out_is_valid, out_offset + out_position);
+                  APPEND_SINGLE_VALUE();
+                }
+                ++out_position;
+              } else if (!filter_not_null) {
+                offset_builder.UnsafeAppend(offset);
+                ++out_position;
               }
-              ++out_position;
-            } else if (!filter_not_null) {
-              offset_builder.UnsafeAppend(offset);
-              ++out_position;
             }
           }
         }
@@ -656,21 +778,25 @@ Status BinaryFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* o
 
   const ArraySpan& values = batch[0].array;
   const ArraySpan& filter = batch[1].array;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
   int64_t output_length = GetFilterOutputSize(filter, null_selection);
 
   ArrayData* out_arr = out->array_data().get();
 
+  const bool filter_null_count_is_zero =
+      is_ree_filter ? filter.child_data[1].null_count == 0 : filter.null_count == 0;
+
   // The output precomputed null count is unknown except in the narrow
   // condition that all the values are non-null and the filter will not cause
   // any new nulls to be created.
   if (values.null_count == 0 &&
-      (null_selection == FilterOptions::DROP || filter.null_count == 0)) {
+      (null_selection == FilterOptions::DROP || filter_null_count_is_zero)) {
     out_arr->null_count = 0;
   } else {
     out_arr->null_count = kUnknownNullCount;
   }
   Type::type type_id = values.type->id();
-  if (values.null_count == 0 && filter.null_count == 0) {
+  if (values.null_count == 0 && filter_null_count_is_zero) {
     // Faster no-nulls case
     if (is_binary_like(type_id)) {
       RETURN_NOT_OK(BinaryFilterNonNullImpl<BinaryType>(
@@ -863,7 +989,13 @@ class FilterMetaFunction : public MetaFunction {
   Result<Datum> ExecuteImpl(const std::vector<Datum>& args,
                             const FunctionOptions* options,
                             ExecContext* ctx) const override {
-    if (args[1].type()->id() != Type::BOOL) {
+    const auto& filter_type = *args[1].type();
+    const bool filter_is_plain_bool = filter_type.id() == Type::BOOL;
+    const bool filter_is_ree_bool =
+        filter_type.id() == Type::RUN_END_ENCODED &&
+        checked_cast<const arrow::RunEndEncodedType&>(filter_type).value_type()->id() ==
+            Type::BOOL;
+    if (!filter_is_plain_bool && !filter_is_ree_bool) {
       return Status::NotImplemented("Filter argument must be boolean type");
     }
 
@@ -897,22 +1029,43 @@ std::unique_ptr<Function> MakeFilterMetaFunction() {
 }
 
 void PopulateFilterKernels(std::vector<SelectionKernelData>* out) {
+  auto plain_filter = InputType(Type::BOOL);
+  auto ree_filter = InputType(match::RunEndEncoded(Type::BOOL));
+
   *out = {
-      {InputType(match::Primitive()), PrimitiveFilterExec},
-      {InputType(match::BinaryLike()), BinaryFilterExec},
-      {InputType(match::LargeBinaryLike()), BinaryFilterExec},
-      {InputType(Type::FIXED_SIZE_BINARY), FSBFilterExec},
-      {InputType(null()), NullFilterExec},
-      {InputType(Type::DECIMAL128), FSBFilterExec},
-      {InputType(Type::DECIMAL256), FSBFilterExec},
-      {InputType(Type::DICTIONARY), DictionaryFilterExec},
-      {InputType(Type::EXTENSION), ExtensionFilterExec},
-      {InputType(Type::LIST), ListFilterExec},
-      {InputType(Type::LARGE_LIST), LargeListFilterExec},
-      {InputType(Type::FIXED_SIZE_LIST), FSLFilterExec},
-      {InputType(Type::DENSE_UNION), DenseUnionFilterExec},
-      {InputType(Type::STRUCT), StructFilterExec},
-      {InputType(Type::MAP), MapFilterExec},
+      // * x Boolean
+      {InputType(match::Primitive()), plain_filter, PrimitiveFilterExec},
+      {InputType(match::BinaryLike()), plain_filter, BinaryFilterExec},
+      {InputType(match::LargeBinaryLike()), plain_filter, BinaryFilterExec},
+      {InputType(Type::FIXED_SIZE_BINARY), plain_filter, FSBFilterExec},
+      {InputType(null()), plain_filter, NullFilterExec},
+      {InputType(Type::DECIMAL128), plain_filter, FSBFilterExec},
+      {InputType(Type::DECIMAL256), plain_filter, FSBFilterExec},
+      {InputType(Type::DICTIONARY), plain_filter, DictionaryFilterExec},
+      {InputType(Type::EXTENSION), plain_filter, ExtensionFilterExec},
+      {InputType(Type::LIST), plain_filter, ListFilterExec},
+      {InputType(Type::LARGE_LIST), plain_filter, LargeListFilterExec},
+      {InputType(Type::FIXED_SIZE_LIST), plain_filter, FSLFilterExec},
+      {InputType(Type::DENSE_UNION), plain_filter, DenseUnionFilterExec},
+      {InputType(Type::STRUCT), plain_filter, StructFilterExec},
+      {InputType(Type::MAP), plain_filter, MapFilterExec},
+
+      // * x REE(Boolean)
+      {InputType(match::Primitive()), ree_filter, PrimitiveFilterExec},
+      {InputType(match::BinaryLike()), ree_filter, BinaryFilterExec},
+      {InputType(match::LargeBinaryLike()), ree_filter, BinaryFilterExec},
+      {InputType(Type::FIXED_SIZE_BINARY), ree_filter, FSBFilterExec},
+      {InputType(null()), ree_filter, NullFilterExec},
+      {InputType(Type::DECIMAL128), ree_filter, FSBFilterExec},
+      {InputType(Type::DECIMAL256), ree_filter, FSBFilterExec},
+      {InputType(Type::DICTIONARY), ree_filter, DictionaryFilterExec},
+      {InputType(Type::EXTENSION), ree_filter, ExtensionFilterExec},
+      {InputType(Type::LIST), ree_filter, ListFilterExec},
+      {InputType(Type::LARGE_LIST), ree_filter, LargeListFilterExec},
+      {InputType(Type::FIXED_SIZE_LIST), ree_filter, FSLFilterExec},
+      {InputType(Type::DENSE_UNION), ree_filter, DenseUnionFilterExec},
+      {InputType(Type::STRUCT), ree_filter, StructFilterExec},
+      {InputType(Type::MAP), ree_filter, MapFilterExec},
   };
 }
 
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_internal.cc
index 0fd5d9ca00..23b8b75bfa 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_internal.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_internal.cc
@@ -39,6 +39,7 @@
 #include "arrow/util/bit_util.h"
 #include "arrow/util/int_util.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/ree_util.h"
 
 namespace arrow {
 
@@ -48,18 +49,20 @@ namespace compute {
 namespace internal {
 
 void RegisterSelectionFunction(const std::string& name, FunctionDoc doc,
-                               VectorKernel base_kernel, InputType selection_type,
-                               const std::vector<SelectionKernelData>& kernels,
+                               VectorKernel base_kernel,
+                               std::vector<SelectionKernelData>&& kernels,
                                const FunctionOptions* default_options,
                                FunctionRegistry* registry) {
   auto func = std::make_shared<VectorFunction>(name, Arity::Binary(), std::move(doc),
                                                default_options);
-  for (auto& kernel_data : kernels) {
-    base_kernel.signature =
-        KernelSignature::Make({std::move(kernel_data.input), selection_type}, FirstType);
+  for (auto&& kernel_data : kernels) {
+    base_kernel.signature = KernelSignature::Make(
+        {std::move(kernel_data.value_type), std::move(kernel_data.selection_type)},
+        OutputType(FirstType));
     base_kernel.exec = kernel_data.exec;
     DCHECK_OK(func->AddKernel(base_kernel));
   }
+  kernels.clear();
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
@@ -82,6 +85,92 @@ Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit
 
 namespace {
 
+/// \brief Iterate over a REE filter, emitting ranges of a plain values array that
+/// would pass the filter.
+///
+/// Differently from REExREE, and REExPlain filtering, PlainxREE filtering
+/// does not produce a REE output, but rather a plain output array. As such it's
+/// much simpler.
+///
+/// \param filter_may_have_nulls Only pass false if you know the filter has no nulls.
+template <typename FilterRunEndType>
+void VisitPlainxREEFilterOutputSegmentsImpl(
+    const ArraySpan& filter, bool filter_may_have_nulls,
+    FilterOptions::NullSelectionBehavior null_selection,
+    const EmitREEFilterSegment& emit_segment) {
+  using FilterRunEndCType = typename FilterRunEndType::c_type;
+  const ArraySpan& filter_values = arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  filter_may_have_nulls = filter_may_have_nulls && filter_is_valid != nullptr &&
+                          filter_values.null_count != 0;
+
+  const arrow::ree_util::RunEndEncodedArraySpan<FilterRunEndCType> filter_span(filter);
+  auto it = filter_span.begin();
+  if (filter_may_have_nulls) {
+    if (null_selection == FilterOptions::EMIT_NULL) {
+      while (!it.is_end(filter_span)) {
+        const int64_t i = filter_values_offset + it.index_into_array();
+        const bool valid = bit_util::GetBit(filter_is_valid, i);
+        const bool emit = !valid || bit_util::GetBit(filter_selection, i);
+        if (ARROW_PREDICT_FALSE(
+                emit && !emit_segment(it.logical_position(), it.run_length(), valid))) {
+          break;
+        }
+        ++it;
+      }
+    } else {  // DROP nulls
+      while (!it.is_end(filter_span)) {
+        const int64_t i = filter_values_offset + it.index_into_array();
+        const bool emit =
+            bit_util::GetBit(filter_is_valid, i) && bit_util::GetBit(filter_selection, i);
+        if (ARROW_PREDICT_FALSE(
+                emit && !emit_segment(it.logical_position(), it.run_length(), true))) {
+          break;
+        }
+        ++it;
+      }
+    }
+  } else {
+    while (!it.is_end(filter_span)) {
+      const int64_t i = filter_values_offset + it.index_into_array();
+      const bool emit = bit_util::GetBit(filter_selection, i);
+      if (ARROW_PREDICT_FALSE(
+              emit && !emit_segment(it.logical_position(), it.run_length(), true))) {
+        break;
+      }
+      ++it;
+    }
+  }
+}
+
+}  // namespace
+
+void VisitPlainxREEFilterOutputSegments(
+    const ArraySpan& filter, bool filter_may_have_nulls,
+    FilterOptions::NullSelectionBehavior null_selection,
+    const EmitREEFilterSegment& emit_segment) {
+  if (filter.length == 0) {
+    return;
+  }
+  const auto& ree_type = checked_cast<const RunEndEncodedType&>(*filter.type);
+  switch (ree_type.run_end_type()->id()) {
+    case Type::INT16:
+      return VisitPlainxREEFilterOutputSegmentsImpl<Int16Type>(
+          filter, filter_may_have_nulls, null_selection, emit_segment);
+    case Type::INT32:
+      return VisitPlainxREEFilterOutputSegmentsImpl<Int32Type>(
+          filter, filter_may_have_nulls, null_selection, emit_segment);
+    default:
+      DCHECK(ree_type.run_end_type()->id() == Type::INT64);
+      return VisitPlainxREEFilterOutputSegmentsImpl<Int64Type>(
+          filter, filter_may_have_nulls, null_selection, emit_segment);
+  }
+}
+
+namespace {
+
 using FilterState = OptionsWrapper<FilterOptions>;
 using TakeState = OptionsWrapper<TakeOptions>;
 
@@ -91,9 +180,9 @@ using TakeState = OptionsWrapper<TakeOptions>;
 
 // Use CRTP to dispatch to type-specific processing of take indices for each
 // unsigned integer type.
-template <typename Impl, typename Type>
+template <typename Impl, typename ArrowType>
 struct Selection {
-  using ValuesArrayType = typename TypeTraits<Type>::ArrayType;
+  using ValuesArrayType = typename TypeTraits<ArrowType>::ArrayType;
 
   // Forwards the generic value visitors to the VisitFilter template
   struct FilterAdapter {
@@ -199,28 +288,12 @@ struct Selection {
   // nulls coming from the filter when using FilterOptions::EMIT_NULL
   template <typename ValidVisitor, typename NullVisitor>
   Status VisitFilter(ValidVisitor&& visit_valid, NullVisitor&& visit_null) {
-    auto null_selection = FilterState::Get(ctx).null_selection_behavior;
-
-    const uint8_t* filter_data = selection.buffers[1].data;
+    const bool is_ree_filter = selection.type->id() == Type::RUN_END_ENCODED;
+    const auto null_selection = FilterState::Get(ctx).null_selection_behavior;
 
-    const uint8_t* filter_is_valid = selection.buffers[0].data;
-    const int64_t filter_offset = selection.offset;
     arrow::internal::OptionalBitIndexer values_is_valid(values.buffers[0].data,
                                                         values.offset);
 
-    // We use 3 block counters for fast scanning of the filter
-    //
-    // * values_valid_counter: for values null/not-null
-    // * filter_valid_counter: for filter null/not-null
-    // * filter_counter: for filter true/false
-    arrow::internal::OptionalBitBlockCounter values_valid_counter(
-        values.buffers[0].data, values.offset, values.length);
-    arrow::internal::OptionalBitBlockCounter filter_valid_counter(
-        filter_is_valid, filter_offset, selection.length);
-    arrow::internal::BitBlockCounter filter_counter(filter_data, filter_offset,
-                                                    selection.length);
-    int64_t in_position = 0;
-
     auto AppendNotNull = [&](int64_t index) -> Status {
       validity_builder.UnsafeAppend(true);
       return visit_valid(index);
@@ -239,6 +312,41 @@ struct Selection {
       }
     };
 
+    if (is_ree_filter) {
+      Status status;
+      VisitPlainxREEFilterOutputSegments(
+          selection, /*filter_may_have_nulls=*/true, null_selection,
+          [&](int64_t position, int64_t segment_length, bool filter_valid) {
+            if (filter_valid) {
+              for (int64_t i = 0; i < segment_length; ++i) {
+                status = AppendMaybeNull(position + i);
+              }
+            } else {
+              for (int64_t i = 0; i < segment_length; ++i) {
+                status = AppendNull();
+              }
+            }
+            return status.ok();
+          });
+      return status;
+    }
+
+    const uint8_t* filter_data = selection.buffers[1].data;
+    const uint8_t* filter_is_valid = selection.buffers[0].data;
+    const int64_t filter_offset = selection.offset;
+    // We use 3 block counters for fast scanning of the filter
+    //
+    // * values_valid_counter: for values null/not-null
+    // * filter_valid_counter: for filter null/not-null
+    // * filter_counter: for filter true/false
+    arrow::internal::OptionalBitBlockCounter values_valid_counter(
+        values.buffers[0].data, values.offset, values.length);
+    arrow::internal::OptionalBitBlockCounter filter_valid_counter(
+        filter_is_valid, filter_offset, selection.length);
+    arrow::internal::BitBlockCounter filter_counter(filter_data, filter_offset,
+                                                    selection.length);
+
+    int64_t in_position = 0;
     while (in_position < selection.length) {
       arrow::internal::BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
       arrow::internal::BitBlockCount values_valid_block = values_valid_counter.NextWord();
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_internal.h b/cpp/src/arrow/compute/kernels/vector_selection_internal.h
index dd84748a4b..bcffdd820d 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_internal.h
+++ b/cpp/src/arrow/compute/kernels/vector_selection_internal.h
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "arrow/array/data.h"
+#include "arrow/compute/api_vector.h"
 #include "arrow/compute/exec.h"
 #include "arrow/compute/function.h"
 #include "arrow/compute/kernel.h"
@@ -31,13 +32,14 @@ namespace compute {
 namespace internal {
 
 struct SelectionKernelData {
-  InputType input;
+  InputType value_type;
+  InputType selection_type;
   ArrayKernelExec exec;
 };
 
 void RegisterSelectionFunction(const std::string& name, FunctionDoc doc,
-                               VectorKernel base_kernel, InputType selection_type,
-                               const std::vector<SelectionKernelData>& kernels,
+                               VectorKernel base_kernel,
+                               std::vector<SelectionKernelData>&& kernels,
                                const FunctionOptions* default_options,
                                FunctionRegistry* registry);
 
@@ -47,6 +49,25 @@ void RegisterSelectionFunction(const std::string& name, FunctionDoc doc,
 Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit_width,
                                      bool allocate_validity, ArrayData* out);
 
+/// \brief Callback type for VisitPlainxREEFilterOutputSegments.
+///
+/// position is the logical position in the values array relative to its offset.
+///
+/// segment_length is the number of elements that should be emitted.
+///
+/// filter_valid is true if the filter run value is non-NULL. This value can
+/// only be false if null_selection is NullSelectionBehavior::EMIT_NULL. For
+/// NullSelectionBehavior::DROP, NULL values from the filter are simply skipped.
+///
+/// Return true if iteration should continue, false if iteration should stop.
+using EmitREEFilterSegment =
+    std::function<bool(int64_t position, int64_t segment_length, bool filter_valid)>;
+
+void VisitPlainxREEFilterOutputSegments(
+    const ArraySpan& filter, bool filter_may_have_nulls,
+    FilterOptions::NullSelectionBehavior null_selection,
+    const EmitREEFilterSegment& emit_segment);
+
 Status FSBFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
 Status ListFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
 Status LargeListFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
index 882dd3e9af..ab80127731 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
@@ -38,6 +38,7 @@
 #include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/int_util.h"
+#include "arrow/util/ree_util.h"
 
 namespace arrow {
 
@@ -53,7 +54,7 @@ namespace internal {
 namespace {
 
 template <typename IndexType>
-Result<std::shared_ptr<ArrayData>> GetTakeIndicesImpl(
+Result<std::shared_ptr<ArrayData>> GetTakeIndicesFromBitmapImpl(
     const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection,
     MemoryPool* memory_pool) {
   using T = typename IndexType::c_type;
@@ -180,14 +181,102 @@ Result<std::shared_ptr<ArrayData>> GetTakeIndicesImpl(
                                      BufferVector{nullptr, out_buffer}, /*null_count=*/0);
 }
 
+template <typename RunEndType>
+Result<std::shared_ptr<ArrayData>> GetTakeIndicesFromREEBitmapImpl(
+    const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection,
+    MemoryPool* memory_pool) {
+  using T = typename RunEndType::c_type;
+  const ArraySpan& filter_values = ::arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  const bool filter_may_have_nulls = filter_values.MayHaveNulls();
+
+  // BinaryBitBlockCounter is not used here because a REE bitmap, if built
+  // correctly, is not going to have long continuous runs of 0s or 1s in the
+  // values array.
+
+  const ::arrow::ree_util::RunEndEncodedArraySpan<T> filter_span(filter);
+  auto it = filter_span.begin();
+  if (filter_may_have_nulls && null_selection == FilterOptions::EMIT_NULL) {
+    // Most complex case: the filter may have nulls and we don't drop them.
+    // The logic is ternary:
+    // - filter is null: emit null
+    // - filter is valid and true: emit index
+    // - filter is valid and false: don't emit anything
+
+    typename TypeTraits<RunEndType>::BuilderType builder(memory_pool);
+    for (; !it.is_end(filter_span); ++it) {
+      const int64_t position_with_offset = filter_values_offset + it.index_into_array();
+      const bool is_null = !bit_util::GetBit(filter_is_valid, position_with_offset);
+      if (is_null) {
+        RETURN_NOT_OK(builder.AppendNulls(it.run_length()));
+      } else {
+        const bool emit_run = bit_util::GetBit(filter_selection, position_with_offset);
+        if (emit_run) {
+          const int64_t run_end = it.run_end();
+          RETURN_NOT_OK(builder.Reserve(run_end - it.logical_position()));
+          for (int64_t position = it.logical_position(); position < run_end; position++) {
+            builder.UnsafeAppend(static_cast<T>(position));
+          }
+        }
+      }
+    }
+    std::shared_ptr<ArrayData> result;
+    RETURN_NOT_OK(builder.FinishInternal(&result));
+    return result;
+  }
+
+  // Other cases don't emit nulls and are therefore simpler.
+  TypedBufferBuilder<T> builder(memory_pool);
+
+  if (filter_may_have_nulls) {
+    DCHECK_EQ(null_selection, FilterOptions::DROP);
+    // The filter may have nulls, so we scan the validity bitmap and the filter
+    // data bitmap together.
+    for (; !it.is_end(filter_span); ++it) {
+      const int64_t position_with_offset = filter_values_offset + it.index_into_array();
+      const bool emit_run = bit_util::GetBit(filter_is_valid, position_with_offset) &&
+                            bit_util::GetBit(filter_selection, position_with_offset);
+      if (emit_run) {
+        const int64_t run_end = it.run_end();
+        RETURN_NOT_OK(builder.Reserve(run_end - it.logical_position()));
+        for (int64_t position = it.logical_position(); position < run_end; position++) {
+          builder.UnsafeAppend(static_cast<T>(position));
+        }
+      }
+    }
+  } else {
+    // The filter has no nulls, so we need only look for true values
+    for (; !it.is_end(filter_span); ++it) {
+      const int64_t position_with_offset = filter_values_offset + it.index_into_array();
+      const bool emit_run = bit_util::GetBit(filter_selection, position_with_offset);
+      if (emit_run) {
+        const int64_t run_end = it.run_end();
+        RETURN_NOT_OK(builder.Reserve(run_end - it.logical_position()));
+        for (int64_t position = it.logical_position(); position < run_end; position++) {
+          builder.UnsafeAppend(static_cast<T>(position));
+        }
+      }
+    }
+  }
+
+  const int64_t length = builder.length();
+  std::shared_ptr<Buffer> out_buffer;
+  RETURN_NOT_OK(builder.Finish(&out_buffer));
+  return std::make_shared<ArrayData>(TypeTraits<RunEndType>::type_singleton(), length,
+                                     BufferVector{nullptr, std::move(out_buffer)},
+                                     /*null_count=*/0);
+}
+
 Result<std::shared_ptr<ArrayData>> GetTakeIndicesFromBitmap(
     const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection,
     MemoryPool* memory_pool) {
   DCHECK_EQ(filter.type->id(), Type::BOOL);
   if (filter.length <= std::numeric_limits<uint16_t>::max()) {
-    return GetTakeIndicesImpl<UInt16Type>(filter, null_selection, memory_pool);
+    return GetTakeIndicesFromBitmapImpl<UInt16Type>(filter, null_selection, memory_pool);
   } else if (filter.length <= std::numeric_limits<uint32_t>::max()) {
-    return GetTakeIndicesImpl<UInt32Type>(filter, null_selection, memory_pool);
+    return GetTakeIndicesFromBitmapImpl<UInt32Type>(filter, null_selection, memory_pool);
   } else {
     // Arrays over 4 billion elements, not especially likely.
     return Status::NotImplemented(
@@ -196,14 +285,37 @@ Result<std::shared_ptr<ArrayData>> GetTakeIndicesFromBitmap(
   }
 }
 
-// TODO(pr-35750): Handle run-end encoded filters in compute kernels
+Result<std::shared_ptr<ArrayData>> GetTakeIndicesFromREEBitmap(
+    const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection,
+    MemoryPool* memory_pool) {
+  const auto& ree_type = checked_cast<const RunEndEncodedType&>(*filter.type);
+  // The resulting array will contain indexes of the same type as the run-end type of the
+  // run-end encoded filter. Run-end encoded arrays have to pick the smallest run-end type
+  // to maximize memory savings, so we can be re-use that decision here and get a good
+  // result without checking the logical length of the filter.
+  switch (ree_type.run_end_type()->id()) {
+    case Type::INT16:
+      return GetTakeIndicesFromREEBitmapImpl<Int16Type>(filter, null_selection,
+                                                        memory_pool);
+    case Type::INT32:
+      return GetTakeIndicesFromREEBitmapImpl<Int32Type>(filter, null_selection,
+                                                        memory_pool);
+    default:
+      DCHECK_EQ(ree_type.run_end_type()->id(), Type::INT64);
+      return GetTakeIndicesFromREEBitmapImpl<Int64Type>(filter, null_selection,
+                                                        memory_pool);
+  }
+}
 
 }  // namespace
 
 Result<std::shared_ptr<ArrayData>> GetTakeIndices(
     const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection,
     MemoryPool* memory_pool) {
-  return GetTakeIndicesFromBitmap(filter, null_selection, memory_pool);
+  if (filter.type->id() == Type::BOOL) {
+    return GetTakeIndicesFromBitmap(filter, null_selection, memory_pool);
+  }
+  return GetTakeIndicesFromREEBitmap(filter, null_selection, memory_pool);
 }
 
 namespace {
@@ -716,22 +828,24 @@ std::unique_ptr<Function> MakeTakeMetaFunction() {
 }
 
 void PopulateTakeKernels(std::vector<SelectionKernelData>* out) {
+  auto take_indices = match::Integer();
+
   *out = {
-      {InputType(match::Primitive()), PrimitiveTakeExec},
-      {InputType(match::BinaryLike()), VarBinaryTakeExec},
-      {InputType(match::LargeBinaryLike()), LargeVarBinaryTakeExec},
-      {InputType(Type::FIXED_SIZE_BINARY), FSBTakeExec},
-      {InputType(null()), NullTakeExec},
-      {InputType(Type::DECIMAL128), FSBTakeExec},
-      {InputType(Type::DECIMAL256), FSBTakeExec},
-      {InputType(Type::DICTIONARY), DictionaryTake},
-      {InputType(Type::EXTENSION), ExtensionTake},
-      {InputType(Type::LIST), ListTakeExec},
-      {InputType(Type::LARGE_LIST), LargeListTakeExec},
-      {InputType(Type::FIXED_SIZE_LIST), FSLTakeExec},
-      {InputType(Type::DENSE_UNION), DenseUnionTakeExec},
-      {InputType(Type::STRUCT), StructTakeExec},
-      {InputType(Type::MAP), MapTakeExec},
+      {InputType(match::Primitive()), take_indices, PrimitiveTakeExec},
+      {InputType(match::BinaryLike()), take_indices, VarBinaryTakeExec},
+      {InputType(match::LargeBinaryLike()), take_indices, LargeVarBinaryTakeExec},
+      {InputType(Type::FIXED_SIZE_BINARY), take_indices, FSBTakeExec},
+      {InputType(null()), take_indices, NullTakeExec},
+      {InputType(Type::DECIMAL128), take_indices, FSBTakeExec},
+      {InputType(Type::DECIMAL256), take_indices, FSBTakeExec},
+      {InputType(Type::DICTIONARY), take_indices, DictionaryTake},
+      {InputType(Type::EXTENSION), take_indices, ExtensionTake},
+      {InputType(Type::LIST), take_indices, ListTakeExec},
+      {InputType(Type::LARGE_LIST), take_indices, LargeListTakeExec},
+      {InputType(Type::FIXED_SIZE_LIST), take_indices, FSLTakeExec},
+      {InputType(Type::DENSE_UNION), take_indices, DenseUnionTakeExec},
+      {InputType(Type::STRUCT), take_indices, StructTakeExec},
+      {InputType(Type::MAP), take_indices, MapTakeExec},
   };
 }
 
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_test.cc b/cpp/src/arrow/compute/kernels/vector_selection_test.cc
index e1f2f28dee..5b624911ff 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_test.cc
@@ -42,99 +42,162 @@ using std::string_view;
 
 namespace compute {
 
+namespace {
+
+template <typename T>
+Result<std::shared_ptr<Array>> REEncode(const T& array) {
+  ARROW_ASSIGN_OR_RAISE(auto datum, RunEndEncode(array));
+  return datum.make_array();
+}
+
+Result<std::shared_ptr<Array>> REEFromJSON(const std::shared_ptr<DataType>& ree_type,
+                                           const std::string& json) {
+  auto ree_type_ptr = checked_cast<const RunEndEncodedType*>(ree_type.get());
+  auto array = ArrayFromJSON(ree_type_ptr->value_type(), json);
+  ARROW_ASSIGN_OR_RAISE(
+      auto datum, RunEndEncode(array, RunEndEncodeOptions{ree_type_ptr->run_end_type()}));
+  return datum.make_array();
+}
+
+Result<std::shared_ptr<Array>> FilterFromJSON(
+    const std::shared_ptr<DataType>& filter_type, const std::string& json) {
+  if (filter_type->id() == Type::RUN_END_ENCODED) {
+    return REEFromJSON(filter_type, json);
+  } else {
+    return ArrayFromJSON(filter_type, json);
+  }
+}
+
+Result<std::shared_ptr<Array>> REEncode(const std::shared_ptr<Array>& array) {
+  ARROW_ASSIGN_OR_RAISE(auto datum, RunEndEncode(array));
+  return datum.make_array();
+}
+
+void CheckTakeIndicesCase(const BooleanArray& filter,
+                          const std::shared_ptr<Array>& expected_indices,
+                          FilterOptions::NullSelectionBehavior null_selection) {
+  ASSERT_OK_AND_ASSIGN(auto indices,
+                       internal::GetTakeIndices(*filter.data(), null_selection));
+  auto indices_array = MakeArray(indices);
+  ValidateOutput(indices);
+  AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
+
+  ASSERT_OK_AND_ASSIGN(auto ree_filter, REEncode(filter));
+  ASSERT_OK_AND_ASSIGN(auto indices_from_ree,
+                       internal::GetTakeIndices(*ree_filter->data(), null_selection));
+  auto indices_from_ree_array = MakeArray(indices);
+  ValidateOutput(indices_from_ree);
+  AssertArraysEqual(*expected_indices, *indices_from_ree_array, /*verbose=*/true);
+}
+
+void CheckTakeIndicesCase(const std::string& filter_json, const std::string& indices_json,
+                          FilterOptions::NullSelectionBehavior null_selection,
+                          const std::shared_ptr<DataType>& indices_type = uint16()) {
+  auto filter = ArrayFromJSON(boolean(), filter_json);
+  auto expected_indices = ArrayFromJSON(indices_type, indices_json);
+  const auto& boolean_filter = checked_cast<const BooleanArray&>(*filter);
+  CheckTakeIndicesCase(boolean_filter, expected_indices, null_selection);
+}
+
+}  // namespace
+
 // ----------------------------------------------------------------------
 
 TEST(GetTakeIndices, Basics) {
-  auto CheckCase = [&](const std::string& filter_json, const std::string& indices_json,
-                       FilterOptions::NullSelectionBehavior null_selection,
-                       const std::shared_ptr<DataType>& indices_type = uint16()) {
-    auto filter = ArrayFromJSON(boolean(), filter_json);
-    auto expected_indices = ArrayFromJSON(indices_type, indices_json);
-    ASSERT_OK_AND_ASSIGN(auto indices,
-                         internal::GetTakeIndices(*filter->data(), null_selection));
-    auto indices_array = MakeArray(indices);
-    ValidateOutput(indices);
-    AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
-  };
-
   // Drop null cases
-  CheckCase("[]", "[]", FilterOptions::DROP);
-  CheckCase("[null]", "[]", FilterOptions::DROP);
-  CheckCase("[null, false, true, true, false, true]", "[2, 3, 5]", FilterOptions::DROP);
+  CheckTakeIndicesCase("[]", "[]", FilterOptions::DROP);
+  CheckTakeIndicesCase("[null]", "[]", FilterOptions::DROP);
+  CheckTakeIndicesCase("[null, false, true, true, false, true]", "[2, 3, 5]",
+                       FilterOptions::DROP);
 
   // Emit null cases
-  CheckCase("[]", "[]", FilterOptions::EMIT_NULL);
-  CheckCase("[null]", "[null]", FilterOptions::EMIT_NULL);
-  CheckCase("[null, false, true, true]", "[null, 2, 3]", FilterOptions::EMIT_NULL);
+  CheckTakeIndicesCase("[]", "[]", FilterOptions::EMIT_NULL);
+  CheckTakeIndicesCase("[null]", "[null]", FilterOptions::EMIT_NULL);
+  CheckTakeIndicesCase("[null, false, true, true]", "[null, 2, 3]",
+                       FilterOptions::EMIT_NULL);
 }
 
 TEST(GetTakeIndices, NullValidityBuffer) {
   BooleanArray filter(1, *AllocateEmptyBitmap(1), /*null_bitmap=*/nullptr);
   auto expected_indices = ArrayFromJSON(uint16(), "[]");
 
-  ASSERT_OK_AND_ASSIGN(auto indices,
-                       internal::GetTakeIndices(*filter.data(), FilterOptions::DROP));
-  auto indices_array = MakeArray(indices);
-  ValidateOutput(indices);
-  AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
-
-  ASSERT_OK_AND_ASSIGN(
-      indices, internal::GetTakeIndices(*filter.data(), FilterOptions::EMIT_NULL));
-  indices_array = MakeArray(indices);
-  ValidateOutput(indices);
-  AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
+  CheckTakeIndicesCase(filter, expected_indices, FilterOptions::DROP);
+  CheckTakeIndicesCase(filter, expected_indices, FilterOptions::EMIT_NULL);
 }
 
 template <typename IndexArrayType>
 void CheckGetTakeIndicesCase(const Array& untyped_filter) {
   const auto& filter = checked_cast<const BooleanArray&>(untyped_filter);
+  ASSERT_OK_AND_ASSIGN(auto ree_filter, REEncode(*filter.data()));
+
   ASSERT_OK_AND_ASSIGN(std::shared_ptr<ArrayData> drop_indices,
                        internal::GetTakeIndices(*filter.data(), FilterOptions::DROP));
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<ArrayData> drop_indices_from_ree,
+      internal::GetTakeIndices(*ree_filter->data(), FilterOptions::DROP));
   // Verify DROP indices
   {
     IndexArrayType indices(drop_indices);
+    IndexArrayType indices_from_ree(drop_indices);
     ValidateOutput(indices);
+    ValidateOutput(indices_from_ree);
 
     int64_t out_position = 0;
     for (int64_t i = 0; i < filter.length(); ++i) {
       if (filter.IsValid(i)) {
         if (filter.Value(i)) {
           ASSERT_EQ(indices.Value(out_position), i);
+          ASSERT_EQ(indices_from_ree.Value(out_position), i);
           ++out_position;
         }
       }
     }
     ASSERT_EQ(out_position, indices.length());
+    ASSERT_EQ(out_position, indices_from_ree.length());
+
     // Check that the end length agrees with the output of GetFilterOutputSize
     ASSERT_EQ(out_position,
               internal::GetFilterOutputSize(*filter.data(), FilterOptions::DROP));
+    ASSERT_EQ(out_position,
+              internal::GetFilterOutputSize(*ree_filter->data(), FilterOptions::DROP));
   }
 
   ASSERT_OK_AND_ASSIGN(
       std::shared_ptr<ArrayData> emit_indices,
       internal::GetTakeIndices(*filter.data(), FilterOptions::EMIT_NULL));
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<ArrayData> emit_indices_from_ree,
+      internal::GetTakeIndices(*ree_filter->data(), FilterOptions::EMIT_NULL));
   // Verify EMIT_NULL indices
   {
     IndexArrayType indices(emit_indices);
+    IndexArrayType indices_from_ree(emit_indices);
     ValidateOutput(indices);
+    ValidateOutput(indices_from_ree);
 
     int64_t out_position = 0;
     for (int64_t i = 0; i < filter.length(); ++i) {
       if (filter.IsValid(i)) {
         if (filter.Value(i)) {
           ASSERT_EQ(indices.Value(out_position), i);
+          ASSERT_EQ(indices_from_ree.Value(out_position), i);
           ++out_position;
         }
       } else {
         ASSERT_TRUE(indices.IsNull(out_position));
+        ASSERT_TRUE(indices_from_ree.IsNull(out_position));
         ++out_position;
       }
     }
 
     ASSERT_EQ(out_position, indices.length());
+    ASSERT_EQ(out_position, indices_from_ree.length());
+
     // Check that the end length agrees with the output of GetFilterOutputSize
     ASSERT_EQ(out_position,
               internal::GetFilterOutputSize(*filter.data(), FilterOptions::EMIT_NULL));
+    ASSERT_EQ(out_position, internal::GetFilterOutputSize(*ree_filter->data(),
+                                                          FilterOptions::EMIT_NULL));
   }
 }
 
@@ -163,15 +226,25 @@ TEST(GetTakeIndices, RandomlyGenerated) {
 // Filter tests
 
 std::shared_ptr<Array> CoalesceNullToFalse(std::shared_ptr<Array> filter) {
-  if (filter->null_count() == 0) {
+  const bool is_ree = filter->type_id() == Type::RUN_END_ENCODED;
+  // Work directly on run values array in case of REE
+  const ArrayData& data = is_ree ? *filter->data()->child_data[1] : *filter->data();
+  if (data.GetNullCount() == 0) {
     return filter;
   }
-  const auto& data = *filter->data();
   auto is_true = std::make_shared<BooleanArray>(data.length, data.buffers[1], nullptr, 0,
                                                 data.offset);
   auto is_valid = std::make_shared<BooleanArray>(data.length, data.buffers[0], nullptr, 0,
                                                  data.offset);
   EXPECT_OK_AND_ASSIGN(Datum out_datum, And(is_true, is_valid));
+  if (is_ree) {
+    const auto& ree_filter = checked_cast<const RunEndEncodedArray&>(*filter);
+    EXPECT_OK_AND_ASSIGN(
+        auto new_ree_filter,
+        RunEndEncodedArray::Make(ree_filter.length(), ree_filter.run_ends(),
+                                 /*values=*/out_datum.make_array(), ree_filter.offset()));
+    return new_ree_filter;
+  }
   return out_datum.make_array();
 }
 
@@ -218,23 +291,30 @@ class TestFilterKernel : public ::testing::Test {
     // add N(=2) dummy values at the start and end of `filter`.
     ARROW_SCOPED_TRACE("for sliced values and filter");
     ASSERT_OK_AND_ASSIGN(auto values_filler, MakeArrayOfNull(values->type(), 3));
-    auto filter_filler = ArrayFromJSON(boolean(), "[true, false]");
-    ASSERT_OK_AND_ASSIGN(auto values_sliced,
+    ASSERT_OK_AND_ASSIGN(auto filter_filler,
+                         FilterFromJSON(filter->type(), "[true, false]"));
+    ASSERT_OK_AND_ASSIGN(auto values_with_filler,
                          Concatenate({values_filler, values, values_filler}));
-    ASSERT_OK_AND_ASSIGN(auto filter_sliced,
+    ASSERT_OK_AND_ASSIGN(auto filter_with_filler,
                          Concatenate({filter_filler, filter, filter_filler}));
-    values_sliced = values_sliced->Slice(3, values->length());
-    filter_sliced = filter_sliced->Slice(2, filter->length());
+    auto values_sliced = values_with_filler->Slice(3, values->length());
+    auto filter_sliced = filter_with_filler->Slice(2, filter->length());
     DoAssertFilter(values_sliced, filter_sliced, expected);
   }
 
   void AssertFilter(const std::shared_ptr<DataType>& type, const std::string& values,
                     const std::string& filter, const std::string& expected) {
-    AssertFilter(ArrayFromJSON(type, values), ArrayFromJSON(boolean(), filter),
-                 ArrayFromJSON(type, expected));
+    auto values_array = ArrayFromJSON(type, values);
+    auto filter_array = ArrayFromJSON(boolean(), filter);
+    auto expected_array = ArrayFromJSON(type, expected);
+    AssertFilter(values_array, filter_array, expected_array);
+
+    ASSERT_OK_AND_ASSIGN(auto ree_filter, REEncode(filter_array));
+    ARROW_SCOPED_TRACE("for plain values and REE filter");
+    AssertFilter(values_array, ree_filter, expected_array);
   }
 
-  FilterOptions emit_null_, drop_;
+  const FilterOptions emit_null_, drop_;
 };
 
 void ValidateFilter(const std::shared_ptr<Array>& values,