Posted to github@arrow.apache.org by "felipecrv (via GitHub)" <gi...@apache.org> on 2023/05/24 21:40:51 UTC

[GitHub] [arrow] felipecrv opened a new pull request, #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

felipecrv opened a new pull request, #35750:
URL: https://github.com/apache/arrow/pull/35750

   ### Rationale for this change
   
   Boolean arrays (bitmaps) used to represent filters in Arrow take 1 bit per boolean value. If the filter contains long runs, it can be run-end encoded (REE) to save even more memory.
   
   Using POPCNT, a bitmap can be scanned efficiently in runs of up to 64 logical values (one machine word) at a time, but a run-end encoded array gives the length of each run directly, and a single run can be longer than the word size.
   
   These two observations make the case that, for the right dataset, REE filters can be more efficiently processed in compute kernels.
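
To make the comparison concrete, here is a minimal standalone sketch (not Arrow code) that counts the selected values in both representations. `CountBitmap` and `CountRunEnds` are hypothetical names, and the REE filter is modeled as two parallel vectors (run ends and per-run booleans) with no nulls and no offset.

```cpp
#include <bitset>
#include <cstddef>
#include <cstdint>
#include <vector>

// Count selected values in a bitmap filter, one 64-bit word per step.
// std::bitset<64>::count() typically compiles to POPCNT where available.
int64_t CountBitmap(const std::vector<uint64_t>& words) {
  int64_t selected = 0;
  for (uint64_t word : words) {
    selected += static_cast<int64_t>(std::bitset<64>(word).count());
  }
  return selected;
}

// Count selected values in a run-end encoded filter. run_ends[i] is the
// exclusive logical end of run i, so each run costs one subtraction no
// matter how long it is.
int64_t CountRunEnds(const std::vector<int32_t>& run_ends,
                     const std::vector<bool>& run_values) {
  int64_t selected = 0;
  int64_t run_start = 0;
  for (size_t i = 0; i < run_ends.size(); ++i) {
    if (run_values[i]) selected += run_ends[i] - run_start;
    run_start = run_ends[i];
  }
  return selected;
}
```

The bitmap loop does work proportional to the logical length, while the REE loop does work proportional to the number of runs, which is where the potential gain for long runs comes from.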
   
   ### What changes are included in this PR?
   
    - [x] `GetFilterOutputSize` can count the number of values emitted by a REE filter
    - [x] `GetTakeIndices` can produce an array of logical indices from a REE filter
    - [ ] `"array_filter"` can handle REE filters
    - [ ] `"array_take"` can handle REE filters
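
As a rough illustration of what producing logical take indices from a REE filter involves, the sketch below expands runs into indices under both null-selection behaviors. It is a simplified model, not the PR's implementation: `FilterRun`, `NullSelection`, and `TakeIndicesFromRuns` are hypothetical names, and a `std::nullopt` entry stands in for a null slot.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

enum class NullSelection { kDrop, kEmitNull };

// One run of the filter: exclusive logical end plus a nullable boolean.
struct FilterRun {
  int64_t run_end;
  std::optional<bool> value;  // std::nullopt models a null filter value
};

// Expand a run-end encoded filter into logical take indices.
// A std::nullopt index stands for a null slot in the output.
std::vector<std::optional<int64_t>> TakeIndicesFromRuns(
    const std::vector<FilterRun>& runs, NullSelection null_selection) {
  std::vector<std::optional<int64_t>> indices;
  int64_t run_start = 0;
  for (const FilterRun& run : runs) {
    if (run.value.has_value()) {
      if (*run.value) {
        for (int64_t i = run_start; i < run.run_end; ++i) indices.emplace_back(i);
      }
    } else if (null_selection == NullSelection::kEmitNull) {
      // A null run emits one null per logical position it covers.
      indices.insert(indices.end(), static_cast<size_t>(run.run_end - run_start),
                     std::optional<int64_t>{});
    }
    run_start = run.run_end;
  }
  return indices;
}
```

In this model the size of the returned vector is the filter output size, which is the agreement the tests in this PR check between `GetTakeIndices` and `GetFilterOutputSize`.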
   
   ### Are these changes tested?
   
   Yes.
   
   ### Are there any user-facing changes?
   
   Yes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218600095


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -161,39 +173,88 @@ class PrimitiveFilterImpl {
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
     out_data_ = reinterpret_cast<T*>(out_arr->buffers[1]->mutable_data());
-    out_offset_ = out_arr->offset;
     out_length_ = out_arr->length;
+    DCHECK_EQ(out_arr->offset, 0);
     out_position_ = 0;
   }
 
-  void ExecNonNull() {
-    // Fast filter when values and filter are not null
-    ::arrow::internal::VisitSetBitRunsVoid(
-        filter_data_, filter_offset_, values_length_,
-        [&](int64_t position, int64_t length) { WriteValueSegment(position, length); });
+  void ExecREEFilter() {
+    if (filter_.child_data[1].null_count == 0 && values_null_count_ == 0) {
+      DCHECK(!out_is_valid_);
+      // Fastest: no nulls in either filter or values
+      return VisitPlainxREEFilterOutputSegments(
+          filter_, /*filter_may_have_nulls=*/false, null_selection_,
+          [&](int64_t position, int64_t segment_length, bool filter_valid) {
+            // Fastest path: all values in range are included and not null
+            WriteValueSegment(position, segment_length);
+            DCHECK(filter_valid);
+          });
+    }
+    if (values_is_valid_ && out_is_valid_) {
+      // Fast path: values can be null, so the validity bitmap should be copied

Review Comment:
   I'm changing these comments to:
   
   ```diff
              });
        }
        if (values_is_valid_ && out_is_valid_) {
   -      // Fast path: values can be null, so the validity bitmap should be copied
   +      // Slower path: values can be null, so the validity bitmap should be copied
          return VisitPlainxREEFilterOutputSegments(
              filter_, /*filter_may_have_nulls=*/true, null_selection_,
              [&](int64_t position, int64_t segment_length, bool filter_valid) {
   @@ -206,6 +206,8 @@ class PrimitiveFilterImpl {
                }
              });
        }
   +    // Faster path: only write to out_is_valid_ if filter contains nulls and
   +    // null_selection is EMIT_NULL
        if (out_is_valid_) {
          // Set all to valid, so only if nulls are produced by EMIT_NULL, we need
          // to set out_is_valid[i] to false.
     ```





[GitHub] [arrow] pitrou commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1217751388


##########
cpp/src/arrow/compute/kernels/ree_util_internal.h:
##########
@@ -83,6 +83,18 @@ class ReadWriteValue<ArrowType, in_has_validity_buffer, out_has_validity_buffer,
     return valid;
   }
 
+  /// Pre-conditions guaranteed by the callers:
+  /// - i and j are valid indices into the values buffer
+  /// - the values in i and j are valid
+  bool CompareValuesAt(int64_t i, int64_t j) const {

Review Comment:
   I don't understand what this is doing in REE utils? This is essentially representing value access in primitive arrays.



##########
cpp/src/arrow/compute/kernels/vector_selection_internal.cc:
##########
@@ -82,6 +85,89 @@ Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit
 
 namespace {
 
+/// \brief Iterate over a REE filter, emitting ranges of a plain values array that
+/// would pass the filter.
+///
+/// Differently from REExREE, and REExPlain filtering, PlainxREE filtering
+/// does not produce a REE output, but rather a plain output array. As such it's
+/// much simpler.
+///
+/// \param filter_may_have_nulls Only pass false if you know the filter has no nulls.
+template <typename FilterRunEndType>
+void VisitPlainxREEFilterOutputSegmentsImpl(
+    const ArraySpan& filter, bool filter_may_have_nulls,
+    FilterOptions::NullSelectionBehavior null_selection,
+    const EmitREEFilterSegment& emit_segment) {
+  using FilterRunEndCType = typename FilterRunEndType::c_type;
+  const ArraySpan& filter_values = arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  filter_may_have_nulls = filter_may_have_nulls && filter_is_valid != NULLPTR &&

Review Comment:
   Nit: NULLPTR is only needed in .h files.
   ```suggestion
     filter_may_have_nulls = filter_may_have_nulls && filter_is_valid != nullptr &&
   ```



##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -161,39 +173,88 @@ class PrimitiveFilterImpl {
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
     out_data_ = reinterpret_cast<T*>(out_arr->buffers[1]->mutable_data());
-    out_offset_ = out_arr->offset;
     out_length_ = out_arr->length;
+    DCHECK_EQ(out_arr->offset, 0);

Review Comment:
   I'm curious, are we sure about this?



##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -84,13 +85,27 @@ int64_t GetBitmapFilterOutputSize(const ArraySpan& filter,
   return output_size;
 }
 
-// TODO(pr-35750): Handle run-end encoded filters in compute kernels
+int64_t GetREEFilterOutputSize(const ArraySpan& filter,
+                               FilterOptions::NullSelectionBehavior null_selection) {
+  const auto& ree_type = checked_cast<const RunEndEncodedType&>(*filter.type);
+  DCHECK_EQ(ree_type.value_type()->id(), Type::BOOL);
+  int64_t output_size = 0;
+  VisitPlainxREEFilterOutputSegments(
+      filter, true, null_selection,

Review Comment:
   ```suggestion
         filter, /*filter_may_have_nulls=*/ true, null_selection,
   ```



##########
cpp/src/arrow/compute/kernels/vector_selection_test.cc:
##########
@@ -55,6 +88,13 @@ TEST(GetTakeIndices, Basics) {
     auto indices_array = MakeArray(indices);
     ValidateOutput(indices);
     AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
+
+    ASSERT_OK_AND_ASSIGN(auto ree_filter, REEncode(*filter));
+    ASSERT_OK_AND_ASSIGN(auto indices_from_ree,
+                         internal::GetTakeIndices(*ree_filter->data(), null_selection));
+    auto indices_from_ree_array = MakeArray(indices_from_ree);
+    ValidateOutput(indices_from_ree);
+    AssertArraysEqual(*expected_indices, *indices_from_ree_array, /*verbose=*/true);

Review Comment:
   This is repeated in subsequent tests. Perhaps factor it out in a utility function or test fixture method?



##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,48 +503,64 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    offset_type cur_offset = raw_offsets[position];
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - cur_offset;
+      cur_offset = raw_offsets[i + position + 1];

Review Comment:
   Why not simply:
   ```suggestion
         offset += raw_offsets[i + position + 1] - raw_offsets[i + position];
   ```
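
The suggested form adds each element's own length (its next offset minus its own offset) to the running output offset. A minimal sketch of that loop over plain vectors, with the hypothetical helper name `AppendSegmentOffsets` and `int32_t` offsets assumed:

```cpp
#include <cstdint>
#include <vector>

// Append the output offsets for the values segment [position, position + length).
// `offset` is the running end of the output data buffer; element k of the
// input occupies bytes [raw_offsets[k], raw_offsets[k + 1]).
void AppendSegmentOffsets(const std::vector<int32_t>& raw_offsets, int64_t position,
                          int64_t length, int32_t& offset,
                          std::vector<int32_t>& out_offsets) {
  for (int64_t i = 0; i < length; ++i) {
    out_offsets.push_back(offset);
    // Length of element (position + i): next offset minus its own offset.
    offset += raw_offsets[position + i + 1] - raw_offsets[position + i];
  }
}
```

Both operands must index adjacent entries; subtracting an entry from itself would leave `offset` unchanged and produce zero-length output elements.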



##########
cpp/src/arrow/compute/kernels/vector_selection_internal.cc:
##########
@@ -82,6 +85,89 @@ Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit
 
 namespace {
 
+/// \brief Iterate over a REE filter, emitting ranges of a plain values array that
+/// would pass the filter.
+///
+/// Differently from REExREE, and REExPlain filtering, PlainxREE filtering
+/// does not produce a REE output, but rather a plain output array. As such it's
+/// much simpler.
+///
+/// \param filter_may_have_nulls Only pass false if you know the filter has no nulls.
+template <typename FilterRunEndType>
+void VisitPlainxREEFilterOutputSegmentsImpl(
+    const ArraySpan& filter, bool filter_may_have_nulls,
+    FilterOptions::NullSelectionBehavior null_selection,
+    const EmitREEFilterSegment& emit_segment) {
+  using FilterRunEndCType = typename FilterRunEndType::c_type;
+  const ArraySpan& filter_values = arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  filter_may_have_nulls = filter_may_have_nulls && filter_is_valid != NULLPTR &&
+                          filter_values.null_count != 0;
+
+  const arrow::ree_util::RunEndEncodedArraySpan<FilterRunEndCType> filter_span(filter);
+  auto it = filter_span.begin();
+  if (filter_may_have_nulls) {
+    if (null_selection == FilterOptions::EMIT_NULL) {
+      while (!it.is_end(filter_span)) {

Review Comment:
   FTR, why didn't you respect the usual C++ conventions for iterators when defining the REE iteration facilities?
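
For readers unfamiliar with the convention being referred to, this is a hypothetical sketch of run iteration shaped like a standard C++ range (`begin()`/`end()`, `operator*`, `operator++`, `operator!=`), so that a range-based `for` works. `Run` and `RunRange` are invented for illustration and are not Arrow's REE iteration API.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

struct Run {
  int64_t logical_position;
  int64_t run_length;
  bool value;
};

// Non-owning view over run ends and run values; both vectors must outlive it.
class RunRange {
 public:
  RunRange(const std::vector<int32_t>& run_ends, const std::vector<bool>& values)
      : run_ends_(run_ends), values_(values) {}

  class Iterator {
   public:
    Iterator(const RunRange* range, size_t index) : range_(range), index_(index) {}
    Run operator*() const {
      const int64_t start = index_ == 0 ? 0 : range_->run_ends_[index_ - 1];
      return Run{start, range_->run_ends_[index_] - start, range_->values_[index_]};
    }
    Iterator& operator++() {
      ++index_;
      return *this;
    }
    bool operator!=(const Iterator& other) const { return index_ != other.index_; }

   private:
    const RunRange* range_;
    size_t index_;
  };

  Iterator begin() const { return Iterator(this, 0); }
  Iterator end() const { return Iterator(this, run_ends_.size()); }

 private:
  const std::vector<int32_t>& run_ends_;
  const std::vector<bool>& values_;
};
```

With this shape the loop in the diff could be written as `for (const Run& run : range)` instead of `while (!it.is_end(filter_span))`.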



##########
cpp/src/arrow/compute/kernels/vector_selection_internal.cc:
##########
@@ -239,6 +309,43 @@ struct Selection {
       }
     };
 
+    if (is_ree_filter) {
+      Status status;

Review Comment:
   Why not let `emit_segment` return a `Status` instead?
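
A minimal sketch of that alternative, assuming a visitor that is templated on the callback and propagates the callback's status. The `Status` struct here is only a stand-in so the example compiles on its own (it is not `arrow::Status`), and `VisitSelectedRuns` is a hypothetical name.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Stand-in for arrow::Status so the sketch compiles on its own.
struct Status {
  std::string message;  // empty means OK
  bool ok() const { return message.empty(); }
  static Status OK() { return Status{}; }
  static Status Invalid(std::string msg) { return Status{std::move(msg)}; }
};

// Visit the selected runs of a filter; stop at the first failing callback
// and hand its Status back to the caller.
template <typename EmitSegment>
Status VisitSelectedRuns(const std::vector<int32_t>& run_ends,
                         const std::vector<bool>& run_values,
                         EmitSegment&& emit_segment) {
  int64_t run_start = 0;
  for (size_t i = 0; i < run_ends.size(); ++i) {
    if (run_values[i]) {
      Status st = emit_segment(run_start, run_ends[i] - run_start);
      if (!st.ok()) return st;
    }
    run_start = run_ends[i];
  }
  return Status::OK();
}
```

The caller then no longer needs a captured `Status status;` local, because the visitor's return value carries the first error.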



##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -84,13 +85,27 @@ int64_t GetBitmapFilterOutputSize(const ArraySpan& filter,
   return output_size;
 }
 
-// TODO(pr-35750): Handle run-end encoded filters in compute kernels
+int64_t GetREEFilterOutputSize(const ArraySpan& filter,
+                               FilterOptions::NullSelectionBehavior null_selection) {
+  const auto& ree_type = checked_cast<const RunEndEncodedType&>(*filter.type);
+  DCHECK_EQ(ree_type.value_type()->id(), Type::BOOL);
+  int64_t output_size = 0;
+  VisitPlainxREEFilterOutputSegments(
+      filter, true, null_selection,
+      [&output_size](int64_t, int64_t segment_length, bool) {
+        output_size += segment_length;
+      });
+  return output_size;
+}
 
 }  // namespace
 
 int64_t GetFilterOutputSize(const ArraySpan& filter,
                             FilterOptions::NullSelectionBehavior null_selection) {
-  return GetBitmapFilterOutputSize(filter, null_selection);
+  if (filter.type->id() == Type::BOOL) {
+    return GetBitmapFilterOutputSize(filter, null_selection);
+  }

Review Comment:
   ```suggestion
     }
     DCHECK_EQ(filter.type->id(), Type::RUN_END_ENCODED);
   ```



##########
cpp/src/arrow/compute/kernels/vector_selection_internal.cc:
##########
@@ -82,6 +85,89 @@ Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit
 
 namespace {
 
+/// \brief Iterate over a REE filter, emitting ranges of a plain values array that
+/// would pass the filter.
+///
+/// Differently from REExREE, and REExPlain filtering, PlainxREE filtering
+/// does not produce a REE output, but rather a plain output array. As such it's
+/// much simpler.
+///
+/// \param filter_may_have_nulls Only pass false if you know the filter has no nulls.
+template <typename FilterRunEndType>
+void VisitPlainxREEFilterOutputSegmentsImpl(
+    const ArraySpan& filter, bool filter_may_have_nulls,
+    FilterOptions::NullSelectionBehavior null_selection,
+    const EmitREEFilterSegment& emit_segment) {
+  using FilterRunEndCType = typename FilterRunEndType::c_type;
+  const ArraySpan& filter_values = arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  filter_may_have_nulls = filter_may_have_nulls && filter_is_valid != NULLPTR &&
+                          filter_values.null_count != 0;
+
+  const arrow::ree_util::RunEndEncodedArraySpan<FilterRunEndCType> filter_span(filter);
+  auto it = filter_span.begin();
+  if (filter_may_have_nulls) {
+    if (null_selection == FilterOptions::EMIT_NULL) {
+      while (!it.is_end(filter_span)) {
+        const int64_t i = filter_values_offset + it.index_into_array();
+        const bool valid = bit_util::GetBit(filter_is_valid, i);
+        const bool emit = !valid || bit_util::GetBit(filter_selection, i);
+        if (emit) {
+          emit_segment(it.logical_position(), it.run_length(), valid);

Review Comment:
   I'm not sure whether you tried to time these new kernels, but `emit_segment` being a `std::function` will come with its own overhead (I'm not sure by how much).
   
   Another approach would be to give `emit_segment` a batch of ranges:
   ```c++
   struct REEFilterSegment {
     int64_t position;
     int64_t segment_length;
     bool filter_valid; // false means emit none
   };
   
   using EmitREEFilterSegment =
       std::function<void(const REEFilterSegment*, int32_t num_segments)>;
   ```
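
One way the batched callback could be driven is sketched below: selected runs are collected into a small buffer and the `std::function` is invoked once per batch. This is an illustration under assumptions, not measured code; `VisitSelectedRunsBatched`, the alias name `EmitREEFilterSegments`, and the batch size of 64 are choices made for the sketch, and the filter is modeled without nulls.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

struct REEFilterSegment {
  int64_t position;
  int64_t segment_length;
  bool filter_valid;
};

using EmitREEFilterSegments =
    std::function<void(const REEFilterSegment*, int32_t num_segments)>;

// Collect selected runs into a small stack buffer and invoke the callback
// once per batch, so the std::function call cost is amortized.
void VisitSelectedRunsBatched(const std::vector<int32_t>& run_ends,
                              const std::vector<bool>& run_values,
                              const EmitREEFilterSegments& emit_segments) {
  constexpr int32_t kBatchSize = 64;  // arbitrary choice for this sketch
  std::array<REEFilterSegment, kBatchSize> batch;
  int32_t batch_length = 0;
  int64_t run_start = 0;
  for (size_t i = 0; i < run_ends.size(); ++i) {
    if (run_values[i]) {
      batch[batch_length++] = {run_start, run_ends[i] - run_start, true};
      if (batch_length == kBatchSize) {
        emit_segments(batch.data(), batch_length);
        batch_length = 0;
      }
    }
    run_start = run_ends[i];
  }
  // Flush whatever is left in a partially filled batch.
  if (batch_length > 0) emit_segments(batch.data(), batch_length);
}
```

Whether this beats a per-segment callback depends on run lengths and on how much work each segment triggers, so it would need benchmarking.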
   



##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -161,39 +173,88 @@ class PrimitiveFilterImpl {
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
     out_data_ = reinterpret_cast<T*>(out_arr->buffers[1]->mutable_data());
-    out_offset_ = out_arr->offset;
     out_length_ = out_arr->length;
+    DCHECK_EQ(out_arr->offset, 0);
     out_position_ = 0;
   }
 
-  void ExecNonNull() {
-    // Fast filter when values and filter are not null
-    ::arrow::internal::VisitSetBitRunsVoid(
-        filter_data_, filter_offset_, values_length_,
-        [&](int64_t position, int64_t length) { WriteValueSegment(position, length); });
+  void ExecREEFilter() {
+    if (filter_.child_data[1].null_count == 0 && values_null_count_ == 0) {

Review Comment:
   Note that `null_count` may be unknown (it's lazily computed), so `GetNullCount` should be called first to make sure it's known. Perhaps in the constructor?
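
The pitfall can be shown with a small standalone model of a lazily computed null count. The sentinel value -1 mirrors Arrow's `kUnknownNullCount`; the `ValuesSketch` struct and its `std::vector<bool>` validity are assumptions made for illustration only.

```cpp
#include <cstdint>
#include <vector>

constexpr int64_t kUnknownNullCount = -1;

struct ValuesSketch {
  std::vector<bool> is_valid;
  int64_t null_count = kUnknownNullCount;

  // Compute the null count on first use and cache it.
  int64_t GetNullCount() {
    if (null_count == kUnknownNullCount) {
      int64_t nulls = 0;
      for (bool valid : is_valid) nulls += valid ? 0 : 1;
      null_count = nulls;
    }
    return null_count;
  }
};
```

Reading the raw field before it has been computed yields -1, so a check like `null_count == 0` is false even when there are no nulls, and the no-nulls fast path would be skipped.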



##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -161,39 +173,88 @@ class PrimitiveFilterImpl {
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
     out_data_ = reinterpret_cast<T*>(out_arr->buffers[1]->mutable_data());
-    out_offset_ = out_arr->offset;
     out_length_ = out_arr->length;
+    DCHECK_EQ(out_arr->offset, 0);
     out_position_ = 0;
   }
 
-  void ExecNonNull() {
-    // Fast filter when values and filter are not null
-    ::arrow::internal::VisitSetBitRunsVoid(
-        filter_data_, filter_offset_, values_length_,
-        [&](int64_t position, int64_t length) { WriteValueSegment(position, length); });
+  void ExecREEFilter() {
+    if (filter_.child_data[1].null_count == 0 && values_null_count_ == 0) {
+      DCHECK(!out_is_valid_);
+      // Fastest: no nulls in either filter or values
+      return VisitPlainxREEFilterOutputSegments(
+          filter_, /*filter_may_have_nulls=*/false, null_selection_,
+          [&](int64_t position, int64_t segment_length, bool filter_valid) {
+            // Fastest path: all values in range are included and not null
+            WriteValueSegment(position, segment_length);
+            DCHECK(filter_valid);
+          });
+    }
+    if (values_is_valid_ && out_is_valid_) {

Review Comment:
   Hmm... shouldn't the output always have a validity bitmap if the input has?
   ```suggestion
       if (values_is_valid_) {
         DCHECK(out_is_valid_);
   ```



##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -161,39 +173,88 @@ class PrimitiveFilterImpl {
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
     out_data_ = reinterpret_cast<T*>(out_arr->buffers[1]->mutable_data());
-    out_offset_ = out_arr->offset;
     out_length_ = out_arr->length;
+    DCHECK_EQ(out_arr->offset, 0);
     out_position_ = 0;
   }
 
-  void ExecNonNull() {
-    // Fast filter when values and filter are not null
-    ::arrow::internal::VisitSetBitRunsVoid(
-        filter_data_, filter_offset_, values_length_,
-        [&](int64_t position, int64_t length) { WriteValueSegment(position, length); });
+  void ExecREEFilter() {
+    if (filter_.child_data[1].null_count == 0 && values_null_count_ == 0) {
+      DCHECK(!out_is_valid_);
+      // Fastest: no nulls in either filter or values
+      return VisitPlainxREEFilterOutputSegments(
+          filter_, /*filter_may_have_nulls=*/false, null_selection_,
+          [&](int64_t position, int64_t segment_length, bool filter_valid) {
+            // Fastest path: all values in range are included and not null
+            WriteValueSegment(position, segment_length);
+            DCHECK(filter_valid);
+          });
+    }
+    if (values_is_valid_ && out_is_valid_) {
+      // Fast path: values can be null, so the validity bitmap should be copied

Review Comment:
   This is the slow path, rather, no? The path below will probably be faster, since it does not issue individual `CopyBitmap` calls.
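
A simplified model of the cheaper strategy mentioned here: when the values have no nulls, output validity can start as all-valid and only the segments that come from null filter runs (under EMIT_NULL) need to be cleared, with no per-segment bitmap copy. `Segment` and `OutputValidity` are hypothetical names, and `std::vector<bool>` stands in for a validity bitmap.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

struct Segment {
  int64_t position;   // start of the segment in the values array
  int64_t length;     // number of values emitted
  bool filter_valid;  // false when the segment comes from a null filter run
};

// Values have no nulls here, so output validity starts as all-true and only
// segments coming from null filter runs (EMIT_NULL) are cleared.
std::vector<bool> OutputValidity(const std::vector<Segment>& segments) {
  int64_t out_length = 0;
  for (const Segment& s : segments) out_length += s.length;
  std::vector<bool> out_is_valid(static_cast<size_t>(out_length), true);
  int64_t out_position = 0;
  for (const Segment& s : segments) {
    if (!s.filter_valid) {
      for (int64_t i = 0; i < s.length; ++i) {
        out_is_valid[static_cast<size_t>(out_position + i)] = false;
      }
    }
    out_position += s.length;
  }
  return out_is_valid;
}
```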



##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -863,7 +976,13 @@ class FilterMetaFunction : public MetaFunction {
   Result<Datum> ExecuteImpl(const std::vector<Datum>& args,
                             const FunctionOptions* options,
                             ExecContext* ctx) const override {
-    if (args[1].type()->id() != Type::BOOL) {
+    auto& filter_type = *args[1].type();

Review Comment:
   Style nit
   ```suggestion
       const auto& filter_type = *args[1].type();
   ```



##########
cpp/src/arrow/compute/kernels/vector_selection_internal.cc:
##########
@@ -82,6 +85,89 @@ Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit
 
 namespace {
 
+/// \brief Iterate over a REE filter, emitting ranges of a plain values array that
+/// would pass the filter.
+///
+/// Differently from REExREE, and REExPlain filtering, PlainxREE filtering
+/// does not produce a REE output, but rather a plain output array. As such it's
+/// much simpler.
+///
+/// \param filter_may_have_nulls Only pass false if you know the filter has no nulls.
+template <typename FilterRunEndType>
+void VisitPlainxREEFilterOutputSegmentsImpl(
+    const ArraySpan& filter, bool filter_may_have_nulls,
+    FilterOptions::NullSelectionBehavior null_selection,
+    const EmitREEFilterSegment& emit_segment) {
+  using FilterRunEndCType = typename FilterRunEndType::c_type;
+  const ArraySpan& filter_values = arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  filter_may_have_nulls = filter_may_have_nulls && filter_is_valid != NULLPTR &&
+                          filter_values.null_count != 0;
+
+  const arrow::ree_util::RunEndEncodedArraySpan<FilterRunEndCType> filter_span(filter);
+  auto it = filter_span.begin();
+  if (filter_may_have_nulls) {
+    if (null_selection == FilterOptions::EMIT_NULL) {
+      while (!it.is_end(filter_span)) {
+        const int64_t i = filter_values_offset + it.index_into_array();
+        const bool valid = bit_util::GetBit(filter_is_valid, i);
+        const bool emit = !valid || bit_util::GetBit(filter_selection, i);
+        if (emit) {
+          emit_segment(it.logical_position(), it.run_length(), valid);

Review Comment:
   Of course, _ideally_ a REE-encoded array has long enough runs to make REE encoding worthwhile...



##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -196,14 +285,37 @@ Result<std::shared_ptr<ArrayData>> GetTakeIndicesFromBitmap(
   }
 }
 
-// TODO(pr-35750): Handle run-end encoded filters in compute kernels
+Result<std::shared_ptr<ArrayData>> GetTakeIndicesFromREEBitmap(
+    const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection,
+    MemoryPool* memory_pool) {
+  auto& ree_type = checked_cast<const RunEndEncodedType&>(*filter.type);

Review Comment:
   Style nit
   ```suggestion
     const auto& ree_type = checked_cast<const RunEndEncodedType&>(*filter.type);
   ```



##########
cpp/src/arrow/compute/kernels/vector_selection_test.cc:
##########
@@ -78,63 +118,102 @@ TEST(GetTakeIndices, NullValidityBuffer) {
   ValidateOutput(indices);
   AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
 
+  ASSERT_OK_AND_ASSIGN(auto ree_filter, REEncode(filter));
+  ASSERT_OK_AND_ASSIGN(
+      auto indices_from_ree,
+      internal::GetTakeIndices(*ree_filter->data(), FilterOptions::DROP));
+  auto indices_from_ree_array = MakeArray(indices_from_ree);
+  ValidateOutput(indices_from_ree);
+  AssertArraysEqual(*expected_indices, *indices_from_ree_array, /*verbose=*/true);
+
   ASSERT_OK_AND_ASSIGN(
       indices, internal::GetTakeIndices(*filter.data(), FilterOptions::EMIT_NULL));
   indices_array = MakeArray(indices);
   ValidateOutput(indices);
   AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
+
+  ASSERT_OK_AND_ASSIGN(
+      indices_from_ree,
+      internal::GetTakeIndices(*ree_filter->data(), FilterOptions::EMIT_NULL));
+  indices_from_ree_array = MakeArray(indices_from_ree);
+  ValidateOutput(indices_from_ree);
+  AssertArraysEqual(*expected_indices, *indices_from_ree_array, /*verbose=*/true);
 }
 
 template <typename IndexArrayType>
 void CheckGetTakeIndicesCase(const Array& untyped_filter) {
   const auto& filter = checked_cast<const BooleanArray&>(untyped_filter);
+  ASSERT_OK_AND_ASSIGN(auto ree_filter, REEncode(*filter.data()));
+
   ASSERT_OK_AND_ASSIGN(std::shared_ptr<ArrayData> drop_indices,
                        internal::GetTakeIndices(*filter.data(), FilterOptions::DROP));
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<ArrayData> drop_indices_from_ree,
+      internal::GetTakeIndices(*ree_filter->data(), FilterOptions::DROP));
   // Verify DROP indices
   {
     IndexArrayType indices(drop_indices);
+    IndexArrayType indices_from_ree(drop_indices_from_ree);
     ValidateOutput(indices);
+    ValidateOutput(indices_from_ree);
 
     int64_t out_position = 0;
     for (int64_t i = 0; i < filter.length(); ++i) {
       if (filter.IsValid(i)) {
         if (filter.Value(i)) {
           ASSERT_EQ(indices.Value(out_position), i);
+          ASSERT_EQ(indices_from_ree.Value(out_position), i);
           ++out_position;
         }
       }
     }
     ASSERT_EQ(out_position, indices.length());
+    ASSERT_EQ(out_position, indices_from_ree.length());
+
     // Check that the end length agrees with the output of GetFilterOutputSize
     ASSERT_EQ(out_position,
               internal::GetFilterOutputSize(*filter.data(), FilterOptions::DROP));
+    ASSERT_EQ(out_position,
+              internal::GetFilterOutputSize(*ree_filter->data(), FilterOptions::DROP));
   }
 
   ASSERT_OK_AND_ASSIGN(
       std::shared_ptr<ArrayData> emit_indices,
       internal::GetTakeIndices(*filter.data(), FilterOptions::EMIT_NULL));
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<ArrayData> emit_indices_from_ree,
+      internal::GetTakeIndices(*ree_filter->data(), FilterOptions::EMIT_NULL));
   // Verify EMIT_NULL indices
   {
     IndexArrayType indices(emit_indices);
+    IndexArrayType indices_from_ree(emit_indices_from_ree);
     ValidateOutput(indices);
+    ValidateOutput(indices_from_ree);
 
     int64_t out_position = 0;
     for (int64_t i = 0; i < filter.length(); ++i) {
       if (filter.IsValid(i)) {
         if (filter.Value(i)) {
           ASSERT_EQ(indices.Value(out_position), i);
+          ASSERT_EQ(indices_from_ree.Value(out_position), i);
           ++out_position;
         }
       } else {
         ASSERT_TRUE(indices.IsNull(out_position));
+        ASSERT_TRUE(indices_from_ree.IsNull(out_position));
         ++out_position;
       }
     }
 
     ASSERT_EQ(out_position, indices.length());
+    ASSERT_EQ(out_position, indices_from_ree.length());
+
     // Check that the end length agrees with the output of GetFilterOutputSize
     ASSERT_EQ(out_position,
               internal::GetFilterOutputSize(*filter.data(), FilterOptions::EMIT_NULL));
+    ASSERT_OK_AND_ASSIGN(auto ree_filter, REEncode(*filter.data()));

Review Comment:
   Didn't we already create a `ree_filter` above? Or am I misreading?





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218545713


##########
cpp/src/arrow/compute/kernels/ree_util_internal.h:
##########
@@ -83,6 +83,18 @@ class ReadWriteValue<ArrowType, in_has_validity_buffer, out_has_validity_buffer,
     return valid;
   }
 
+  /// Pre-conditions guaranteed by the callers:
+  /// - i and j are valid indices into the values buffer
+  /// - the values in i and j are valid
+  bool CompareValuesAt(int64_t i, int64_t j) const {

Review Comment:
   Comparing values is a common operation in run-end encoding kernels. I would happily move it out of here if you have a suggestion.





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1230037576


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,196 +518,244 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - raw_offsets[i + position];
+    }
+    return Status::OK();
+  };
+  if (is_ree_filter) {
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          DCHECK(filter_valid);
+          status = emit_segment(position, segment_length);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));
+  } else {
+    const auto filter_data = filter.buffers[1].data;
+    RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
+        filter_data, filter.offset, filter.length, std::move(emit_segment)));
+  }
 
   offset_builder.UnsafeAppend(offset);
   out->length = output_length;
   RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1]));
   return data_builder.Finish(&out->buffers[2]);
 }
 
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterImpl(KernelContext* ctx, const ArraySpan& values,
                         const ArraySpan& filter, int64_t output_length,
                         FilterOptions::NullSelectionBehavior null_selection,
                         ArrayData* out) {
-  using offset_type = typename Type::offset_type;
+  using offset_type = typename ArrowType::offset_type;
+
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
-  const auto filter_data = filter.buffers[1].data;
-  const uint8_t* filter_is_valid = filter.buffers[0].data;
-  const int64_t filter_offset = filter.offset;
+  BINARY_FILTER_SETUP_COMMON();
 
   const uint8_t* values_is_valid = values.buffers[0].data;
   const int64_t values_offset = values.offset;
 
+  const int64_t out_offset = out->offset;
   uint8_t* out_is_valid = out->buffers[0]->mutable_data();
   // Zero bits and then only have to set valid values to true
-  bit_util::SetBitsTo(out_is_valid, 0, output_length, false);
-
-  // We use 3 block counters for fast scanning of the filter
-  //
-  // * values_valid_counter: for values null/not-null
-  // * filter_valid_counter: for filter null/not-null
-  // * filter_counter: for filter true/false
-  OptionalBitBlockCounter values_valid_counter(values_is_valid, values_offset,
-                                               values.length);
-  OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset,
-                                               filter.length);
-  BitBlockCounter filter_counter(filter_data, filter_offset, filter.length);
-
-  BINARY_FILTER_SETUP_COMMON();
+  bit_util::SetBitsTo(out_is_valid, out_offset, output_length, false);
 
   int64_t in_position = 0;
   int64_t out_position = 0;
-  while (in_position < filter.length) {
-    BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
-    BitBlockCount values_valid_block = values_valid_counter.NextWord();
-    BitBlockCount filter_block = filter_counter.NextWord();
-    if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) {
-      // For this exceedingly common case in low-selectivity filters we can
-      // skip further analysis of the data and move on to the next block.
-      in_position += filter_block.length;
-    } else if (filter_valid_block.AllSet()) {
-      // Simpler path: no filter values are null
-      if (filter_block.AllSet()) {
-        // Fastest path: filter values are all true and not null
-        if (values_valid_block.AllSet()) {
-          // The values aren't null either
-          bit_util::SetBitsTo(out_is_valid, out_position, filter_block.length, true);
-
-          // Bulk-append raw data
-          offset_type block_data_bytes =
-              (raw_offsets[in_position + filter_block.length] - raw_offsets[in_position]);
-          APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes);
-          // Append offsets
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            offset_builder.UnsafeAppend(offset);
-            offset += raw_offsets[in_position + 1] - raw_offsets[in_position];
-          }
-          out_position += filter_block.length;
-        } else {
-          // Some of the values in this block are null
-          for (int64_t i = 0; i < filter_block.length;
-               ++i, ++in_position, ++out_position) {
-            offset_builder.UnsafeAppend(offset);
-            if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
-              bit_util::SetBit(out_is_valid, out_position);
-              APPEND_SINGLE_VALUE();
-            }
+  if (is_ree_filter) {
+    auto emit_segment = [&](int64_t position, int64_t segment_length, bool filter_valid) {
+      in_position = position;
+      if (filter_valid) {
+        // Filter values are all true and not null
+        // Some of the values in the block may be null
+        for (int64_t i = 0; i < segment_length; ++i, ++in_position, ++out_position) {
+          offset_builder.UnsafeAppend(offset);
+          if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
+            bit_util::SetBit(out_is_valid, out_offset + out_position);
+            APPEND_SINGLE_VALUE();
           }
         }
-      } else {  // !filter_block.AllSet()
-        // Some of the filter values are false, but all not null
-        if (values_valid_block.AllSet()) {
-          // All the values are not-null, so we can skip null checking for
-          // them
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
+      } else {
+        offset_builder.UnsafeAppend(segment_length, offset);
+        out_position += segment_length;
+      }
+      return Status::OK();
+    };
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          status = emit_segment(position, segment_length, filter_valid);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));

Review Comment:
   Yes, this `std::move` is pedantic, but should I remove it? :D





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218554815


##########
cpp/src/arrow/compute/kernels/vector_selection_internal.cc:
##########
@@ -82,6 +85,89 @@ Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit
 
 namespace {
 
+/// \brief Iterate over a REE filter, emitting ranges of a plain values array that
+/// would pass the filter.
+///
+/// Unlike REExREE and REExPlain filtering, PlainxREE filtering does not
+/// produce a REE output, but rather a plain output array. As such, it's
+/// much simpler.
+///
+/// \param filter_may_have_nulls Only pass false if you know the filter has no nulls.
+template <typename FilterRunEndType>
+void VisitPlainxREEFilterOutputSegmentsImpl(
+    const ArraySpan& filter, bool filter_may_have_nulls,
+    FilterOptions::NullSelectionBehavior null_selection,
+    const EmitREEFilterSegment& emit_segment) {
+  using FilterRunEndCType = typename FilterRunEndType::c_type;
+  const ArraySpan& filter_values = arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  filter_may_have_nulls = filter_may_have_nulls && filter_is_valid != NULLPTR &&
+                          filter_values.null_count != 0;
+
+  const arrow::ree_util::RunEndEncodedArraySpan<FilterRunEndCType> filter_span(filter);
+  auto it = filter_span.begin();
+  if (filter_may_have_nulls) {
+    if (null_selection == FilterOptions::EMIT_NULL) {
+      while (!it.is_end(filter_span)) {
+        const int64_t i = filter_values_offset + it.index_into_array();
+        const bool valid = bit_util::GetBit(filter_is_valid, i);
+        const bool emit = !valid || bit_util::GetBit(filter_selection, i);
+        if (emit) {
+          emit_segment(it.logical_position(), it.run_length(), valid);

Review Comment:
   I developed the REExREE kernels first and measured the binary-size impact of passing lambdas as template parameters. My compilation unit was the biggest in the whole project, so I decided to migrate to `std::function` to save multiple MBs in binary size.
   
   ![image](https://github.com/apache/arrow/assets/207795/18cb1271-04c9-4737-a68a-56301fde3ebf)
   
   The PlainxREE kernel is simpler, so maybe the impact wouldn't be so bad.
   
   > Of course, ideally a REE-encoded array has long enough runs to make REE encoding worthwhile...
   
   Exactly. So I think it's better to start with `std::function` to avoid inflating the library size. If this gains adoption, we can revisit the kernels later.
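
A minimal sketch of the trade-off described above, not the Arrow code: `VisitRunsTemplated`, `VisitRunsErased` and `EmitRun` are hypothetical names, and the run-end encoded input is simplified to a vector of run lengths. The templated visitor is instantiated once per distinct callable type, while the `std::function` version is compiled once and pays an indirect call per run.

```cpp
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical callback signature: (logical position, run length).
using EmitRun = std::function<void(int64_t, int64_t)>;

// Instantiated once per distinct callable type: every call site that passes a
// different lambda adds another copy of this loop to the binary.
template <typename EmitFn>
void VisitRunsTemplated(const std::vector<int64_t>& run_lengths, EmitFn&& emit) {
  int64_t position = 0;
  for (const int64_t length : run_lengths) {
    emit(position, length);
    position += length;
  }
}

// Compiled once: callers go through the type-erased std::function, which costs
// an indirect call per run instead of extra generated code.
void VisitRunsErased(const std::vector<int64_t>& run_lengths, const EmitRun& emit) {
  int64_t position = 0;
  for (const int64_t length : run_lengths) {
    emit(position, length);
    position += length;
  }
}
```

Both visitors invoke the callback once per run, so with long runs the indirect call in the type-erased version is spread over many values.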
   





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1224600509


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -161,39 +173,88 @@ class PrimitiveFilterImpl {
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
     out_data_ = reinterpret_cast<T*>(out_arr->buffers[1]->mutable_data());
-    out_offset_ = out_arr->offset;
     out_length_ = out_arr->length;
+    DCHECK_EQ(out_arr->offset, 0);

Review Comment:
   I brought back the offset and now use it consistently in the primitive kernels.
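
A minimal sketch of what honouring the output offset means when writing validity bits, assuming an LSB-first bitmap as in Arrow. `SetBitAt`, `GetBitAt` and `MarkValid` are hypothetical stand-ins for illustration, not the kernel's helpers.

```cpp
#include <cstdint>

// Sets bit `i` of an LSB-first bitmap.
inline void SetBitAt(uint8_t* bits, int64_t i) {
  bits[i / 8] = static_cast<uint8_t>(bits[i / 8] | (1u << (i % 8)));
}

// Reads bit `i` of an LSB-first bitmap.
inline bool GetBitAt(const uint8_t* bits, int64_t i) {
  return ((bits[i / 8] >> (i % 8)) & 1) != 0;
}

// Marks `length` output slots as valid, starting at logical output position
// `out_position` of an array whose first slot lives at bit `out_offset`.
// Dropping `out_offset` here would be correct only for unsliced outputs.
inline void MarkValid(uint8_t* out_is_valid, int64_t out_offset,
                      int64_t out_position, int64_t length) {
  for (int64_t i = 0; i < length; ++i) {
    SetBitAt(out_is_valid, out_offset + out_position + i);
  }
}
```

With `out_offset == 3`, marking two slots from output position 1 touches bits 4 and 5 of the underlying buffer rather than bits 1 and 2.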





[GitHub] [arrow] felipecrv commented on pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on PR #35750:
URL: https://github.com/apache/arrow/pull/35750#issuecomment-1561970058

   The code-moving commits in this PR are better reviewed by themselves in this separate PR: #35751




[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218659064


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,48 +503,64 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    offset_type cur_offset = raw_offsets[position];
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - cur_offset;
+      cur_offset = raw_offsets[i + position + 1];

Review Comment:
   Changed. Will push soon.





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218596876


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -161,39 +173,88 @@ class PrimitiveFilterImpl {
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
     out_data_ = reinterpret_cast<T*>(out_arr->buffers[1]->mutable_data());
-    out_offset_ = out_arr->offset;
     out_length_ = out_arr->length;
+    DCHECK_EQ(out_arr->offset, 0);
     out_position_ = 0;
   }
 
-  void ExecNonNull() {
-    // Fast filter when values and filter are not null
-    ::arrow::internal::VisitSetBitRunsVoid(
-        filter_data_, filter_offset_, values_length_,
-        [&](int64_t position, int64_t length) { WriteValueSegment(position, length); });
+  void ExecREEFilter() {
+    if (filter_.child_data[1].null_count == 0 && values_null_count_ == 0) {
+      DCHECK(!out_is_valid_);
+      // Fastest: no nulls in either filter or values
+      return VisitPlainxREEFilterOutputSegments(
+          filter_, /*filter_may_have_nulls=*/false, null_selection_,
+          [&](int64_t position, int64_t segment_length, bool filter_valid) {
+            // Fastest path: all values in range are included and not null
+            WriteValueSegment(position, segment_length);
+            DCHECK(filter_valid);
+          });
+    }
+    if (values_is_valid_ && out_is_valid_) {
+      // Fast path: values can be null, so the validity bitmap should be copied

Review Comment:
   You're right. I adapted these comments from the other kernels, but they don't make sense here.





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1230031496


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,196 +518,244 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - raw_offsets[i + position];
+    }
+    return Status::OK();
+  };
+  if (is_ree_filter) {
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          DCHECK(filter_valid);
+          status = emit_segment(position, segment_length);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));
+  } else {
+    const auto filter_data = filter.buffers[1].data;
+    RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
+        filter_data, filter.offset, filter.length, std::move(emit_segment)));
+  }
 
   offset_builder.UnsafeAppend(offset);
   out->length = output_length;
   RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1]));
   return data_builder.Finish(&out->buffers[2]);
 }
 
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterImpl(KernelContext* ctx, const ArraySpan& values,
                         const ArraySpan& filter, int64_t output_length,
                         FilterOptions::NullSelectionBehavior null_selection,
                         ArrayData* out) {
-  using offset_type = typename Type::offset_type;
+  using offset_type = typename ArrowType::offset_type;
+
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
-  const auto filter_data = filter.buffers[1].data;
-  const uint8_t* filter_is_valid = filter.buffers[0].data;
-  const int64_t filter_offset = filter.offset;
+  BINARY_FILTER_SETUP_COMMON();
 
   const uint8_t* values_is_valid = values.buffers[0].data;
   const int64_t values_offset = values.offset;
 
+  const int64_t out_offset = out->offset;
   uint8_t* out_is_valid = out->buffers[0]->mutable_data();
   // Zero bits and then only have to set valid values to true
-  bit_util::SetBitsTo(out_is_valid, 0, output_length, false);
-
-  // We use 3 block counters for fast scanning of the filter
-  //
-  // * values_valid_counter: for values null/not-null
-  // * filter_valid_counter: for filter null/not-null
-  // * filter_counter: for filter true/false
-  OptionalBitBlockCounter values_valid_counter(values_is_valid, values_offset,
-                                               values.length);
-  OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset,
-                                               filter.length);
-  BitBlockCounter filter_counter(filter_data, filter_offset, filter.length);
-
-  BINARY_FILTER_SETUP_COMMON();
+  bit_util::SetBitsTo(out_is_valid, out_offset, output_length, false);
 
   int64_t in_position = 0;
   int64_t out_position = 0;
-  while (in_position < filter.length) {
-    BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
-    BitBlockCount values_valid_block = values_valid_counter.NextWord();
-    BitBlockCount filter_block = filter_counter.NextWord();
-    if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) {
-      // For this exceedingly common case in low-selectivity filters we can
-      // skip further analysis of the data and move on to the next block.
-      in_position += filter_block.length;
-    } else if (filter_valid_block.AllSet()) {
-      // Simpler path: no filter values are null
-      if (filter_block.AllSet()) {
-        // Fastest path: filter values are all true and not null
-        if (values_valid_block.AllSet()) {
-          // The values aren't null either
-          bit_util::SetBitsTo(out_is_valid, out_position, filter_block.length, true);
-
-          // Bulk-append raw data
-          offset_type block_data_bytes =
-              (raw_offsets[in_position + filter_block.length] - raw_offsets[in_position]);
-          APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes);
-          // Append offsets
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            offset_builder.UnsafeAppend(offset);
-            offset += raw_offsets[in_position + 1] - raw_offsets[in_position];
-          }
-          out_position += filter_block.length;
-        } else {
-          // Some of the values in this block are null
-          for (int64_t i = 0; i < filter_block.length;
-               ++i, ++in_position, ++out_position) {
-            offset_builder.UnsafeAppend(offset);
-            if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
-              bit_util::SetBit(out_is_valid, out_position);
-              APPEND_SINGLE_VALUE();
-            }
+  if (is_ree_filter) {
+    auto emit_segment = [&](int64_t position, int64_t segment_length, bool filter_valid) {
+      in_position = position;
+      if (filter_valid) {
+        // Filter values are all true and not null
+        // Some of the values in the block may be null
+        for (int64_t i = 0; i < segment_length; ++i, ++in_position, ++out_position) {
+          offset_builder.UnsafeAppend(offset);
+          if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
+            bit_util::SetBit(out_is_valid, out_offset + out_position);
+            APPEND_SINGLE_VALUE();
           }
         }
-      } else {  // !filter_block.AllSet()
-        // Some of the filter values are false, but all not null
-        if (values_valid_block.AllSet()) {
-          // All the values are not-null, so we can skip null checking for
-          // them
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
+      } else {
+        offset_builder.UnsafeAppend(segment_length, offset);
+        out_position += segment_length;
+      }
+      return Status::OK();
+    };
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,

Review Comment:
   I reviewed all calls to `VisitPlainxREEFilterOutputSegments` and added the label.
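
A minimal sketch of the labelled-argument style on a simplified visitor. Everything here is an assumption for illustration: `ReeFilter`, `VisitSelectedSegments`, `EmitSegmentFn` and `CountSelected` are hypothetical, the filter is modelled with plain vectors, and `emit_null` stands in for the `EMIT_NULL` versus `DROP` choice.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Simplified stand-in for a run-end encoded boolean filter:
// run i covers logical positions [run_ends[i - 1], run_ends[i]).
struct ReeFilter {
  std::vector<int64_t> run_ends;
  std::vector<bool> values;    // true: the run selects its values
  std::vector<bool> is_valid;  // false: the run is null
};

using EmitSegmentFn =
    std::function<void(int64_t position, int64_t length, bool valid)>;

// Emits every run that is valid and true. Null runs are emitted (with
// valid == false) only when `emit_null` is set, and are dropped otherwise.
void VisitSelectedSegments(const ReeFilter& filter, bool emit_null,
                           const EmitSegmentFn& emit) {
  int64_t run_start = 0;
  for (std::size_t i = 0; i < filter.run_ends.size(); ++i) {
    const int64_t run_end = filter.run_ends[i];
    const bool valid = filter.is_valid[i];
    const bool selected = valid ? filter.values[i] : emit_null;
    if (selected) {
      emit(run_start, run_end - run_start, valid);
    }
    run_start = run_end;
  }
}

// The inline comment names the bare boolean at the call site.
int64_t CountSelected(const ReeFilter& filter) {
  int64_t count = 0;
  VisitSelectedSegments(filter, /*emit_null=*/false,
                        [&](int64_t, int64_t length, bool) { count += length; });
  return count;
}
```

Without the `/*emit_null=*/` label, a reader has to open the declaration to learn what the bare `false` controls.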





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218603980


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,48 +503,64 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    offset_type cur_offset = raw_offsets[position];
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - cur_offset;
+      cur_offset = raw_offsets[i + position + 1];

Review Comment:
   I can change it, but note that this is existing code.
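
A standalone sketch of the offset-appending loop quoted above, written over `std::vector` instead of Arrow's builders. `AppendSegmentOffsets` is a hypothetical helper. It computes each value's size directly from two adjacent input offsets, which gives the same result as carrying a `cur_offset` variable through the loop.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Appends the output offsets for `length` consecutive input values starting at
// `position`. `raw_offsets` has one more entry than there are input values, and
// `*offset` is the running end of the output data buffer.
void AppendSegmentOffsets(const std::vector<int32_t>& raw_offsets, int64_t position,
                          int64_t length, std::vector<int32_t>* out_offsets,
                          int32_t* offset) {
  assert(position >= 0 && length >= 0);
  assert(position + length < static_cast<int64_t>(raw_offsets.size()));
  for (int64_t i = 0; i < length; ++i) {
    out_offsets->push_back(*offset);
    // Byte size of value (position + i): its end offset minus its start offset.
    *offset += raw_offsets[position + i + 1] - raw_offsets[position + i];
  }
}
```

After the last segment, the caller still appends the final `*offset` once to close the output offsets buffer, as the kernel does with `offset_builder.UnsafeAppend(offset)`.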





[GitHub] [arrow] pitrou merged pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou merged PR #35750:
URL: https://github.com/apache/arrow/pull/35750




[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218580958


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -161,39 +173,88 @@ class PrimitiveFilterImpl {
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
     out_data_ = reinterpret_cast<T*>(out_arr->buffers[1]->mutable_data());
-    out_offset_ = out_arr->offset;
     out_length_ = out_arr->length;
+    DCHECK_EQ(out_arr->offset, 0);

Review Comment:
   Now that I've learned about `Kernel::can_write_into_slices`, I must confess I'm not. I will undo this.





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1230034870


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,196 +518,244 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - raw_offsets[i + position];
+    }
+    return Status::OK();
+  };
+  if (is_ree_filter) {
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          DCHECK(filter_valid);
+          status = emit_segment(position, segment_length);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));
+  } else {
+    const auto filter_data = filter.buffers[1].data;
+    RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
+        filter_data, filter.offset, filter.length, std::move(emit_segment)));
+  }
 
   offset_builder.UnsafeAppend(offset);
   out->length = output_length;
   RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1]));
   return data_builder.Finish(&out->buffers[2]);
 }
 
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterImpl(KernelContext* ctx, const ArraySpan& values,
                         const ArraySpan& filter, int64_t output_length,
                         FilterOptions::NullSelectionBehavior null_selection,
                         ArrayData* out) {
-  using offset_type = typename Type::offset_type;
+  using offset_type = typename ArrowType::offset_type;
+
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
-  const auto filter_data = filter.buffers[1].data;
-  const uint8_t* filter_is_valid = filter.buffers[0].data;
-  const int64_t filter_offset = filter.offset;
+  BINARY_FILTER_SETUP_COMMON();
 
   const uint8_t* values_is_valid = values.buffers[0].data;
   const int64_t values_offset = values.offset;
 
+  const int64_t out_offset = out->offset;
   uint8_t* out_is_valid = out->buffers[0]->mutable_data();
   // Zero bits and then only have to set valid values to true
-  bit_util::SetBitsTo(out_is_valid, 0, output_length, false);
-
-  // We use 3 block counters for fast scanning of the filter
-  //
-  // * values_valid_counter: for values null/not-null
-  // * filter_valid_counter: for filter null/not-null
-  // * filter_counter: for filter true/false
-  OptionalBitBlockCounter values_valid_counter(values_is_valid, values_offset,
-                                               values.length);
-  OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset,
-                                               filter.length);
-  BitBlockCounter filter_counter(filter_data, filter_offset, filter.length);
-
-  BINARY_FILTER_SETUP_COMMON();
+  bit_util::SetBitsTo(out_is_valid, out_offset, output_length, false);
 
   int64_t in_position = 0;
   int64_t out_position = 0;
-  while (in_position < filter.length) {
-    BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
-    BitBlockCount values_valid_block = values_valid_counter.NextWord();
-    BitBlockCount filter_block = filter_counter.NextWord();
-    if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) {
-      // For this exceedingly common case in low-selectivity filters we can
-      // skip further analysis of the data and move on to the next block.
-      in_position += filter_block.length;
-    } else if (filter_valid_block.AllSet()) {
-      // Simpler path: no filter values are null
-      if (filter_block.AllSet()) {
-        // Fastest path: filter values are all true and not null
-        if (values_valid_block.AllSet()) {
-          // The values aren't null either
-          bit_util::SetBitsTo(out_is_valid, out_position, filter_block.length, true);
-
-          // Bulk-append raw data
-          offset_type block_data_bytes =
-              (raw_offsets[in_position + filter_block.length] - raw_offsets[in_position]);
-          APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes);
-          // Append offsets
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            offset_builder.UnsafeAppend(offset);
-            offset += raw_offsets[in_position + 1] - raw_offsets[in_position];
-          }
-          out_position += filter_block.length;
-        } else {
-          // Some of the values in this block are null
-          for (int64_t i = 0; i < filter_block.length;
-               ++i, ++in_position, ++out_position) {
-            offset_builder.UnsafeAppend(offset);
-            if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
-              bit_util::SetBit(out_is_valid, out_position);
-              APPEND_SINGLE_VALUE();
-            }
+  if (is_ree_filter) {
+    auto emit_segment = [&](int64_t position, int64_t segment_length, bool filter_valid) {
+      in_position = position;
+      if (filter_valid) {
+        // Filter values are all true and not null
+        // Some of the values in the block may be null
+        for (int64_t i = 0; i < segment_length; ++i, ++in_position, ++out_position) {
+          offset_builder.UnsafeAppend(offset);
+          if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
+            bit_util::SetBit(out_is_valid, out_offset + out_position);
+            APPEND_SINGLE_VALUE();
           }
         }
-      } else {  // !filter_block.AllSet()
-        // Some of the filter values are false, but all not null
-        if (values_valid_block.AllSet()) {
-          // All the values are not-null, so we can skip null checking for
-          // them
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
+      } else {
+        offset_builder.UnsafeAppend(segment_length, offset);
+        out_position += segment_length;
+      }
+      return Status::OK();
+    };
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          status = emit_segment(position, segment_length, filter_valid);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));

Review Comment:
   `GenericToStatus` takes its argument as an r-value, so moving the `Status` avoids code bloat: the compiler does not have to generate memory-allocation code to copy the `Status` message string. That saving is multiplied by the number of template instantiations.
   
   ```cpp
     #define ARROW_RETURN_NOT_OK(status)                                   \
       do {                                                                \
         ::arrow::Status __s = ::arrow::internal::GenericToStatus(status); \
         ARROW_RETURN_IF_(!__s.ok(), __s, ARROW_STRINGIFY(status));        \
       } while (false)
   ```





[GitHub] [arrow] conbench-apache-arrow[bot] commented on pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "conbench-apache-arrow[bot] (via GitHub)" <gi...@apache.org>.
conbench-apache-arrow[bot] commented on PR #35750:
URL: https://github.com/apache/arrow/pull/35750#issuecomment-1595645401

   Conbench analyzed the 6 benchmark runs on commit `475b5b94`.
   
   There were 30 benchmark results indicating a performance regression:
   
   - Commit Run on `arm64-m6g-linux-compute` at [2023-06-15 20:50:30Z](http://conbench.ursa.dev/compare/runs/7620dc85aa1441a9b6a3276c8f97684a...75128b0c6566429fb5081a5033959697/)
     - [params=1048576/6, source=cpp-micro, suite=arrow-compute-vector-selection-benchmark](http://conbench.ursa.dev/compare/benchmarks/0648b39e3d517ec68000c8a6cb127cbd...0648b799c9f674c38000814ec420c2ca)
     - [params=1048576/1, source=cpp-micro, suite=arrow-compute-vector-selection-benchmark](http://conbench.ursa.dev/compare/benchmarks/0648b39e307579e680008ec49772b4be...0648b799bbee7ba580003ac6cffb1659)
   - and 28 more (see the report linked below)
   
   The [full Conbench report](https://github.com/apache/arrow/runs/14340051751) has more details.




[GitHub] [arrow] pitrou commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1230888137


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,196 +518,244 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - raw_offsets[i + position];
+    }
+    return Status::OK();
+  };
+  if (is_ree_filter) {
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          DCHECK(filter_valid);
+          status = emit_segment(position, segment_length);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));
+  } else {
+    const auto filter_data = filter.buffers[1].data;
+    RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
+        filter_data, filter.offset, filter.length, std::move(emit_segment)));
+  }
 
   offset_builder.UnsafeAppend(offset);
   out->length = output_length;
   RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1]));
   return data_builder.Finish(&out->buffers[2]);
 }
 
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterImpl(KernelContext* ctx, const ArraySpan& values,
                         const ArraySpan& filter, int64_t output_length,
                         FilterOptions::NullSelectionBehavior null_selection,
                         ArrayData* out) {
-  using offset_type = typename Type::offset_type;
+  using offset_type = typename ArrowType::offset_type;
+
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
-  const auto filter_data = filter.buffers[1].data;
-  const uint8_t* filter_is_valid = filter.buffers[0].data;
-  const int64_t filter_offset = filter.offset;
+  BINARY_FILTER_SETUP_COMMON();
 
   const uint8_t* values_is_valid = values.buffers[0].data;
   const int64_t values_offset = values.offset;
 
+  const int64_t out_offset = out->offset;
   uint8_t* out_is_valid = out->buffers[0]->mutable_data();
   // Zero bits and then only have to set valid values to true
-  bit_util::SetBitsTo(out_is_valid, 0, output_length, false);
-
-  // We use 3 block counters for fast scanning of the filter
-  //
-  // * values_valid_counter: for values null/not-null
-  // * filter_valid_counter: for filter null/not-null
-  // * filter_counter: for filter true/false
-  OptionalBitBlockCounter values_valid_counter(values_is_valid, values_offset,
-                                               values.length);
-  OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset,
-                                               filter.length);
-  BitBlockCounter filter_counter(filter_data, filter_offset, filter.length);
-
-  BINARY_FILTER_SETUP_COMMON();
+  bit_util::SetBitsTo(out_is_valid, out_offset, output_length, false);
 
   int64_t in_position = 0;
   int64_t out_position = 0;
-  while (in_position < filter.length) {
-    BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
-    BitBlockCount values_valid_block = values_valid_counter.NextWord();
-    BitBlockCount filter_block = filter_counter.NextWord();
-    if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) {
-      // For this exceedingly common case in low-selectivity filters we can
-      // skip further analysis of the data and move on to the next block.
-      in_position += filter_block.length;
-    } else if (filter_valid_block.AllSet()) {
-      // Simpler path: no filter values are null
-      if (filter_block.AllSet()) {
-        // Fastest path: filter values are all true and not null
-        if (values_valid_block.AllSet()) {
-          // The values aren't null either
-          bit_util::SetBitsTo(out_is_valid, out_position, filter_block.length, true);
-
-          // Bulk-append raw data
-          offset_type block_data_bytes =
-              (raw_offsets[in_position + filter_block.length] - raw_offsets[in_position]);
-          APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes);
-          // Append offsets
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            offset_builder.UnsafeAppend(offset);
-            offset += raw_offsets[in_position + 1] - raw_offsets[in_position];
-          }
-          out_position += filter_block.length;
-        } else {
-          // Some of the values in this block are null
-          for (int64_t i = 0; i < filter_block.length;
-               ++i, ++in_position, ++out_position) {
-            offset_builder.UnsafeAppend(offset);
-            if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
-              bit_util::SetBit(out_is_valid, out_position);
-              APPEND_SINGLE_VALUE();
-            }
+  if (is_ree_filter) {
+    auto emit_segment = [&](int64_t position, int64_t segment_length, bool filter_valid) {
+      in_position = position;
+      if (filter_valid) {
+        // Filter values are all true and not null
+        // Some of the values in the block may be null
+        for (int64_t i = 0; i < segment_length; ++i, ++in_position, ++out_position) {
+          offset_builder.UnsafeAppend(offset);
+          if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
+            bit_util::SetBit(out_is_valid, out_offset + out_position);
+            APPEND_SINGLE_VALUE();
           }
         }
-      } else {  // !filter_block.AllSet()
-        // Some of the filter values are false, but all not null
-        if (values_valid_block.AllSet()) {
-          // All the values are not-null, so we can skip null checking for
-          // them
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
+      } else {
+        offset_builder.UnsafeAppend(segment_length, offset);
+        out_position += segment_length;
+      }
+      return Status::OK();
+    };
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          status = emit_segment(position, segment_length, filter_valid);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));

Review Comment:
   No need to remove it. I'm merely pointing out that it's not necessary to micro-optimize this particular end of function :-)





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218555851


##########
cpp/src/arrow/compute/kernels/vector_selection_internal.cc:
##########
@@ -82,6 +85,89 @@ Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit
 
 namespace {
 
+/// \brief Iterate over a REE filter, emitting ranges of a plain values array that
+/// would pass the filter.
+///
+/// Differently from REExREE, and REExPlain filtering, PlainxREE filtering
+/// does not produce a REE output, but rather a plain output array. As such it's
+/// much simpler.
+///
+/// \param filter_may_have_nulls Only pass false if you know the filter has no nulls.
+template <typename FilterRunEndType>
+void VisitPlainxREEFilterOutputSegmentsImpl(
+    const ArraySpan& filter, bool filter_may_have_nulls,
+    FilterOptions::NullSelectionBehavior null_selection,
+    const EmitREEFilterSegment& emit_segment) {
+  using FilterRunEndCType = typename FilterRunEndType::c_type;
+  const ArraySpan& filter_values = arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  filter_may_have_nulls = filter_may_have_nulls && filter_is_valid != NULLPTR &&

Review Comment:
   That's a bit trickier to stick to. Can't we use NULLPTR everywhere? It simplifies code movement.
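
For context on the function this hunk introduces, here is a minimal sketch of Plain x REE filtering: walk the runs of the filter and emit each passing range of the plain values array. The `ReeFilter` struct, `VisitPassingSegments`, and `FilterPlain` are hypothetical names for illustration, not Arrow's API, and the sketch assumes a filter with no nulls and a logical offset of zero.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical, simplified REE filter: run_ends[i] is the exclusive logical end
// of run i, and values[i] says whether every position in that run passes.
struct ReeFilter {
  std::vector<int32_t> run_ends;
  std::vector<bool> values;
};

// Callback receives (position, length) of a passing range; returning false stops
// the iteration early.
using EmitSegment = std::function<bool(int64_t position, int64_t length)>;

inline void VisitPassingSegments(const ReeFilter& filter, const EmitSegment& emit) {
  assert(filter.run_ends.size() == filter.values.size());
  int64_t run_start = 0;
  for (std::size_t i = 0; i < filter.run_ends.size(); ++i) {
    const int64_t run_end = filter.run_ends[i];
    // A whole run is emitted in one call, no matter how long it is.
    if (filter.values[i] && !emit(run_start, run_end - run_start)) {
      return;
    }
    run_start = run_end;
  }
}

// The output is a plain (not run-end encoded) array of the selected values.
inline std::vector<int> FilterPlain(const std::vector<int>& values,
                                    const ReeFilter& filter) {
  std::vector<int> out;
  VisitPassingSegments(filter, [&](int64_t position, int64_t length) {
    out.insert(out.end(), values.begin() + position,
               values.begin() + position + length);
    return true;
  });
  return out;
}
```

Because the output is plain, the visitor only has to report ranges; it never has to merge or split output runs, which is why this case is simpler than the REE-output variants.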





[GitHub] [arrow] pitrou commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218591060


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -161,39 +173,88 @@ class PrimitiveFilterImpl {
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
     out_data_ = reinterpret_cast<T*>(out_arr->buffers[1]->mutable_data());
-    out_offset_ = out_arr->offset;
     out_length_ = out_arr->length;
+    DCHECK_EQ(out_arr->offset, 0);
     out_position_ = 0;
   }
 
-  void ExecNonNull() {
-    // Fast filter when values and filter are not null
-    ::arrow::internal::VisitSetBitRunsVoid(
-        filter_data_, filter_offset_, values_length_,
-        [&](int64_t position, int64_t length) { WriteValueSegment(position, length); });
+  void ExecREEFilter() {
+    if (filter_.child_data[1].null_count == 0 && values_null_count_ == 0) {

Review Comment:
   That might be the case indeed, though computing the null count should be extremely fast (it's a stream of popcount instructions).
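
To illustrate why counting nulls is cheap, here is a rough sketch that counts the unset bits of a validity bitmap one 64-bit word at a time. `CountNulls` is a hypothetical helper, not Arrow's implementation; it assumes the bitmap starts at bit 0 with LSB-first bit order, and it uses the GCC/Clang builtin `__builtin_popcountll` (C++20 code could use `std::popcount` instead).

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Hypothetical helper: number of null (unset) bits among the first `length`
// bits of a validity bitmap.
inline int64_t CountNulls(const uint8_t* bitmap, int64_t length) {
  assert(length >= 0);
  int64_t set_bits = 0;
  int64_t i = 0;
  // Whole words: one popcount covers 64 validity bits.
  for (; i + 64 <= length; i += 64) {
    uint64_t word;
    std::memcpy(&word, bitmap + i / 8, sizeof(word));  // avoids unaligned reads
    set_bits += __builtin_popcountll(word);
  }
  // Trailing bits that do not fill a whole word.
  for (; i < length; ++i) {
    set_bits += (bitmap[i / 8] >> (i % 8)) & 1;
  }
  return length - set_bits;
}
```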





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218699961


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -161,39 +173,88 @@ class PrimitiveFilterImpl {
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
     out_data_ = reinterpret_cast<T*>(out_arr->buffers[1]->mutable_data());
-    out_offset_ = out_arr->offset;
     out_length_ = out_arr->length;
+    DCHECK_EQ(out_arr->offset, 0);

Review Comment:
   Note that while reverting my simplification and reviewing the writes, I added a `+ out_offset_` that didn't exist before, so this code didn't actually work for a non-zero `out->offset` in the first place.





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218610215


##########
cpp/src/arrow/compute/kernels/vector_selection_internal.cc:
##########
@@ -239,6 +309,43 @@ struct Selection {
       }
     };
 
+    if (is_ree_filter) {
+      Status status;

Review Comment:
   When I started with the numeric kernels there was no way for `emit_segment` to fail. Making it return `Status` will create overhead in `VisitPlainxREEFilterOutputSegments` when it's dealing with primitives.
   
   Should I worry about the overhead of checking for the status returned by `std::function` in the context of primitive kernels?





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1224600961


##########
cpp/src/arrow/compute/kernels/vector_selection_internal.cc:
##########
@@ -239,6 +309,43 @@ struct Selection {
       }
     };
 
+    if (is_ree_filter) {
+      Status status;

Review Comment:
   I added a `bool` return type that I check in the `VisitPlainxREEFilterOutputSegments` loop.
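
The pattern described here can be sketched roughly as follows: the visitor only checks a `bool`, and a caller whose work can fail records the error in a captured status object. `SimpleStatus`, `Run`, `VisitSelectedRuns`, and `CollectSegments` are hypothetical stand-ins for illustration, not the types used in the PR.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Stand-in for arrow::Status, reduced to what the sketch needs.
struct SimpleStatus {
  std::string message;  // empty means OK
  bool ok() const { return message.empty(); }
};

struct Run {
  int64_t length;
  bool selected;
};

using EmitSegment = std::function<bool(int64_t position, int64_t length)>;

// The visitor never sees a status object; it only stops when `emit` returns false.
inline void VisitSelectedRuns(const std::vector<Run>& runs, const EmitSegment& emit) {
  int64_t position = 0;
  for (const Run& run : runs) {
    if (run.selected && !emit(position, run.length)) {
      return;
    }
    position += run.length;
  }
}

// A caller with fallible work stores the error in a captured status and returns
// false to stop the iteration.
inline SimpleStatus CollectSegments(const std::vector<Run>& runs, int64_t max_length,
                                    std::vector<std::pair<int64_t, int64_t>>* out) {
  assert(out != nullptr);
  SimpleStatus status;
  VisitSelectedRuns(runs, [&](int64_t position, int64_t length) {
    if (length > max_length) {
      status.message = "segment too long";
      return false;
    }
    out->emplace_back(position, length);
    return true;
  });
  return status;
}
```

Callers whose callback cannot fail simply return `true` every time, so the per-segment cost in the visitor is a single boolean test.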





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1230036933


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,196 +518,244 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - raw_offsets[i + position];
+    }
+    return Status::OK();
+  };
+  if (is_ree_filter) {
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          DCHECK(filter_valid);
+          status = emit_segment(position, segment_length);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));
+  } else {
+    const auto filter_data = filter.buffers[1].data;
+    RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
+        filter_data, filter.offset, filter.length, std::move(emit_segment)));
+  }
 
   offset_builder.UnsafeAppend(offset);
   out->length = output_length;
   RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1]));
   return data_builder.Finish(&out->buffers[2]);
 }
 
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterImpl(KernelContext* ctx, const ArraySpan& values,
                         const ArraySpan& filter, int64_t output_length,
                         FilterOptions::NullSelectionBehavior null_selection,
                         ArrayData* out) {
-  using offset_type = typename Type::offset_type;
+  using offset_type = typename ArrowType::offset_type;
+
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
-  const auto filter_data = filter.buffers[1].data;
-  const uint8_t* filter_is_valid = filter.buffers[0].data;
-  const int64_t filter_offset = filter.offset;
+  BINARY_FILTER_SETUP_COMMON();
 
   const uint8_t* values_is_valid = values.buffers[0].data;
   const int64_t values_offset = values.offset;
 
+  const int64_t out_offset = out->offset;
   uint8_t* out_is_valid = out->buffers[0]->mutable_data();
   // Zero bits and then only have to set valid values to true
-  bit_util::SetBitsTo(out_is_valid, 0, output_length, false);
-
-  // We use 3 block counters for fast scanning of the filter
-  //
-  // * values_valid_counter: for values null/not-null
-  // * filter_valid_counter: for filter null/not-null
-  // * filter_counter: for filter true/false
-  OptionalBitBlockCounter values_valid_counter(values_is_valid, values_offset,
-                                               values.length);
-  OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset,
-                                               filter.length);
-  BitBlockCounter filter_counter(filter_data, filter_offset, filter.length);
-
-  BINARY_FILTER_SETUP_COMMON();
+  bit_util::SetBitsTo(out_is_valid, out_offset, output_length, false);
 
   int64_t in_position = 0;
   int64_t out_position = 0;
-  while (in_position < filter.length) {
-    BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
-    BitBlockCount values_valid_block = values_valid_counter.NextWord();
-    BitBlockCount filter_block = filter_counter.NextWord();
-    if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) {
-      // For this exceedingly common case in low-selectivity filters we can
-      // skip further analysis of the data and move on to the next block.
-      in_position += filter_block.length;
-    } else if (filter_valid_block.AllSet()) {
-      // Simpler path: no filter values are null
-      if (filter_block.AllSet()) {
-        // Fastest path: filter values are all true and not null
-        if (values_valid_block.AllSet()) {
-          // The values aren't null either
-          bit_util::SetBitsTo(out_is_valid, out_position, filter_block.length, true);
-
-          // Bulk-append raw data
-          offset_type block_data_bytes =
-              (raw_offsets[in_position + filter_block.length] - raw_offsets[in_position]);
-          APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes);
-          // Append offsets
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            offset_builder.UnsafeAppend(offset);
-            offset += raw_offsets[in_position + 1] - raw_offsets[in_position];
-          }
-          out_position += filter_block.length;
-        } else {
-          // Some of the values in this block are null
-          for (int64_t i = 0; i < filter_block.length;
-               ++i, ++in_position, ++out_position) {
-            offset_builder.UnsafeAppend(offset);
-            if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
-              bit_util::SetBit(out_is_valid, out_position);
-              APPEND_SINGLE_VALUE();
-            }
+  if (is_ree_filter) {
+    auto emit_segment = [&](int64_t position, int64_t segment_length, bool filter_valid) {
+      in_position = position;
+      if (filter_valid) {
+        // Filter values are all true and not null
+        // Some of the values in the block may be null
+        for (int64_t i = 0; i < segment_length; ++i, ++in_position, ++out_position) {
+          offset_builder.UnsafeAppend(offset);
+          if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
+            bit_util::SetBit(out_is_valid, out_offset + out_position);
+            APPEND_SINGLE_VALUE();
           }
         }
-      } else {  // !filter_block.AllSet()
-        // Some of the filter values are false, but all not null
-        if (values_valid_block.AllSet()) {
-          // All the values are not-null, so we can skip null checking for
-          // them
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
+      } else {
+        offset_builder.UnsafeAppend(segment_length, offset);
+        out_position += segment_length;
+      }
+      return Status::OK();
+    };
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          status = emit_segment(position, segment_length, filter_valid);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));

Review Comment:
   The difference is the copy constructor

   ```cpp
   Status::Status(const Status& s)
       : state_((s.state_ == NULLPTR) ? NULLPTR : new State(*s.state_)) {}
   ```

   vs. the move constructor

   ```cpp
   Status::Status(Status&& s) noexcept : state_(s.state_) { s.state_ = NULLPTR; }
   ```

   being inlined.
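
To make the cost difference concrete, here is a minimal standalone sketch. `MiniStatus` is a hypothetical stand-in that only mirrors the two constructors quoted above; it is not Arrow's `Status`, and the `allocations` counter exists purely for illustration. Copying a non-OK status heap-allocates a new state, while moving only steals the pointer.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical stand-in for arrow::Status, for illustration only.
class MiniStatus {
 public:
  MiniStatus() noexcept : state_(nullptr) {}
  explicit MiniStatus(std::string msg) : state_(new State{std::move(msg)}) {}
  // Copy: allocates a new State whenever the source is not OK.
  MiniStatus(const MiniStatus& s)
      : state_(s.state_ == nullptr ? nullptr : new State(*s.state_)) {
    if (state_ != nullptr) ++allocations;
  }
  // Move: steals the pointer, no allocation.
  MiniStatus(MiniStatus&& s) noexcept : state_(s.state_) { s.state_ = nullptr; }
  MiniStatus& operator=(const MiniStatus&) = delete;
  MiniStatus& operator=(MiniStatus&&) = delete;
  ~MiniStatus() { delete state_; }

  bool ok() const { return state_ == nullptr; }

  static int allocations;  // counts State allocations made by the copy constructor

 private:
  struct State {
    std::string msg;
  };
  State* state_;
};

int MiniStatus::allocations = 0;

// Performs one copy and one move of an error status and returns how many
// allocations the copy constructor made.
int CountCopyAllocations() {
  MiniStatus::allocations = 0;
  MiniStatus error("invalid");
  MiniStatus copied(error);            // allocates a new State
  MiniStatus moved(std::move(error));  // does not allocate
  assert(!copied.ok() && !moved.ok() && error.ok());
  return MiniStatus::allocations;
}
```

In the common case where the status is OK, both constructors only copy a null pointer, so the gain from `std::move` shows up mainly on error paths.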





[GitHub] [arrow] pitrou commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1229706395


##########
cpp/src/arrow/compute/kernels/vector_selection_test.cc:
##########
@@ -42,99 +42,161 @@ using std::string_view;
 
 namespace compute {
 
+namespace {
+
+template <typename T>
+Result<std::shared_ptr<Array>> REEncode(const T& array) {
+  ARROW_ASSIGN_OR_RAISE(auto datum, RunEndEncode(array));
+  return datum.make_array();
+}
+
+Result<std::shared_ptr<Array>> REEFromJson(const std::shared_ptr<DataType>& ree_type,

Review Comment:
   Nit: use same casing as existing helpers?
   ```suggestion
   Result<std::shared_ptr<Array>> REEFromJSON(const std::shared_ptr<DataType>& ree_type,
   ```



##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -180,14 +181,102 @@ Result<std::shared_ptr<ArrayData>> GetTakeIndicesImpl(
                                      BufferVector{nullptr, out_buffer}, /*null_count=*/0);
 }
 
+template <typename RunEndType>
+Result<std::shared_ptr<ArrayData>> GetTakeIndicesFromREEBitmapImpl(
+    const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection,
+    MemoryPool* memory_pool) {
+  using T = typename RunEndType::c_type;
+  const ArraySpan& filter_values = ::arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  const bool filter_may_have_nulls = filter_values.MayHaveNulls();
+
+  // BinaryBitBlockCounter is not used here because a REE bitmap, if built
+  // correctly, is not going to have long continuous runs of 0s or 1s in the
+  // values array.
+
+  const ::arrow::ree_util::RunEndEncodedArraySpan<T> filter_span(filter);
+  auto it = filter_span.begin();
+  if (filter_may_have_nulls && null_selection == FilterOptions::EMIT_NULL) {
+    // Most complex case: the filter may have nulls and we don't drop them.
+    // The logic is ternary:
+    // - filter is null: emit null
+    // - filter is valid and true: emit index
+    // - filter is valid and false: don't emit anything
+
+    typename TypeTraits<RunEndType>::BuilderType builder(memory_pool);
+    for (; !it.is_end(filter_span); ++it) {
+      const int64_t position_with_offset = filter_values_offset + it.index_into_array();
+      const bool is_null = !bit_util::GetBit(filter_is_valid, position_with_offset);
+      if (is_null) {
+        RETURN_NOT_OK(builder.AppendNulls(it.run_length()));
+      } else {
+        const bool emit_run = bit_util::GetBit(filter_selection, position_with_offset);
+        if (emit_run) {
+          const int64_t run_end = it.run_end();
+          RETURN_NOT_OK(builder.Reserve(run_end - it.logical_position()));
+          for (int64_t position = it.logical_position(); position < run_end; position++) {
+            builder.UnsafeAppend(static_cast<T>(position));
+          }
+        }
+      }
+    }
+    std::shared_ptr<ArrayData> result;
+    RETURN_NOT_OK(builder.FinishInternal(&result));
+    return result;
+  }
+
+  // Other cases don't emit nulls and are therefore simpler.
+  TypedBufferBuilder<T> builder(memory_pool);
+
+  if (filter_may_have_nulls) {
+    DCHECK_EQ(null_selection, FilterOptions::DROP);
+    // The filter may have nulls, so we scan the validity bitmap and the filter
+    // data bitmap together.
+    for (; !it.is_end(filter_span); ++it) {
+      const int64_t position_with_offset = filter_values_offset + it.index_into_array();
+      const bool emit_run = bit_util::GetBit(filter_is_valid, position_with_offset) &&
+                            bit_util::GetBit(filter_selection, position_with_offset);
+      if (emit_run) {
+        const int64_t run_end = it.run_end();
+        RETURN_NOT_OK(builder.Reserve(run_end - it.logical_position()));
+        for (int64_t position = it.logical_position(); position < run_end; position++) {
+          builder.UnsafeAppend(static_cast<T>(position));
+        }
+      }
+    }
+  } else {
+    // The filter has no nulls, so we need only look for true values
+    for (; !it.is_end(filter_span); ++it) {
+      const int64_t position_with_offset = filter_values_offset + it.index_into_array();
+      const bool emit_run = bit_util::GetBit(filter_selection, position_with_offset);

Review Comment:
   By the way, given the definition of REE, the run values should alternate between true and false. This might help make this loop faster (but might not). It can be left for a subsequent PR or GH issue.
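
A rough sketch of how alternation could be exploited, written against plain vectors rather than Arrow's types. `TakeIndicesFromAlternatingRuns` is a hypothetical helper, not an Arrow API. It assumes the filter has no nulls and that the encoder never produced two adjacent runs with the same value; the sketch relies on that property without checking it.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper, not part of Arrow: builds take indices from a
// null-free REE boolean filter described by its run ends.
// Assumes adjacent runs never share a value, so only the first run's
// value has to be read; every later value is derived by toggling.
std::vector<int32_t> TakeIndicesFromAlternatingRuns(
    const std::vector<int32_t>& run_ends, bool first_run_value) {
  std::vector<int32_t> indices;
  bool emit_run = first_run_value;
  int32_t run_start = 0;
  for (const int32_t run_end : run_ends) {
    if (emit_run) {
      for (int32_t position = run_start; position < run_end; ++position) {
        indices.push_back(position);
      }
    }
    run_start = run_end;
    emit_run = !emit_run;  // the next run has the opposite value by assumption
  }
  return indices;
}
```

Whether this beats reading one bit per run would have to be measured; the toggle removes a load from the values bitmap but adds a loop-carried dependency.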



##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,196 +518,244 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - raw_offsets[i + position];
+    }
+    return Status::OK();
+  };
+  if (is_ree_filter) {
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,

Review Comment:
   Also, shouldn't its value be `false` here? This function is the path where neither values nor filter have nulls.



##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,196 +518,244 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - raw_offsets[i + position];
+    }
+    return Status::OK();
+  };
+  if (is_ree_filter) {
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          DCHECK(filter_valid);
+          status = emit_segment(position, segment_length);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));
+  } else {
+    const auto filter_data = filter.buffers[1].data;
+    RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
+        filter_data, filter.offset, filter.length, std::move(emit_segment)));
+  }
 
   offset_builder.UnsafeAppend(offset);
   out->length = output_length;
   RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1]));
   return data_builder.Finish(&out->buffers[2]);
 }
 
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterImpl(KernelContext* ctx, const ArraySpan& values,
                         const ArraySpan& filter, int64_t output_length,
                         FilterOptions::NullSelectionBehavior null_selection,
                         ArrayData* out) {
-  using offset_type = typename Type::offset_type;
+  using offset_type = typename ArrowType::offset_type;
+
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
-  const auto filter_data = filter.buffers[1].data;
-  const uint8_t* filter_is_valid = filter.buffers[0].data;
-  const int64_t filter_offset = filter.offset;
+  BINARY_FILTER_SETUP_COMMON();
 
   const uint8_t* values_is_valid = values.buffers[0].data;
   const int64_t values_offset = values.offset;
 
+  const int64_t out_offset = out->offset;
   uint8_t* out_is_valid = out->buffers[0]->mutable_data();
   // Zero bits and then only have to set valid values to true
-  bit_util::SetBitsTo(out_is_valid, 0, output_length, false);
-
-  // We use 3 block counters for fast scanning of the filter
-  //
-  // * values_valid_counter: for values null/not-null
-  // * filter_valid_counter: for filter null/not-null
-  // * filter_counter: for filter true/false
-  OptionalBitBlockCounter values_valid_counter(values_is_valid, values_offset,
-                                               values.length);
-  OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset,
-                                               filter.length);
-  BitBlockCounter filter_counter(filter_data, filter_offset, filter.length);
-
-  BINARY_FILTER_SETUP_COMMON();
+  bit_util::SetBitsTo(out_is_valid, out_offset, output_length, false);
 
   int64_t in_position = 0;
   int64_t out_position = 0;
-  while (in_position < filter.length) {
-    BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
-    BitBlockCount values_valid_block = values_valid_counter.NextWord();
-    BitBlockCount filter_block = filter_counter.NextWord();
-    if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) {
-      // For this exceedingly common case in low-selectivity filters we can
-      // skip further analysis of the data and move on to the next block.
-      in_position += filter_block.length;
-    } else if (filter_valid_block.AllSet()) {
-      // Simpler path: no filter values are null
-      if (filter_block.AllSet()) {
-        // Fastest path: filter values are all true and not null
-        if (values_valid_block.AllSet()) {
-          // The values aren't null either
-          bit_util::SetBitsTo(out_is_valid, out_position, filter_block.length, true);
-
-          // Bulk-append raw data
-          offset_type block_data_bytes =
-              (raw_offsets[in_position + filter_block.length] - raw_offsets[in_position]);
-          APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes);
-          // Append offsets
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            offset_builder.UnsafeAppend(offset);
-            offset += raw_offsets[in_position + 1] - raw_offsets[in_position];
-          }
-          out_position += filter_block.length;
-        } else {
-          // Some of the values in this block are null
-          for (int64_t i = 0; i < filter_block.length;
-               ++i, ++in_position, ++out_position) {
-            offset_builder.UnsafeAppend(offset);
-            if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
-              bit_util::SetBit(out_is_valid, out_position);
-              APPEND_SINGLE_VALUE();
-            }
+  if (is_ree_filter) {
+    auto emit_segment = [&](int64_t position, int64_t segment_length, bool filter_valid) {
+      in_position = position;
+      if (filter_valid) {
+        // Filter values are all true and not null
+        // Some of the values in the block may be null
+        for (int64_t i = 0; i < segment_length; ++i, ++in_position, ++out_position) {
+          offset_builder.UnsafeAppend(offset);
+          if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
+            bit_util::SetBit(out_is_valid, out_offset + out_position);
+            APPEND_SINGLE_VALUE();
           }
         }
-      } else {  // !filter_block.AllSet()
-        // Some of the filter values are false, but all not null
-        if (values_valid_block.AllSet()) {
-          // All the values are not-null, so we can skip null checking for
-          // them
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
+      } else {
+        offset_builder.UnsafeAppend(segment_length, offset);
+        out_position += segment_length;
+      }
+      return Status::OK();
+    };
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          status = emit_segment(position, segment_length, filter_valid);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));

Review Comment:
   The `std::move` is a bit pedantic here IMHO...



##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,196 +518,244 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - raw_offsets[i + position];
+    }
+    return Status::OK();
+  };
+  if (is_ree_filter) {
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,
+        [&status, emit_segment = std::move(emit_segment)](
+            int64_t position, int64_t segment_length, bool filter_valid) {
+          DCHECK(filter_valid);
+          status = emit_segment(position, segment_length);
+          return status.ok();
+        });
+    RETURN_NOT_OK(std::move(status));
+  } else {
+    const auto filter_data = filter.buffers[1].data;
+    RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
+        filter_data, filter.offset, filter.length, std::move(emit_segment)));
+  }
 
   offset_builder.UnsafeAppend(offset);
   out->length = output_length;
   RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1]));
   return data_builder.Finish(&out->buffers[2]);
 }
 
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterImpl(KernelContext* ctx, const ArraySpan& values,
                         const ArraySpan& filter, int64_t output_length,
                         FilterOptions::NullSelectionBehavior null_selection,
                         ArrayData* out) {
-  using offset_type = typename Type::offset_type;
+  using offset_type = typename ArrowType::offset_type;
+
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
-  const auto filter_data = filter.buffers[1].data;
-  const uint8_t* filter_is_valid = filter.buffers[0].data;
-  const int64_t filter_offset = filter.offset;
+  BINARY_FILTER_SETUP_COMMON();
 
   const uint8_t* values_is_valid = values.buffers[0].data;
   const int64_t values_offset = values.offset;
 
+  const int64_t out_offset = out->offset;
   uint8_t* out_is_valid = out->buffers[0]->mutable_data();
   // Zero bits and then only have to set valid values to true
-  bit_util::SetBitsTo(out_is_valid, 0, output_length, false);
-
-  // We use 3 block counters for fast scanning of the filter
-  //
-  // * values_valid_counter: for values null/not-null
-  // * filter_valid_counter: for filter null/not-null
-  // * filter_counter: for filter true/false
-  OptionalBitBlockCounter values_valid_counter(values_is_valid, values_offset,
-                                               values.length);
-  OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset,
-                                               filter.length);
-  BitBlockCounter filter_counter(filter_data, filter_offset, filter.length);
-
-  BINARY_FILTER_SETUP_COMMON();
+  bit_util::SetBitsTo(out_is_valid, out_offset, output_length, false);
 
   int64_t in_position = 0;
   int64_t out_position = 0;
-  while (in_position < filter.length) {
-    BitBlockCount filter_valid_block = filter_valid_counter.NextWord();
-    BitBlockCount values_valid_block = values_valid_counter.NextWord();
-    BitBlockCount filter_block = filter_counter.NextWord();
-    if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) {
-      // For this exceedingly common case in low-selectivity filters we can
-      // skip further analysis of the data and move on to the next block.
-      in_position += filter_block.length;
-    } else if (filter_valid_block.AllSet()) {
-      // Simpler path: no filter values are null
-      if (filter_block.AllSet()) {
-        // Fastest path: filter values are all true and not null
-        if (values_valid_block.AllSet()) {
-          // The values aren't null either
-          bit_util::SetBitsTo(out_is_valid, out_position, filter_block.length, true);
-
-          // Bulk-append raw data
-          offset_type block_data_bytes =
-              (raw_offsets[in_position + filter_block.length] - raw_offsets[in_position]);
-          APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes);
-          // Append offsets
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            offset_builder.UnsafeAppend(offset);
-            offset += raw_offsets[in_position + 1] - raw_offsets[in_position];
-          }
-          out_position += filter_block.length;
-        } else {
-          // Some of the values in this block are null
-          for (int64_t i = 0; i < filter_block.length;
-               ++i, ++in_position, ++out_position) {
-            offset_builder.UnsafeAppend(offset);
-            if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
-              bit_util::SetBit(out_is_valid, out_position);
-              APPEND_SINGLE_VALUE();
-            }
+  if (is_ree_filter) {
+    auto emit_segment = [&](int64_t position, int64_t segment_length, bool filter_valid) {
+      in_position = position;
+      if (filter_valid) {
+        // Filter values are all true and not null
+        // Some of the values in the block may be null
+        for (int64_t i = 0; i < segment_length; ++i, ++in_position, ++out_position) {
+          offset_builder.UnsafeAppend(offset);
+          if (bit_util::GetBit(values_is_valid, values_offset + in_position)) {
+            bit_util::SetBit(out_is_valid, out_offset + out_position);
+            APPEND_SINGLE_VALUE();
           }
         }
-      } else {  // !filter_block.AllSet()
-        // Some of the filter values are false, but all not null
-        if (values_valid_block.AllSet()) {
-          // All the values are not-null, so we can skip null checking for
-          // them
-          for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) {
-            if (bit_util::GetBit(filter_data, filter_offset + in_position)) {
+      } else {
+        offset_builder.UnsafeAppend(segment_length, offset);
+        out_position += segment_length;
+      }
+      return Status::OK();
+    };
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,

Review Comment:
   Also include the parameter name here.



##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,196 +518,244 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - raw_offsets[i + position];
+    }
+    return Status::OK();
+  };
+  if (is_ree_filter) {
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,

Review Comment:
   Can you include the actual parameter name for readability?
   ```suggestion
           filter, /*xxx=*/ true, null_selection,
   ```
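
For readers unfamiliar with the convention, a small sketch of argument-name comments follows. `CountEmitted` and its parameters are hypothetical and exist only to show the style; they are not functions from this PR. As far as I know, clang-tidy's `bugprone-argument-comment` check can verify that such comments match the declared parameter names.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical function with bare boolean parameters, for illustration only.
int64_t CountEmitted(int64_t valid_true, int64_t nulls, bool filter_may_have_nulls,
                     bool emit_null) {
  if (!filter_may_have_nulls) return valid_true;
  return emit_null ? valid_true + nulls : valid_true;
}

int64_t Example() {
  // Without the comments, a reader has to look up what `true, false` mean.
  return CountEmitted(/*valid_true=*/3, /*nulls=*/2,
                      /*filter_may_have_nulls=*/true, /*emit_null=*/false);
}
```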



##########
cpp/src/arrow/compute/kernels/vector_selection_test.cc:
##########
@@ -42,99 +42,161 @@ using std::string_view;
 
 namespace compute {
 
+namespace {
+
+template <typename T>
+Result<std::shared_ptr<Array>> REEncode(const T& array) {
+  ARROW_ASSIGN_OR_RAISE(auto datum, RunEndEncode(array));
+  return datum.make_array();
+}
+
+Result<std::shared_ptr<Array>> REEFromJson(const std::shared_ptr<DataType>& ree_type,
+                                           const std::string& json) {
+  auto ree_type_ptr = checked_cast<const RunEndEncodedType*>(ree_type.get());
+  auto array = ArrayFromJSON(ree_type_ptr->value_type(), json);
+  ARROW_ASSIGN_OR_RAISE(
+      auto datum, RunEndEncode(array, RunEndEncodeOptions{ree_type_ptr->run_end_type()}));
+  return datum.make_array();
+}
+
+Result<std::shared_ptr<Array>> FilterFromJson(
+    const std::shared_ptr<DataType>& filter_type, const std::string& json) {
+  if (filter_type->id() == Type::RUN_END_ENCODED) {
+    return REEFromJson(filter_type, json);
+  } else {
+    return ArrayFromJSON(filter_type, json);
+  }
+}
+
+Result<std::shared_ptr<Array>> REEncode(const std::shared_ptr<Array>& array) {
+  ARROW_ASSIGN_OR_RAISE(auto datum, RunEndEncode(array));
+  return datum.make_array();
+}
+
+void CheckTakeCase(const BooleanArray& filter,
+                   const std::shared_ptr<Array>& expected_indices,
+                   FilterOptions::NullSelectionBehavior null_selection) {
+  ASSERT_OK_AND_ASSIGN(auto indices,
+                       internal::GetTakeIndices(*filter.data(), null_selection));
+  auto indices_array = MakeArray(indices);
+  ValidateOutput(indices);
+  AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
+
+  ASSERT_OK_AND_ASSIGN(auto ree_filter, REEncode(filter));
+  ASSERT_OK_AND_ASSIGN(auto indices_from_ree,
+                       internal::GetTakeIndices(*ree_filter->data(), null_selection));
+  auto indices_from_ree_array = MakeArray(indices_from_ree);
+  ValidateOutput(indices_from_ree);
+  AssertArraysEqual(*expected_indices, *indices_from_ree_array, /*verbose=*/true);
+}
+
+void CheckTakeCase(const std::string& filter_json, const std::string& indices_json,

Review Comment:
   (same here)



##########
cpp/src/arrow/compute/kernels/vector_selection_test.cc:
##########
@@ -42,99 +42,161 @@ using std::string_view;
 
 namespace compute {
 
+namespace {
+
+template <typename T>
+Result<std::shared_ptr<Array>> REEncode(const T& array) {
+  ARROW_ASSIGN_OR_RAISE(auto datum, RunEndEncode(array));
+  return datum.make_array();
+}
+
+Result<std::shared_ptr<Array>> REEFromJson(const std::shared_ptr<DataType>& ree_type,
+                                           const std::string& json) {
+  auto ree_type_ptr = checked_cast<const RunEndEncodedType*>(ree_type.get());
+  auto array = ArrayFromJSON(ree_type_ptr->value_type(), json);
+  ARROW_ASSIGN_OR_RAISE(
+      auto datum, RunEndEncode(array, RunEndEncodeOptions{ree_type_ptr->run_end_type()}));
+  return datum.make_array();
+}
+
+Result<std::shared_ptr<Array>> FilterFromJson(
+    const std::shared_ptr<DataType>& filter_type, const std::string& json) {
+  if (filter_type->id() == Type::RUN_END_ENCODED) {
+    return REEFromJson(filter_type, json);
+  } else {
+    return ArrayFromJSON(filter_type, json);
+  }
+}
+
+Result<std::shared_ptr<Array>> REEncode(const std::shared_ptr<Array>& array) {
+  ARROW_ASSIGN_OR_RAISE(auto datum, RunEndEncode(array));
+  return datum.make_array();
+}
+
+void CheckTakeCase(const BooleanArray& filter,

Review Comment:
   I would name this a bit more precisely.
   ```suggestion
   void CheckTakeIndicesCase(const BooleanArray& filter,
   ```





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218593771


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -161,39 +173,88 @@ class PrimitiveFilterImpl {
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
     out_data_ = reinterpret_cast<T*>(out_arr->buffers[1]->mutable_data());
-    out_offset_ = out_arr->offset;
     out_length_ = out_arr->length;
+    DCHECK_EQ(out_arr->offset, 0);
     out_position_ = 0;
   }
 
-  void ExecNonNull() {
-    // Fast filter when values and filter are not null
-    ::arrow::internal::VisitSetBitRunsVoid(
-        filter_data_, filter_offset_, values_length_,
-        [&](int64_t position, int64_t length) { WriteValueSegment(position, length); });
+  void ExecREEFilter() {
+    if (filter_.child_data[1].null_count == 0 && values_null_count_ == 0) {
+      DCHECK(!out_is_valid_);
+      // Fastest: no nulls in either filter or values
+      return VisitPlainxREEFilterOutputSegments(
+          filter_, /*filter_may_have_nulls=*/false, null_selection_,
+          [&](int64_t position, int64_t segment_length, bool filter_valid) {
+            // Fastest path: all values in range are included and not null
+            WriteValueSegment(position, segment_length);
+            DCHECK(filter_valid);
+          });
+    }
+    if (values_is_valid_ && out_is_valid_) {

Review Comment:
   Yeah. Somehow this made more sense than the `DCHECK` when I was writing the code. :)





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218546879


##########
cpp/src/arrow/compute/kernels/vector_selection_internal.cc:
##########
@@ -82,6 +85,89 @@ Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit
 
 namespace {
 
+/// \brief Iterate over a REE filter, emitting ranges of a plain values array that
+/// would pass the filter.
+///
+/// Differently from REExREE, and REExPlain filtering, PlainxREE filtering
+/// does not produce a REE output, but rather a plain output array. As such it's
+/// much simpler.
+///
+/// \param filter_may_have_nulls Only pass false if you know the filter has no nulls.
+template <typename FilterRunEndType>
+void VisitPlainxREEFilterOutputSegmentsImpl(
+    const ArraySpan& filter, bool filter_may_have_nulls,
+    FilterOptions::NullSelectionBehavior null_selection,
+    const EmitREEFilterSegment& emit_segment) {
+  using FilterRunEndCType = typename FilterRunEndType::c_type;
+  const ArraySpan& filter_values = arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  filter_may_have_nulls = filter_may_have_nulls && filter_is_valid != NULLPTR &&
+                          filter_values.null_count != 0;
+
+  const arrow::ree_util::RunEndEncodedArraySpan<FilterRunEndCType> filter_span(filter);
+  auto it = filter_span.begin();
+  if (filter_may_have_nulls) {
+    if (null_selection == FilterOptions::EMIT_NULL) {
+      while (!it.is_end(filter_span)) {

Review Comment:
   Finding the ".end()" costs a binary search, but checking that end was reached is O(1).
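
   A minimal sketch of that cost difference, using a simplified stand-in rather than the actual `ree_util` iterator. `SketchSpan`, `SketchCursor`, `FindPhysicalIndex`, `Begin` and `CountRuns` are hypothetical names assumed for illustration only:

   ```cpp
   #include <algorithm>
   #include <cstdint>
   #include <vector>

   // Hypothetical, simplified run-end encoded span: `run_ends` holds the
   // cumulative logical end of each physical run, and the span covers the
   // logical range [offset, offset + length).
   struct SketchSpan {
     std::vector<int32_t> run_ends;
     int64_t offset;
     int64_t length;
   };

   // O(log n): mapping a logical index to a physical run needs a binary search.
   // Building an "end" iterator would need this lookup as well.
   int64_t FindPhysicalIndex(const SketchSpan& span, int64_t logical_index) {
     auto it = std::upper_bound(span.run_ends.begin(), span.run_ends.end(),
                                span.offset + logical_index);
     return it - span.run_ends.begin();
   }

   struct SketchCursor {
     int64_t logical_pos;   // relative to span.offset
     int64_t physical_pos;  // index into span.run_ends

     // O(1): compares the logical position against the span length only.
     bool is_end(const SketchSpan& span) const { return logical_pos >= span.length; }

     // Logical end of the current run, clamped to the span.
     int64_t run_end(const SketchSpan& span) const {
       return std::min<int64_t>(span.run_ends[physical_pos] - span.offset, span.length);
     }

     void next(const SketchSpan& span) {
       logical_pos = run_end(span);
       ++physical_pos;
     }
   };

   SketchCursor Begin(const SketchSpan& span) { return {0, FindPhysicalIndex(span, 0)}; }

   // Visits every run without ever materializing an "end" cursor.
   int64_t CountRuns(const SketchSpan& span) {
     int64_t n = 0;
     for (auto it = Begin(span); !it.is_end(span); it.next(span)) ++n;
     return n;
   }
   ```

   With this shape the loop pays for one binary search up front (in `Begin`) and a constant-time comparison per run afterwards.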





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218657015


##########
cpp/src/arrow/compute/kernels/vector_selection_test.cc:
##########
@@ -55,6 +88,13 @@ TEST(GetTakeIndices, Basics) {
     auto indices_array = MakeArray(indices);
     ValidateOutput(indices);
     AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
+
+    ASSERT_OK_AND_ASSIGN(auto ree_filter, REEncode(*filter));
+    ASSERT_OK_AND_ASSIGN(auto indices_from_ree,
+                         internal::GetTakeIndices(*ree_filter->data(), null_selection));
+    auto indices_from_ree_array = MakeArray(indices);
+    ValidateOutput(indices_from_ree);
+    AssertArraysEqual(*expected_indices, *indices_from_ree_array, /*verbose=*/true);

Review Comment:
   Fixing now.





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218613817


##########
cpp/src/arrow/compute/kernels/vector_selection_test.cc:
##########
@@ -78,63 +118,102 @@ TEST(GetTakeIndices, NullValidityBuffer) {
   ValidateOutput(indices);
   AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
 
+  ASSERT_OK_AND_ASSIGN(auto ree_filter, REEncode(filter));
+  ASSERT_OK_AND_ASSIGN(
+      auto indices_from_ree,
+      internal::GetTakeIndices(*ree_filter->data(), FilterOptions::DROP));
+  auto indices_from_ree_array = MakeArray(indices);
+  ValidateOutput(indices_from_ree);
+  AssertArraysEqual(*expected_indices, *indices_from_ree_array, /*verbose=*/true);
+
   ASSERT_OK_AND_ASSIGN(
       indices, internal::GetTakeIndices(*filter.data(), FilterOptions::EMIT_NULL));
   indices_array = MakeArray(indices);
   ValidateOutput(indices);
   AssertArraysEqual(*expected_indices, *indices_array, /*verbose=*/true);
+
+  ASSERT_OK_AND_ASSIGN(
+      indices_from_ree,
+      internal::GetTakeIndices(*ree_filter->data(), FilterOptions::EMIT_NULL));
+  indices_from_ree_array = MakeArray(indices);
+  ValidateOutput(indices_from_ree);
+  AssertArraysEqual(*expected_indices, *indices_from_ree_array, /*verbose=*/true);
 }
 
 template <typename IndexArrayType>
 void CheckGetTakeIndicesCase(const Array& untyped_filter) {
   const auto& filter = checked_cast<const BooleanArray&>(untyped_filter);
+  ASSERT_OK_AND_ASSIGN(auto ree_filter, REEncode(*filter.data()));
+
   ASSERT_OK_AND_ASSIGN(std::shared_ptr<ArrayData> drop_indices,
                        internal::GetTakeIndices(*filter.data(), FilterOptions::DROP));
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<ArrayData> drop_indices_from_ree,
+      internal::GetTakeIndices(*ree_filter->data(), FilterOptions::DROP));
   // Verify DROP indices
   {
     IndexArrayType indices(drop_indices);
+    IndexArrayType indices_from_ree(drop_indices);
     ValidateOutput(indices);
+    ValidateOutput(indices_from_ree);
 
     int64_t out_position = 0;
     for (int64_t i = 0; i < filter.length(); ++i) {
       if (filter.IsValid(i)) {
         if (filter.Value(i)) {
           ASSERT_EQ(indices.Value(out_position), i);
+          ASSERT_EQ(indices_from_ree.Value(out_position), i);
           ++out_position;
         }
       }
     }
     ASSERT_EQ(out_position, indices.length());
+    ASSERT_EQ(out_position, indices_from_ree.length());
+
     // Check that the end length agrees with the output of GetFilterOutputSize
     ASSERT_EQ(out_position,
               internal::GetFilterOutputSize(*filter.data(), FilterOptions::DROP));
+    ASSERT_EQ(out_position,
+              internal::GetFilterOutputSize(*ree_filter->data(), FilterOptions::DROP));
   }
 
   ASSERT_OK_AND_ASSIGN(
       std::shared_ptr<ArrayData> emit_indices,
       internal::GetTakeIndices(*filter.data(), FilterOptions::EMIT_NULL));
+  ASSERT_OK_AND_ASSIGN(
+      std::shared_ptr<ArrayData> emit_indices_from_ree,
+      internal::GetTakeIndices(*ree_filter->data(), FilterOptions::EMIT_NULL));
   // Verify EMIT_NULL indices
   {
     IndexArrayType indices(emit_indices);
+    IndexArrayType indices_from_ree(emit_indices);
     ValidateOutput(indices);
+    ValidateOutput(indices_from_ree);
 
     int64_t out_position = 0;
     for (int64_t i = 0; i < filter.length(); ++i) {
       if (filter.IsValid(i)) {
         if (filter.Value(i)) {
           ASSERT_EQ(indices.Value(out_position), i);
+          ASSERT_EQ(indices_from_ree.Value(out_position), i);
           ++out_position;
         }
       } else {
         ASSERT_TRUE(indices.IsNull(out_position));
+        ASSERT_TRUE(indices_from_ree.IsNull(out_position));
         ++out_position;
       }
     }
 
     ASSERT_EQ(out_position, indices.length());
+    ASSERT_EQ(out_position, indices_from_ree.length());
+
     // Check that the end length agrees with the output of GetFilterOutputSize
     ASSERT_EQ(out_position,
               internal::GetFilterOutputSize(*filter.data(), FilterOptions::EMIT_NULL));
+    ASSERT_OK_AND_ASSIGN(auto ree_filter, REEncode(*filter.data()));

Review Comment:
   You're right. I'm removing it.





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1229735153


##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -180,14 +181,102 @@ Result<std::shared_ptr<ArrayData>> GetTakeIndicesImpl(
                                      BufferVector{nullptr, out_buffer}, /*null_count=*/0);
 }
 
+template <typename RunEndType>
+Result<std::shared_ptr<ArrayData>> GetTakeIndicesFromREEBitmapImpl(
+    const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection,
+    MemoryPool* memory_pool) {
+  using T = typename RunEndType::c_type;
+  const ArraySpan& filter_values = ::arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  const bool filter_may_have_nulls = filter_values.MayHaveNulls();
+
+  // BinaryBitBlockCounter is not used here because a REE bitmap, if built
+  // correctly, is not going to have long continuous runs of 0s or 1s in the
+  // values array.
+
+  const ::arrow::ree_util::RunEndEncodedArraySpan<T> filter_span(filter);
+  auto it = filter_span.begin();
+  if (filter_may_have_nulls && null_selection == FilterOptions::EMIT_NULL) {
+    // Most complex case: the filter may have nulls and we don't drop them.
+    // The logic is ternary:
+    // - filter is null: emit null
+    // - filter is valid and true: emit index
+    // - filter is valid and false: don't emit anything
+
+    typename TypeTraits<RunEndType>::BuilderType builder(memory_pool);
+    for (; !it.is_end(filter_span); ++it) {
+      const int64_t position_with_offset = filter_values_offset + it.index_into_array();
+      const bool is_null = !bit_util::GetBit(filter_is_valid, position_with_offset);
+      if (is_null) {
+        RETURN_NOT_OK(builder.AppendNulls(it.run_length()));
+      } else {
+        const bool emit_run = bit_util::GetBit(filter_selection, position_with_offset);
+        if (emit_run) {
+          const int64_t run_end = it.run_end();
+          RETURN_NOT_OK(builder.Reserve(run_end - it.logical_position()));
+          for (int64_t position = it.logical_position(); position < run_end; position++) {
+            builder.UnsafeAppend(static_cast<T>(position));
+          }
+        }
+      }
+    }
+    std::shared_ptr<ArrayData> result;
+    RETURN_NOT_OK(builder.FinishInternal(&result));
+    return result;
+  }
+
+  // Other cases don't emit nulls and are therefore simpler.
+  TypedBufferBuilder<T> builder(memory_pool);
+
+  if (filter_may_have_nulls) {
+    DCHECK_EQ(null_selection, FilterOptions::DROP);
+    // The filter may have nulls, so we scan the validity bitmap and the filter
+    // data bitmap together.
+    for (; !it.is_end(filter_span); ++it) {
+      const int64_t position_with_offset = filter_values_offset + it.index_into_array();
+      const bool emit_run = bit_util::GetBit(filter_is_valid, position_with_offset) &&
+                            bit_util::GetBit(filter_selection, position_with_offset);
+      if (emit_run) {
+        const int64_t run_end = it.run_end();
+        RETURN_NOT_OK(builder.Reserve(run_end - it.logical_position()));
+        for (int64_t position = it.logical_position(); position < run_end; position++) {
+          builder.UnsafeAppend(static_cast<T>(position));
+        }
+      }
+    }
+  } else {
+    // The filter has no nulls, so we need only look for true values
+    for (; !it.is_end(filter_span); ++it) {
+      const int64_t position_with_offset = filter_values_offset + it.index_into_array();
+      const bool emit_run = bit_util::GetBit(filter_selection, position_with_offset);

Review Comment:
   They won't necessarily alternate: think of REEs being concatenated naively. Some runs here and there might not be maximal.
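
   A small sketch of the naive concatenation case, assuming a toy REE representation (`SketchREE`, `ConcatNaively` and `HasAdjacentEqualRuns` are hypothetical names, not Arrow APIs):

   ```cpp
   #include <cstddef>
   #include <cstdint>
   #include <vector>

   // Hypothetical toy REE boolean array: values[i] applies to the logical range
   // [run_ends[i - 1], run_ends[i]), with an implicit start of 0.
   struct SketchREE {
     std::vector<int32_t> run_ends;
     std::vector<bool> values;
   };

   // Naive concatenation: shift b's run ends by the logical length of a and
   // append them, without merging the last run of a with the first run of b.
   SketchREE ConcatNaively(const SketchREE& a, const SketchREE& b) {
     SketchREE out = a;
     const int32_t shift = a.run_ends.empty() ? 0 : a.run_ends.back();
     for (std::size_t i = 0; i < b.run_ends.size(); ++i) {
       out.run_ends.push_back(b.run_ends[i] + shift);
       out.values.push_back(b.values[i]);
     }
     return out;
   }

   // True if two neighboring runs carry the same value, i.e. some run is not
   // maximal and the values do not strictly alternate.
   bool HasAdjacentEqualRuns(const SketchREE& ree) {
     for (std::size_t i = 1; i < ree.values.size(); ++i) {
       if (ree.values[i] == ree.values[i - 1]) return true;
     }
     return false;
   }
   ```

   Concatenating `{run_ends: [2, 5], values: [false, true]}` with `{run_ends: [3, 4], values: [true, false]}` this way yields values `[false, true, true, false]`: two consecutive `true` runs, so a kernel cannot assume that runs alternate.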





[GitHub] [arrow] pitrou commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218576379


##########
cpp/src/arrow/compute/kernels/vector_selection_internal.cc:
##########
@@ -82,6 +85,89 @@ Status PreallocatePrimitiveArrayData(KernelContext* ctx, int64_t length, int bit
 
 namespace {
 
+/// \brief Iterate over a REE filter, emitting ranges of a plain values array that
+/// would pass the filter.
+///
+/// Differently from REExREE, and REExPlain filtering, PlainxREE filtering
+/// does not produce a REE output, but rather a plain output array. As such it's
+/// much simpler.
+///
+/// \param filter_may_have_nulls Only pass false if you know the filter has no nulls.
+template <typename FilterRunEndType>
+void VisitPlainxREEFilterOutputSegmentsImpl(
+    const ArraySpan& filter, bool filter_may_have_nulls,
+    FilterOptions::NullSelectionBehavior null_selection,
+    const EmitREEFilterSegment& emit_segment) {
+  using FilterRunEndCType = typename FilterRunEndType::c_type;
+  const ArraySpan& filter_values = arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  filter_may_have_nulls = filter_may_have_nulls && filter_is_valid != NULLPTR &&

Review Comment:
   That's a good point. Feel free to use what's most convenient.





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1218583103


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -161,39 +173,88 @@ class PrimitiveFilterImpl {
       out_is_valid_ = out_arr->buffers[0]->mutable_data();
     }
     out_data_ = reinterpret_cast<T*>(out_arr->buffers[1]->mutable_data());
-    out_offset_ = out_arr->offset;
     out_length_ = out_arr->length;
+    DCHECK_EQ(out_arr->offset, 0);
     out_position_ = 0;
   }
 
-  void ExecNonNull() {
-    // Fast filter when values and filter are not null
-    ::arrow::internal::VisitSetBitRunsVoid(
-        filter_data_, filter_offset_, values_length_,
-        [&](int64_t position, int64_t length) { WriteValueSegment(position, length); });
+  void ExecREEFilter() {
+    if (filter_.child_data[1].null_count == 0 && values_null_count_ == 0) {

Review Comment:
   I understand that, but this is copying the existing behavior (I just moved the code around a bit). 
   
   And I think `GetNullCount` is avoided in the first place because forcing a scan of the bitmap to be able to use the slightly faster kernel in very few extra cases is not worth it.
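
   To illustrate the trade-off, here is a simplified sketch. It assumes a cached null count with an "unknown" sentinel; `SketchBitmapArray` and `kUnknown` are hypothetical, and the real Arrow types differ in detail:

   ```cpp
   #include <cstdint>
   #include <vector>

   // Hypothetical sentinel: the null count has not been computed yet.
   constexpr int64_t kUnknown = -1;

   struct SketchBitmapArray {
     std::vector<uint8_t> validity;  // empty means "no validity bitmap"
     int64_t length;
     int64_t null_count;             // kUnknown until someone computes it

     // O(1) and conservative: answers true whenever nulls cannot be ruled out,
     // including when the count is simply unknown.
     bool MayHaveNulls() const { return null_count != 0 && !validity.empty(); }

     // Exact, but O(length) the first time: scans the bitmap and caches the result.
     int64_t GetNullCount() {
       if (null_count == kUnknown) {
         int64_t valid = 0;
         if (validity.empty()) {
           valid = length;
         } else {
           for (int64_t i = 0; i < length; ++i) {
             valid += (validity[i / 8] >> (i % 8)) & 1;
           }
         }
         null_count = length - valid;
       }
       return null_count;
     }
   };
   ```

   Checking the cached field directly means an array whose count is unknown but actually zero takes the slower path; calling `GetNullCount` first would rescue that case at the price of a full bitmap scan.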





[GitHub] [arrow] github-actions[bot] commented on pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #35750:
URL: https://github.com/apache/arrow/pull/35750#issuecomment-1561962090

   * Closes: #35749




[GitHub] [arrow] pitrou commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1229746591


##########
cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc:
##########
@@ -180,14 +181,102 @@ Result<std::shared_ptr<ArrayData>> GetTakeIndicesImpl(
                                      BufferVector{nullptr, out_buffer}, /*null_count=*/0);
 }
 
+template <typename RunEndType>
+Result<std::shared_ptr<ArrayData>> GetTakeIndicesFromREEBitmapImpl(
+    const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection,
+    MemoryPool* memory_pool) {
+  using T = typename RunEndType::c_type;
+  const ArraySpan& filter_values = ::arrow::ree_util::ValuesArray(filter);
+  const int64_t filter_values_offset = filter_values.offset;
+  const uint8_t* filter_is_valid = filter_values.buffers[0].data;
+  const uint8_t* filter_selection = filter_values.buffers[1].data;
+  const bool filter_may_have_nulls = filter_values.MayHaveNulls();
+
+  // BinaryBitBlockCounter is not used here because a REE bitmap, if built
+  // correctly, is not going to have long continuous runs of 0s or 1s in the
+  // values array.
+
+  const ::arrow::ree_util::RunEndEncodedArraySpan<T> filter_span(filter);
+  auto it = filter_span.begin();
+  if (filter_may_have_nulls && null_selection == FilterOptions::EMIT_NULL) {
+    // Most complex case: the filter may have nulls and we don't drop them.
+    // The logic is ternary:
+    // - filter is null: emit null
+    // - filter is valid and true: emit index
+    // - filter is valid and false: don't emit anything
+
+    typename TypeTraits<RunEndType>::BuilderType builder(memory_pool);
+    for (; !it.is_end(filter_span); ++it) {
+      const int64_t position_with_offset = filter_values_offset + it.index_into_array();
+      const bool is_null = !bit_util::GetBit(filter_is_valid, position_with_offset);
+      if (is_null) {
+        RETURN_NOT_OK(builder.AppendNulls(it.run_length()));
+      } else {
+        const bool emit_run = bit_util::GetBit(filter_selection, position_with_offset);
+        if (emit_run) {
+          const int64_t run_end = it.run_end();
+          RETURN_NOT_OK(builder.Reserve(run_end - it.logical_position()));
+          for (int64_t position = it.logical_position(); position < run_end; position++) {
+            builder.UnsafeAppend(static_cast<T>(position));
+          }
+        }
+      }
+    }
+    std::shared_ptr<ArrayData> result;
+    RETURN_NOT_OK(builder.FinishInternal(&result));
+    return result;
+  }
+
+  // Other cases don't emit nulls and are therefore simpler.
+  TypedBufferBuilder<T> builder(memory_pool);
+
+  if (filter_may_have_nulls) {
+    DCHECK_EQ(null_selection, FilterOptions::DROP);
+    // The filter may have nulls, so we scan the validity bitmap and the filter
+    // data bitmap together.
+    for (; !it.is_end(filter_span); ++it) {
+      const int64_t position_with_offset = filter_values_offset + it.index_into_array();
+      const bool emit_run = bit_util::GetBit(filter_is_valid, position_with_offset) &&
+                            bit_util::GetBit(filter_selection, position_with_offset);
+      if (emit_run) {
+        const int64_t run_end = it.run_end();
+        RETURN_NOT_OK(builder.Reserve(run_end - it.logical_position()));
+        for (int64_t position = it.logical_position(); position < run_end; position++) {
+          builder.UnsafeAppend(static_cast<T>(position));
+        }
+      }
+    }
+  } else {
+    // The filter has no nulls, so we need only look for true values
+    for (; !it.is_end(filter_span); ++it) {
+      const int64_t position_with_offset = filter_values_offset + it.index_into_array();
+      const bool emit_run = bit_util::GetBit(filter_selection, position_with_offset);

Review Comment:
   You're right. I was mistakenly assuming it was forbidden in the format.





[GitHub] [arrow] felipecrv commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1230025799


##########
cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc:
##########
@@ -444,196 +518,244 @@ Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
 
 // Optimized binary filter for the case where neither values nor filter have
 // nulls
-template <typename Type>
+template <typename ArrowType>
 Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values,
                                const ArraySpan& filter, int64_t output_length,
                                FilterOptions::NullSelectionBehavior null_selection,
                                ArrayData* out) {
-  using offset_type = typename Type::offset_type;
-  const auto filter_data = filter.buffers[1].data;
+  using offset_type = typename ArrowType::offset_type;
+  const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED;
 
   BINARY_FILTER_SETUP_COMMON();
 
-  RETURN_NOT_OK(arrow::internal::VisitSetBitRuns(
-      filter_data, filter.offset, filter.length, [&](int64_t position, int64_t length) {
-        // Bulk-append raw data
-        const offset_type run_data_bytes =
-            (raw_offsets[position + length] - raw_offsets[position]);
-        APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
-        // Append offsets
-        offset_type cur_offset = raw_offsets[position];
-        for (int64_t i = 0; i < length; ++i) {
-          offset_builder.UnsafeAppend(offset);
-          offset += raw_offsets[i + position + 1] - cur_offset;
-          cur_offset = raw_offsets[i + position + 1];
-        }
-        return Status::OK();
-      }));
+  auto emit_segment = [&](int64_t position, int64_t length) {
+    // Bulk-append raw data
+    const offset_type run_data_bytes =
+        (raw_offsets[position + length] - raw_offsets[position]);
+    APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes);
+    // Append offsets
+    for (int64_t i = 0; i < length; ++i) {
+      offset_builder.UnsafeAppend(offset);
+      offset += raw_offsets[i + position + 1] - raw_offsets[i + position + 1];
+    }
+    return Status::OK();
+  };
+  if (is_ree_filter) {
+    Status status;
+    VisitPlainxREEFilterOutputSegments(
+        filter, true, null_selection,

Review Comment:
   Changing to `false`.





[GitHub] [arrow] felipecrv commented on pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on PR #35750:
URL: https://github.com/apache/arrow/pull/35750#issuecomment-1592127263

   I rebased and force-pushed (instead of merging) to see if the macOS script that failed in CI now works.




[GitHub] [arrow] pitrou commented on a diff in pull request #35750: GH-35749: [C++] Handle run-end encoded filters in compute kernels

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #35750:
URL: https://github.com/apache/arrow/pull/35750#discussion_r1219041492


##########
cpp/src/arrow/compute/kernels/vector_selection_internal.cc:
##########
@@ -239,6 +309,43 @@ struct Selection {
       }
     };
 
+    if (is_ree_filter) {
+      Status status;

Review Comment:
   That's a good question. Ideally there should be no overhead returning a successful Status, but in practice there is (we try to measure it in `type_benchmark.cc`). I'll let you choose what is best here.


