You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2020/06/09 03:08:26 UTC

[arrow] branch master updated: ARROW-9034: [C++] Implement "BinaryBitBlockCounter", add single-word functions to BitBlockCounter

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 824c871  ARROW-9034: [C++] Implement "BinaryBitBlockCounter", add single-word functions to BitBlockCounter
824c871 is described below

commit 824c87165f531c25130c0a4385cafd06c74f9b80
Author: Wes McKinney <we...@apache.org>
AuthorDate: Mon Jun 8 22:07:13 2020 -0500

    ARROW-9034: [C++] Implement "BinaryBitBlockCounter", add single-word functions to BitBlockCounter
    
    BinaryBitBlockCounter computes the popcount of the bitwise-and of each corresponding bit-run (with target of 64 bits at a time) of two bitmaps. This permits iterating through two validity bitmaps that don't have a lot of nulls much more quickly than using two BitmapReaders.
    
    I also added BitBlockCounter::NextWord, which scans a single bitmap 64 bits at a time. It seems like this may be preferable to the four-word version. Now we have `NextWord` and `NextFourWords` so the developer can choose either variant.
    
    Benchmarks and tests covering all this are included. I'll post the benchmarks on my machine as a comment.
    
    Closes #7356 from wesm/ARROW-9034
    
    Authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/arrow/array/array_primitive.h             |   1 +
 cpp/src/arrow/util/CMakeLists.txt                 |   2 +
 cpp/src/arrow/util/bit_block_counter.cc           | 148 +++++++++---
 cpp/src/arrow/util/bit_block_counter.h            |  72 ++++--
 cpp/src/arrow/util/bit_block_counter_benchmark.cc | 266 ++++++++++++++++++++++
 cpp/src/arrow/util/bit_block_counter_test.cc      | 250 ++++++++++++++++++++
 cpp/src/arrow/util/bit_util_benchmark.cc          | 128 +----------
 cpp/src/arrow/util/bit_util_test.cc               | 102 ---------
 8 files changed, 701 insertions(+), 268 deletions(-)

diff --git a/cpp/src/arrow/array/array_primitive.h b/cpp/src/arrow/array/array_primitive.h
index 000934e..e58f5f4 100644
--- a/cpp/src/arrow/array/array_primitive.h
+++ b/cpp/src/arrow/array/array_primitive.h
@@ -26,6 +26,7 @@
 #include "arrow/array/array_base.h"
 #include "arrow/array/data.h"
 #include "arrow/type.h"
+#include "arrow/type_fwd.h"  // IWYU pragma: export
 #include "arrow/type_traits.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/macros.h"
diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt
index bd31cc6..c968cdd 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -41,6 +41,7 @@ endif()
 add_arrow_test(utility-test
                SOURCES
                align_util_test.cc
+               bit_block_counter_test.cc
                bit_util_test.cc
                checked_cast_test.cc
                compression_test.cc
@@ -68,6 +69,7 @@ add_arrow_test(threading-utility-test
                task_group_test
                thread_pool_test)
 
+add_arrow_benchmark(bit_block_counter_benchmark)
 add_arrow_benchmark(bit_util_benchmark)
 add_arrow_benchmark(compression_benchmark)
 add_arrow_benchmark(decimal_benchmark)
diff --git a/cpp/src/arrow/util/bit_block_counter.cc b/cpp/src/arrow/util/bit_block_counter.cc
index a596c07..ca5c159 100644
--- a/cpp/src/arrow/util/bit_block_counter.cc
+++ b/cpp/src/arrow/util/bit_block_counter.cc
@@ -17,59 +17,143 @@
 
 #include "arrow/util/bit_block_counter.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <type_traits>
 
 #include "arrow/util/bit_util.h"
-#include "arrow/util/bitmap_ops.h"
 #include "arrow/util/ubsan.h"
 
 namespace arrow {
 namespace internal {
 
-BitBlockCounter::Block BitBlockCounter::NextBlock() {
-  auto load_word = [](const uint8_t* bytes) -> uint64_t {
-    return BitUtil::ToLittleEndian(util::SafeLoadAs<uint64_t>(bytes));
-  };
-  auto shift_word = [](uint64_t current, uint64_t next, int64_t shift) -> uint64_t {
-    return (current >> shift) | (next << (64 - shift));
-  };
+static constexpr int64_t kWordBits = 64;
+static constexpr int64_t kFourWordsBits = 256;
 
-  // When the offset is > 0, we need there to be a word beyond the last aligned
-  // word in the bitmap for the bit shifting logic.
-  const int64_t bits_required_to_scan_words = offset_ == 0 ? 256 : 256 + (64 - offset_);
-  if (bits_remaining_ < bits_required_to_scan_words) {
-    // End of the bitmap, leave it to the caller to decide how to best check
-    // these bits, no need to do redundant computation here.
-    const int16_t run_length = static_cast<int16_t>(bits_remaining_);
-    bits_remaining_ -= run_length;
-    return {run_length, static_cast<int16_t>(CountSetBits(bitmap_, offset_, run_length))};
+static inline uint64_t LoadWord(const uint8_t* bytes) {
+  return BitUtil::ToLittleEndian(util::SafeLoadAs<uint64_t>(bytes));
+}
+
+static inline uint64_t ShiftWord(uint64_t current, uint64_t next, int64_t shift) {
+  if (shift == 0) {
+    return current;
+  }
+  return (current >> shift) | (next << (64 - shift));
+}
+
+BitBlockCount BitBlockCounter::GetBlockSlow(int64_t block_size) {
+  const int16_t run_length = static_cast<int16_t>(std::min(bits_remaining_, block_size));
+  int16_t popcount = static_cast<int16_t>(CountSetBits(bitmap_, offset_, run_length));
+  bits_remaining_ -= run_length;
+  // If any bits remain after this call then run_length == block_size (64 or
+  // 256, a multiple of 8), so advancing by whole bytes keeps offset_ valid
+  bitmap_ += run_length / 8;
+  return {run_length, popcount};
+}
+
+BitBlockCount BitBlockCounter::NextWord() {
+  if (!bits_remaining_) {
+    return {0, 0};
+  }
+  int64_t popcount = 0;
+  if (offset_ == 0) {
+    if (bits_remaining_ < kWordBits) {
+      return GetBlockSlow(kWordBits);
+    }
+    popcount = BitUtil::PopCount(LoadWord(bitmap_));
+  } else {
+    // When the offset is > 0, we need there to be a word beyond the last
+    // aligned word in the bitmap for the bit shifting logic.
+    if (bits_remaining_ < 2 * kWordBits - offset_) {
+      return GetBlockSlow(kWordBits);
+    }
+    popcount =
+        BitUtil::PopCount(ShiftWord(LoadWord(bitmap_), LoadWord(bitmap_ + 8), offset_));
   }
+  bitmap_ += kWordBits / 8;
+  bits_remaining_ -= kWordBits;
+  return {64, static_cast<int16_t>(popcount)};
+}
 
+BitBlockCount BitBlockCounter::NextFourWords() {
+  if (!bits_remaining_) {
+    return {0, 0};
+  }
   int64_t total_popcount = 0;
   if (offset_ == 0) {
-    total_popcount += BitUtil::PopCount(load_word(bitmap_));
-    total_popcount += BitUtil::PopCount(load_word(bitmap_ + 8));
-    total_popcount += BitUtil::PopCount(load_word(bitmap_ + 16));
-    total_popcount += BitUtil::PopCount(load_word(bitmap_ + 24));
+    if (bits_remaining_ < kFourWordsBits) {
+      return GetBlockSlow(kFourWordsBits);
+    }
+    total_popcount += BitUtil::PopCount(LoadWord(bitmap_));
+    total_popcount += BitUtil::PopCount(LoadWord(bitmap_ + 8));
+    total_popcount += BitUtil::PopCount(LoadWord(bitmap_ + 16));
+    total_popcount += BitUtil::PopCount(LoadWord(bitmap_ + 24));
   } else {
-    auto current = load_word(bitmap_);
-    auto next = load_word(bitmap_ + 8);
-    total_popcount += BitUtil::PopCount(shift_word(current, next, offset_));
+    // With a nonzero offset the shifting logic reads a fifth word, so at
+    // least 5 * kWordBits bits (counted from bitmap_) must be readable.
+    if (bits_remaining_ < 5 * kWordBits - offset_) {
+      return GetBlockSlow(kFourWordsBits);
+    }
+    auto current = LoadWord(bitmap_);
+    auto next = LoadWord(bitmap_ + 8);
+    total_popcount += BitUtil::PopCount(ShiftWord(current, next, offset_));
     current = next;
-    next = load_word(bitmap_ + 16);
-    total_popcount += BitUtil::PopCount(shift_word(current, next, offset_));
+    next = LoadWord(bitmap_ + 16);
+    total_popcount += BitUtil::PopCount(ShiftWord(current, next, offset_));
     current = next;
-    next = load_word(bitmap_ + 24);
-    total_popcount += BitUtil::PopCount(shift_word(current, next, offset_));
+    next = LoadWord(bitmap_ + 24);
+    total_popcount += BitUtil::PopCount(ShiftWord(current, next, offset_));
     current = next;
-    next = load_word(bitmap_ + 32);
-    total_popcount += BitUtil::PopCount(shift_word(current, next, offset_));
+    next = LoadWord(bitmap_ + 32);
+    total_popcount += BitUtil::PopCount(ShiftWord(current, next, offset_));
   }
-  bitmap_ += BitUtil::BytesForBits(kTargetBlockLength);
-  bits_remaining_ -= 256;
+  bitmap_ += BitUtil::BytesForBits(kFourWordsBits);
+  bits_remaining_ -= kFourWordsBits;
   return {256, static_cast<int16_t>(total_popcount)};
 }
 
+BitBlockCount BinaryBitBlockCounter::NextAndWord() {
+  if (!bits_remaining_) {
+    return {0, 0};
+  }
+
+  // When the offset is > 0, we need there to be a word beyond the last aligned
+  // word in the bitmap for the bit shifting logic.
+  const int64_t bits_required_to_use_words =
+      std::max(left_offset_ == 0 ? 64 : 64 + (64 - left_offset_),
+               right_offset_ == 0 ? 64 : 64 + (64 - right_offset_));
+  if (bits_remaining_ < bits_required_to_use_words) {
+    const int16_t run_length = static_cast<int16_t>(std::min(bits_remaining_, kWordBits));
+    int16_t popcount = 0;
+    for (int64_t i = 0; i < run_length; ++i) {
+      if (BitUtil::GetBit(left_bitmap_, left_offset_ + i) &&
+          BitUtil::GetBit(right_bitmap_, right_offset_ + i)) {
+        ++popcount;
+      }
+    }
+    // This code path should trigger _at most_ 2 times. In the "two times"
+    // case, the first time the run length will be a multiple of 8.
+    left_bitmap_ += run_length / 8;
+    right_bitmap_ += run_length / 8;
+    bits_remaining_ -= run_length;
+    return {run_length, popcount};
+  }
+
+  int64_t popcount = 0;
+  if (left_offset_ == 0 && right_offset_ == 0) {
+    popcount = BitUtil::PopCount(LoadWord(left_bitmap_) & LoadWord(right_bitmap_));
+  } else {
+    auto left_word =
+        ShiftWord(LoadWord(left_bitmap_), LoadWord(left_bitmap_ + 8), left_offset_);
+    auto right_word =
+        ShiftWord(LoadWord(right_bitmap_), LoadWord(right_bitmap_ + 8), right_offset_);
+    popcount = BitUtil::PopCount(left_word & right_word);
+  }
+  left_bitmap_ += kWordBits / 8;
+  right_bitmap_ += kWordBits / 8;
+  bits_remaining_ -= kWordBits;
+  return {64, static_cast<int16_t>(popcount)};
+}
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/bit_block_counter.h b/cpp/src/arrow/util/bit_block_counter.h
index 92e2b9d..3e01dda 100644
--- a/cpp/src/arrow/util/bit_block_counter.h
+++ b/cpp/src/arrow/util/bit_block_counter.h
@@ -19,37 +19,83 @@
 
 #include <cstdint>
 
+#include "arrow/util/bit_util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/ubsan.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 namespace internal {
 
-/// \brief A class that scans through a true/false bitmap to yield blocks of up
-/// to 256 bits at a time along with their popcount. This is used to accelerate
-/// processing of mostly-not-null array data.
+/// \brief Return value from bit block counters: the total number of bits and
+/// the number of set bits.
+struct BitBlockCount {
+  int16_t length;
+  int16_t popcount;
+};
+
+/// \brief A class that scans through a true/false bitmap to compute popcounts
+/// 64 or 256 bits at a time. This is used to accelerate processing of
+/// mostly-not-null array data.
 class ARROW_EXPORT BitBlockCounter {
  public:
-  struct Block {
-    int16_t length;
-    int16_t popcount;
-  };
-
-  static constexpr int16_t kTargetBlockLength = 256;
-
   BitBlockCounter(const uint8_t* bitmap, int64_t start_offset, int64_t length)
       : bitmap_(bitmap + start_offset / 8),
         bits_remaining_(length),
         offset_(start_offset % 8) {}
 
-  /// \brief Return the next run of available bits, up to 256. The returned
-  /// pair contains the size of run and the number of true values
-  Block NextBlock();
+  /// \brief Return the next run of available bits, usually 256. The returned
+  /// pair contains the size of run and the number of true values. The last
+  /// block will have a length less than 256 if the bitmap length is not a
+  /// multiple of 256, and will return 0-length blocks in subsequent
+  /// invocations.
+  BitBlockCount NextFourWords();
+
+  /// \brief Return the next run of available bits, usually 64. The returned
+  /// pair contains the size of run and the number of true values. The last
+  /// block will have a length less than 64 if the bitmap length is not a
+  /// multiple of 64, and will return 0-length blocks in subsequent
+  /// invocations.
+  BitBlockCount NextWord();
 
  private:
+  /// \brief Return block with the requested size when doing word-wise
+  /// computation is not possible due to inadequate bits remaining.
+  BitBlockCount GetBlockSlow(int64_t block_size);
+
   const uint8_t* bitmap_;
   int64_t bits_remaining_;
   int64_t offset_;
 };
 
+/// \brief A class that computes popcounts on the result of bitwise operations
+/// between two bitmaps, 64 bits at a time. A 64-bit word is loaded from each
+/// bitmap, then the popcount is computed on e.g. the bitwise-and of the two
+/// words.
+class ARROW_EXPORT BinaryBitBlockCounter {
+ public:
+  BinaryBitBlockCounter(const uint8_t* left_bitmap, int64_t left_offset,
+                        const uint8_t* right_bitmap, int64_t right_offset, int64_t length)
+      : left_bitmap_(left_bitmap + left_offset / 8),
+        left_offset_(left_offset % 8),
+        right_bitmap_(right_bitmap + right_offset / 8),
+        right_offset_(right_offset % 8),
+        bits_remaining_(length) {}
+
+  /// \brief Return the popcount of the bitwise-and of the next run of
+  /// available bits, up to 64. The returned pair contains the size of run and
+  /// the number of true values. The last block will have a length less than 64
+  /// if the bitmap length is not a multiple of 64, and will return 0-length
+  /// blocks in subsequent invocations.
+  BitBlockCount NextAndWord();
+
+ private:
+  const uint8_t* left_bitmap_;
+  int64_t left_offset_;
+  const uint8_t* right_bitmap_;
+  int64_t right_offset_;
+  int64_t bits_remaining_;
+};
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/bit_block_counter_benchmark.cc b/cpp/src/arrow/util/bit_block_counter_benchmark.cc
new file mode 100644
index 0000000..4299934
--- /dev/null
+++ b/cpp/src/arrow/util/bit_block_counter_benchmark.cc
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <cstdint>
+#include <cstdlib>
+#include <memory>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/bit_block_counter.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/bitmap_reader.h"
+
+namespace arrow {
+namespace internal {
+
+struct UnaryBitBlockBenchmark {
+  benchmark::State& state;
+  int64_t offset;
+  int64_t bitmap_length;
+  std::shared_ptr<Array> arr;
+  int64_t expected;
+
+  explicit UnaryBitBlockBenchmark(benchmark::State& state, int64_t offset = 0)
+      : state(state), offset(offset), bitmap_length(1 << 20) {
+    random::RandomArrayGenerator rng(/*seed=*/0);
+    // State parameter is the average number of total values for each null
+    // value. So 100 means that 1 out of 100 on average are null.
+    double null_probability = 1. / static_cast<double>(state.range(0));
+    arr = rng.Int8(bitmap_length, 0, 100, null_probability);
+
+    // Compute the expected result
+    this->expected = 0;
+    const auto& int8_arr = static_cast<const Int8Array&>(*arr);
+    for (int64_t i = this->offset; i < bitmap_length; ++i) {
+      if (int8_arr.IsValid(i)) {
+        this->expected += int8_arr.Value(i);
+      }
+    }
+  }
+
+  template <typename NextBlockFunc>
+  void BenchBitBlockCounter(NextBlockFunc&& next_block) {
+    const auto& int8_arr = static_cast<const Int8Array&>(*arr);
+    const uint8_t* bitmap = arr->null_bitmap_data();
+    for (auto _ : state) {
+      BitBlockCounter scanner(bitmap, this->offset, bitmap_length - this->offset);
+      int64_t result = 0;
+      int64_t position = this->offset;
+      while (true) {
+        BitBlockCount block = next_block(&scanner);
+        if (block.length == 0) {
+          break;
+        }
+        if (block.length == block.popcount) {
+          // All not-null
+          for (int64_t i = 0; i < block.length; ++i) {
+            result += int8_arr.Value(position + i);
+          }
+        } else if (block.popcount > 0) {
+          // Some but not all not-null
+          for (int64_t i = 0; i < block.length; ++i) {
+            if (BitUtil::GetBit(bitmap, position + i)) {
+              result += int8_arr.Value(position + i);
+            }
+          }
+        }
+        position += block.length;
+      }
+      // Sanity check
+      if (result != expected) {
+        std::abort();
+      }
+    }
+    state.SetItemsProcessed(state.iterations() * bitmap_length);
+  }
+
+  void BenchBitmapReader() {
+    const auto& int8_arr = static_cast<const Int8Array&>(*arr);
+    for (auto _ : state) {
+      internal::BitmapReader bit_reader(arr->null_bitmap_data(), this->offset,
+                                        bitmap_length - this->offset);
+      int64_t result = 0;
+      for (int64_t i = this->offset; i < bitmap_length; ++i) {
+        if (bit_reader.IsSet()) {
+          result += int8_arr.Value(i);
+        }
+        bit_reader.Next();
+      }
+      // Sanity check
+      if (result != expected) {
+        std::abort();
+      }
+    }
+    state.SetItemsProcessed(state.iterations() * bitmap_length);
+  }
+};
+
+struct BinaryBitBlockBenchmark {
+  benchmark::State& state;
+  int64_t offset;
+  int64_t bitmap_length;
+  std::shared_ptr<Array> left;
+  std::shared_ptr<Array> right;
+  int64_t expected;
+  const Int8Array* left_int8;
+  const Int8Array* right_int8;
+
+  explicit BinaryBitBlockBenchmark(benchmark::State& state, int64_t offset = 0)
+      : state(state), offset(offset), bitmap_length(1 << 20) {
+    random::RandomArrayGenerator rng(/*seed=*/0);
+
+    // State parameter is the average number of total values for each null
+    // value. So 100 means that 1 out of 100 on average are null.
+    double null_probability = 1. / static_cast<double>(state.range(0));
+    left = rng.Int8(bitmap_length, 0, 100, null_probability);
+    right = rng.Int8(bitmap_length, 0, 50, null_probability);
+    left_int8 = static_cast<const Int8Array*>(left.get());
+    right_int8 = static_cast<const Int8Array*>(right.get());
+
+    // Compute the expected result
+    expected = 0;
+    for (int64_t i = this->offset; i < bitmap_length; ++i) {
+      if (left_int8->IsValid(i) && right_int8->IsValid(i)) {
+        expected += left_int8->Value(i) + right_int8->Value(i);
+      }
+    }
+  }
+
+  void BenchBitBlockCounter() {
+    const uint8_t* left_bitmap = left->null_bitmap_data();
+    const uint8_t* right_bitmap = right->null_bitmap_data();
+    for (auto _ : state) {
+      BinaryBitBlockCounter scanner(left_bitmap, this->offset, right_bitmap, this->offset,
+                                    bitmap_length - this->offset);
+      int64_t result = 0;
+      int64_t position = this->offset;
+      while (true) {
+        BitBlockCount block = scanner.NextAndWord();
+        if (block.length == 0) {
+          break;
+        }
+        if (block.length == block.popcount) {
+          // All not-null
+          for (int64_t i = 0; i < block.length; ++i) {
+            result += left_int8->Value(position + i) + right_int8->Value(position + i);
+          }
+        } else if (block.popcount > 0) {
+          // Some but not all not-null
+          for (int64_t i = 0; i < block.length; ++i) {
+            if (BitUtil::GetBit(left_bitmap, position + i) &&
+                BitUtil::GetBit(right_bitmap, position + i)) {
+              result += left_int8->Value(position + i) + right_int8->Value(position + i);
+            }
+          }
+        }
+        position += block.length;
+      }
+      // Sanity check
+      if (result != expected) {
+        std::abort();
+      }
+    }
+    state.SetItemsProcessed(state.iterations() * bitmap_length);
+  }
+
+  void BenchBitmapReader() {
+    for (auto _ : state) {
+      internal::BitmapReader left_reader(left->null_bitmap_data(), this->offset,
+                                         bitmap_length - this->offset);
+      internal::BitmapReader right_reader(right->null_bitmap_data(), this->offset,
+                                          bitmap_length - this->offset);
+      int64_t result = 0;
+      for (int64_t i = this->offset; i < bitmap_length; ++i) {
+        if (left_reader.IsSet() && right_reader.IsSet()) {
+          result += left_int8->Value(i) + right_int8->Value(i);
+        }
+        left_reader.Next();
+        right_reader.Next();
+      }
+      // Sanity check
+      if (result != expected) {
+        std::abort();
+      }
+    }
+    state.SetItemsProcessed(state.iterations() * bitmap_length);
+  }
+};
+
+static void BitBlockCounterSum(benchmark::State& state) {
+  UnaryBitBlockBenchmark(state, /*offset=*/0)
+      .BenchBitBlockCounter([](BitBlockCounter* counter) { return counter->NextWord(); });
+}
+
+static void BitBlockCounterSumWithOffset(benchmark::State& state) {
+  UnaryBitBlockBenchmark(state, /*offset=*/4)
+      .BenchBitBlockCounter([](BitBlockCounter* counter) { return counter->NextWord(); });
+}
+
+static void BitBlockCounterFourWordsSum(benchmark::State& state) {
+  UnaryBitBlockBenchmark(state, /*offset=*/0)
+      .BenchBitBlockCounter(
+          [](BitBlockCounter* counter) { return counter->NextFourWords(); });
+}
+
+static void BitBlockCounterFourWordsSumWithOffset(benchmark::State& state) {
+  UnaryBitBlockBenchmark(state, /*offset=*/4)
+      .BenchBitBlockCounter(
+          [](BitBlockCounter* counter) { return counter->NextFourWords(); });
+}
+
+static void BitmapReaderSum(benchmark::State& state) {
+  UnaryBitBlockBenchmark(state, /*offset=*/0).BenchBitmapReader();
+}
+
+static void BitmapReaderSumWithOffset(benchmark::State& state) {
+  UnaryBitBlockBenchmark(state, /*offset=*/4).BenchBitmapReader();
+}
+
+static void BinaryBitBlockCounterSum(benchmark::State& state) {
+  BinaryBitBlockBenchmark(state, /*offset=*/0).BenchBitBlockCounter();
+}
+
+static void BinaryBitBlockCounterSumWithOffset(benchmark::State& state) {
+  BinaryBitBlockBenchmark(state, /*offset=*/4).BenchBitBlockCounter();
+}
+
+static void BinaryBitmapReaderSum(benchmark::State& state) {
+  BinaryBitBlockBenchmark(state, /*offset=*/0).BenchBitmapReader();
+}
+
+static void BinaryBitmapReaderSumWithOffset(benchmark::State& state) {
+  BinaryBitBlockBenchmark(state, /*offset=*/4).BenchBitmapReader();
+}
+
+// Range value: average number of total values per null
+BENCHMARK(BitBlockCounterSum)->Range(2, 1 << 16);
+BENCHMARK(BitBlockCounterSumWithOffset)->Range(2, 1 << 16);
+BENCHMARK(BitBlockCounterFourWordsSum)->Range(2, 1 << 16);
+BENCHMARK(BitBlockCounterFourWordsSumWithOffset)->Range(2, 1 << 16);
+BENCHMARK(BitmapReaderSum)->Range(2, 1 << 16);
+BENCHMARK(BitmapReaderSumWithOffset)->Range(2, 1 << 16);
+BENCHMARK(BinaryBitBlockCounterSum)->Range(2, 1 << 16);
+BENCHMARK(BinaryBitBlockCounterSumWithOffset)->Range(2, 1 << 16);
+BENCHMARK(BinaryBitmapReaderSum)->Range(2, 1 << 16);
+BENCHMARK(BinaryBitmapReaderSumWithOffset)->Range(2, 1 << 16);
+
+}  // namespace internal
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/bit_block_counter_test.cc b/cpp/src/arrow/util/bit_block_counter_test.cc
new file mode 100644
index 0000000..d08e1f8
--- /dev/null
+++ b/cpp/src/arrow/util/bit_block_counter_test.cc
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+
+#include <gtest/gtest.h>
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/bit_block_counter.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace internal {
+
+class TestBitBlockCounter : public ::testing::Test {
+ public:
+  void Create(int64_t nbytes, int64_t offset, int64_t length) {
+    ASSERT_OK_AND_ASSIGN(buf_, AllocateBuffer(nbytes));
+    // Start with data zeroed out
+    std::memset(buf_->mutable_data(), 0, nbytes);
+    counter_.reset(new BitBlockCounter(buf_->data(), offset, length));
+  }
+
+ protected:
+  std::shared_ptr<Buffer> buf_;
+  std::unique_ptr<BitBlockCounter> counter_;
+};
+
+static constexpr int64_t kWordSize = 64;
+
+TEST_F(TestBitBlockCounter, OneWordBasics) {
+  const int64_t nbytes = 1024;
+
+  Create(nbytes, 0, nbytes * 8);
+
+  int64_t bits_scanned = 0;
+  for (int64_t i = 0; i < nbytes / 8; ++i) {
+    BitBlockCount block = counter_->NextWord();
+    ASSERT_EQ(block.length, kWordSize);
+    ASSERT_EQ(block.popcount, 0);
+    bits_scanned += block.length;
+  }
+  ASSERT_EQ(bits_scanned, 1024 * 8);
+
+  auto block = counter_->NextWord();
+  ASSERT_EQ(block.length, 0);
+  ASSERT_EQ(block.popcount, 0);
+}
+
+TEST_F(TestBitBlockCounter, FourWordsBasics) {
+  const int64_t nbytes = 1024;
+
+  Create(nbytes, 0, nbytes * 8);
+
+  int64_t bits_scanned = 0;
+  for (int64_t i = 0; i < nbytes / 32; ++i) {
+    BitBlockCount block = counter_->NextFourWords();
+    ASSERT_EQ(block.length, 4 * kWordSize);
+    ASSERT_EQ(block.popcount, 0);
+    bits_scanned += block.length;
+  }
+  ASSERT_EQ(bits_scanned, 1024 * 8);
+
+  auto block = counter_->NextFourWords();
+  ASSERT_EQ(block.length, 0);
+  ASSERT_EQ(block.popcount, 0);
+}
+
+TEST_F(TestBitBlockCounter, OneWordWithOffsets) {
+  auto CheckWithOffset = [&](int64_t offset) {
+    const int64_t nwords = 4;
+
+    const int64_t total_bytes = nwords * 8 + 1;
+    // Trim a bit from the end of the bitmap so we can check the remainder bits
+    // behavior
+    Create(total_bytes, offset, nwords * kWordSize - offset - 1);
+
+    // Start with data all set
+    std::memset(buf_->mutable_data(), 0xFF, total_bytes);
+
+    BitBlockCount block = counter_->NextWord();
+    ASSERT_EQ(kWordSize, block.length);
+    ASSERT_EQ(block.popcount, 64);
+
+    // Add a false value to the next word
+    BitUtil::SetBitTo(buf_->mutable_data(), kWordSize + offset, false);
+    block = counter_->NextWord();
+    ASSERT_EQ(block.length, 64);
+    ASSERT_EQ(block.popcount, 63);
+
+    // Set the next word to all false
+    BitUtil::SetBitsTo(buf_->mutable_data(), 2 * kWordSize + offset, kWordSize, false);
+
+    block = counter_->NextWord();
+    ASSERT_EQ(block.length, 64);
+    ASSERT_EQ(block.popcount, 0);
+
+    block = counter_->NextWord();
+    ASSERT_EQ(block.length, kWordSize - offset - 1);
+    ASSERT_EQ(block.length, block.popcount);
+
+    // We can keep calling NextWord safely
+    block = counter_->NextWord();
+    ASSERT_EQ(block.length, 0);
+    ASSERT_EQ(block.popcount, 0);
+  };
+
+  for (int64_t offset_i = 0; offset_i < 8; ++offset_i) {
+    CheckWithOffset(offset_i);
+  }
+}
+
+TEST_F(TestBitBlockCounter, FourWordsWithOffsets) {
+  auto CheckWithOffset = [&](int64_t offset) {
+    const int64_t nwords = 17;
+
+    const int64_t total_bytes = nwords * 8 + 1;
+    // Trim a bit from the end of the bitmap so we can check the remainder bits
+    // behavior
+    Create(total_bytes, offset, nwords * kWordSize - offset - 1);
+
+    // Start with data all set
+    std::memset(buf_->mutable_data(), 0xFF, total_bytes);
+
+    BitBlockCount block = counter_->NextFourWords();
+    ASSERT_EQ(block.length, 4 * kWordSize);
+    ASSERT_EQ(block.popcount, block.length);
+
+    // Add some false values to the next 3 shifted words
+    BitUtil::SetBitTo(buf_->mutable_data(), 4 * kWordSize + offset, false);
+    BitUtil::SetBitTo(buf_->mutable_data(), 5 * kWordSize + offset, false);
+    BitUtil::SetBitTo(buf_->mutable_data(), 6 * kWordSize + offset, false);
+    block = counter_->NextFourWords();
+
+    ASSERT_EQ(block.length, 4 * kWordSize);
+    ASSERT_EQ(block.popcount, 253);
+
+    // Set the next two words to all false
+    BitUtil::SetBitsTo(buf_->mutable_data(), 8 * kWordSize + offset, 2 * kWordSize,
+                       false);
+
+    // Block is half set
+    block = counter_->NextFourWords();
+    ASSERT_EQ(block.length, 4 * kWordSize);
+    ASSERT_EQ(block.popcount, 128);
+
+    // Last full block whether offset or no
+    block = counter_->NextFourWords();
+    ASSERT_EQ(block.length, 4 * kWordSize);
+    ASSERT_EQ(block.length, block.popcount);
+
+    // Partial block
+    block = counter_->NextFourWords();
+    ASSERT_EQ(block.length, kWordSize - offset - 1);
+    ASSERT_EQ(block.length, block.popcount);
+
+    // We can keep calling NextFourWords safely
+    block = counter_->NextFourWords();
+    ASSERT_EQ(block.length, 0);
+    ASSERT_EQ(block.popcount, 0);
+  };
+
+  for (int64_t offset_i = 0; offset_i < 8; ++offset_i) {
+    CheckWithOffset(offset_i);
+  }
+}
+
+TEST_F(TestBitBlockCounter, FourWordsRandomData) {
+  const int64_t nbytes = 1024;
+  auto buffer = *AllocateBuffer(nbytes);
+  random_bytes(nbytes, 0, buffer->mutable_data());
+
+  auto CheckWithOffset = [&](int64_t offset) {
+    BitBlockCounter counter(buffer->data(), offset, nbytes * 8 - offset);
+    for (int64_t i = 0; i < nbytes / 32; ++i) {
+      BitBlockCount block = counter.NextFourWords();
+      ASSERT_EQ(block.popcount,
+                CountSetBits(buffer->data(), i * 256 + offset, block.length));
+    }
+  };
+  for (int64_t offset_i = 0; offset_i < 8; ++offset_i) {
+    CheckWithOffset(offset_i);
+  }
+}
+
+TEST(TestBinaryBitBlockCounter, NextAndWord) {
+  const int64_t nbytes = 1024;
+  auto left = *AllocateBuffer(nbytes);
+  auto right = *AllocateBuffer(nbytes);
+  random_bytes(nbytes, 0, left->mutable_data());
+  random_bytes(nbytes, 0, right->mutable_data());
+
+  auto CheckWithOffsets = [&](int left_offset, int right_offset) {
+    int64_t overlap_length = nbytes * 8 - std::max(left_offset, right_offset);
+    BinaryBitBlockCounter counter(left->data(), left_offset, right->data(), right_offset,
+                                  overlap_length);
+    int64_t position = 0;
+    while (true) {
+      BitBlockCount block = counter.NextAndWord();
+      if (block.length == 0) {
+        break;
+      }
+      int expected_popcount = 0;
+      for (int j = 0; j < block.length; ++j) {
+        expected_popcount +=
+            static_cast<int>(BitUtil::GetBit(left->data(), position + left_offset + j) &&
+                             BitUtil::GetBit(right->data(), position + right_offset + j));
+      }
+      ASSERT_EQ(block.popcount, expected_popcount);
+      position += block.length;
+    }
+    // We made it through all the data
+    ASSERT_EQ(position, overlap_length);
+
+    BitBlockCount block = counter.NextAndWord();
+    ASSERT_EQ(block.length, 0);
+    ASSERT_EQ(block.popcount, 0);
+  };
+
+  for (int left_i = 0; left_i < 8; ++left_i) {
+    for (int right_i = 0; right_i < 8; ++right_i) {
+      CheckWithOffsets(left_i, right_i);
+    }
+  }
+}
+
+}  // namespace internal
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/bit_util_benchmark.cc b/cpp/src/arrow/util/bit_util_benchmark.cc
index ab8f91e..a409bf7 100644
--- a/cpp/src/arrow/util/bit_util_benchmark.cc
+++ b/cpp/src/arrow/util/bit_util_benchmark.cc
@@ -17,19 +17,20 @@
 
 #include "benchmark/benchmark.h"
 
-#include <algorithm>
 #include <array>
 #include <bitset>
-#include <vector>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <utility>
 
 #include "arrow/array/array_base.h"
+#include "arrow/array/array_primitive.h"
 #include "arrow/buffer.h"
-#include "arrow/builder.h"
-#include "arrow/memory_pool.h"
-#include "arrow/testing/gtest_util.h"
+#include "arrow/result.h"
 #include "arrow/testing/random.h"
 #include "arrow/testing/util.h"
-#include "arrow/util/bit_block_counter.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/bitmap.h"
 #include "arrow/util/bitmap_generate.h"
@@ -39,7 +40,6 @@
 #include "arrow/util/bitmap_writer.h"
 
 namespace arrow {
-
 namespace BitUtil {
 
 #ifdef ARROW_WITH_BENCHMARKS_REFERENCE
@@ -396,114 +396,6 @@ static void BitmapEquals(benchmark::State& state) {
   state.SetBytesProcessed(state.iterations() * buffer_size);
 }
 
-template <int64_t Offset = 0>
-static void BitBlockCounterSumNotNull(benchmark::State& state) {
-  using internal::BitBlockCounter;
-
-  random::RandomArrayGenerator rng(/*seed=*/0);
-
-  const int64_t bitmap_length = 1 << 20;
-
-  // State parameter is the average number of total values for each false
-  // value. So 100 means that 1 out of 100 on average are false.
-  double true_probability = 1. - 1. / state.range(0);
-  auto arr = rng.Int8(bitmap_length, 0, 100, true_probability);
-
-  const uint8_t* bitmap = arr->null_bitmap_data();
-
-  // Compute the expected result
-  int64_t expected = 0;
-  const auto& int8_arr = static_cast<const Int8Array&>(*arr);
-  for (int64_t i = Offset; i < bitmap_length; ++i) {
-    if (int8_arr.IsValid(i)) {
-      expected += int8_arr.Value(i);
-    }
-  }
-  for (auto _ : state) {
-    BitBlockCounter scanner(bitmap, Offset, bitmap_length - Offset);
-    int64_t result = 0;
-    int64_t position = Offset;
-    while (true) {
-      BitBlockCounter::Block block = scanner.NextBlock();
-      if (block.length == 0) {
-        break;
-      }
-      if (block.length == block.popcount) {
-        // All not-null
-        for (int64_t i = 0; i < block.length; ++i) {
-          result += int8_arr.Value(position + i);
-        }
-      } else if (block.popcount > 0) {
-        // Some but not all not-null
-        for (int64_t i = 0; i < block.length; ++i) {
-          if (BitUtil::GetBit(bitmap, position + i)) {
-            result += int8_arr.Value(position + i);
-          }
-        }
-      }
-      position += block.length;
-    }
-    // Sanity check
-    if (result != expected) {
-      std::abort();
-    }
-  }
-  state.SetItemsProcessed(state.iterations() * bitmap_length);
-}
-
-template <int64_t Offset = 0>
-static void BitmapReaderSumNotNull(benchmark::State& state) {
-  random::RandomArrayGenerator rng(/*seed=*/0);
-
-  const int64_t bitmap_length = 1 << 20;
-
-  // State parameter is the average number of total values for each false
-  // value. So 100 means that 1 out of 100 on average are false.
-  double true_probability = 1. - 1. / state.range(0);
-  auto arr = rng.Int8(bitmap_length, 0, 100, true_probability);
-
-  const uint8_t* bitmap = arr->null_bitmap_data();
-  // Compute the expected result
-  int64_t expected = 0;
-  const auto& int8_arr = static_cast<const Int8Array&>(*arr);
-  for (int64_t i = Offset; i < bitmap_length; ++i) {
-    if (int8_arr.IsValid(i)) {
-      expected += int8_arr.Value(i);
-    }
-  }
-  for (auto _ : state) {
-    internal::BitmapReader bit_reader(bitmap, Offset, bitmap_length - Offset);
-    int64_t result = 0;
-    for (int64_t i = Offset; i < bitmap_length; ++i) {
-      if (bit_reader.IsSet()) {
-        result += int8_arr.Value(i);
-      }
-      bit_reader.Next();
-    }
-    // Sanity check
-    if (result != expected) {
-      std::abort();
-    }
-  }
-  state.SetItemsProcessed(state.iterations() * bitmap_length);
-}
-
-static void BitBlockCounterSumNotNull(benchmark::State& state) {
-  BitBlockCounterSumNotNull<0>(state);
-}
-
-static void BitBlockCounterSumNotNullWithOffset(benchmark::State& state) {
-  BitBlockCounterSumNotNull<4>(state);
-}
-
-static void BitmapReaderSumNotNull(benchmark::State& state) {
-  BitmapReaderSumNotNull<0>(state);
-}
-
-static void BitmapReaderSumNotNullWithOffset(benchmark::State& state) {
-  BitmapReaderSumNotNull<4>(state);
-}
-
 static void BitmapEqualsWithoutOffset(benchmark::State& state) { BitmapEquals<0>(state); }
 
 static void BitmapEqualsWithOffset(benchmark::State& state) { BitmapEquals<4>(state); }
@@ -541,12 +433,6 @@ BENCHMARK(CopyBitmapWithOffsetBoth)->Arg(kBufferSize);
 BENCHMARK(BitmapEqualsWithoutOffset)->Arg(kBufferSize);
 BENCHMARK(BitmapEqualsWithOffset)->Arg(kBufferSize);
 
-// Range value: average number of total values per null
-BENCHMARK(BitBlockCounterSumNotNull)->Range(8, 1 << 16);
-BENCHMARK(BitBlockCounterSumNotNullWithOffset)->Range(8, 1 << 16);
-BENCHMARK(BitmapReaderSumNotNull)->Range(8, 1 << 16);
-BENCHMARK(BitmapReaderSumNotNullWithOffset)->Range(8, 1 << 16);
-
 #define AND_BENCHMARK_RANGES                      \
   {                                               \
     {kBufferSize * 4, kBufferSize * 16}, { 0, 2 } \
diff --git a/cpp/src/arrow/util/bit_util_test.cc b/cpp/src/arrow/util/bit_util_test.cc
index 2eff46f..3772e9f 100644
--- a/cpp/src/arrow/util/bit_util_test.cc
+++ b/cpp/src/arrow/util/bit_util_test.cc
@@ -33,7 +33,6 @@
 #include "arrow/memory_pool.h"
 #include "arrow/testing/gtest_common.h"
 #include "arrow/testing/gtest_util.h"
-#include "arrow/util/bit_block_counter.h"
 #include "arrow/util/bit_stream_utils.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/bitmap.h"
@@ -1557,106 +1556,5 @@ TEST(Bitmap, VisitWordsAnd) {
   }
 }
 
-class TestBitBlockCounter : public ::testing::Test {
- public:
-  void Create(int64_t nbytes, int64_t offset, int64_t length) {
-    ASSERT_OK_AND_ASSIGN(buf_, AllocateBuffer(nbytes));
-    // Start with data zeroed out
-    std::memset(buf_->mutable_data(), 0, nbytes);
-    scanner_.reset(new BitBlockCounter(buf_->data(), offset, length));
-  }
-
- protected:
-  std::shared_ptr<Buffer> buf_;
-  std::unique_ptr<BitBlockCounter> scanner_;
-};
-
-static constexpr int64_t kWordSize = 64;
-
-TEST_F(TestBitBlockCounter, Basics) {
-  const int64_t nbytes = 1024;
-
-  Create(nbytes, 0, nbytes * 8);
-
-  int64_t bits_scanned = 0;
-  for (int64_t i = 0; i < nbytes / 32; ++i) {
-    BitBlockCounter::Block block = scanner_->NextBlock();
-    ASSERT_EQ(block.length, 4 * kWordSize);
-    ASSERT_EQ(block.popcount, 0);
-    bits_scanned += block.length;
-  }
-  ASSERT_EQ(bits_scanned, 1024 * 8);
-
-  auto block = scanner_->NextBlock();
-  ASSERT_EQ(block.length, 0);
-  ASSERT_EQ(block.popcount, 0);
-}
-
-TEST_F(TestBitBlockCounter, Offsets) {
-  auto CheckWithOffset = [&](int64_t offset) {
-    const int64_t nwords = 15;
-
-    const int64_t total_bytes = nwords * 8 + 1;
-    // Trim a bit from the end of the bitmap so we can check the remainder bits
-    // behavior
-    Create(total_bytes, offset, nwords * kWordSize - offset - 1);
-
-    // Start with data all set
-    std::memset(buf_->mutable_data(), 0xFF, total_bytes);
-
-    BitBlockCounter::Block block = scanner_->NextBlock();
-    ASSERT_EQ(4 * kWordSize, block.length);
-    ASSERT_EQ(block.popcount, 256);
-
-    // Add some false values to the next 3 shifted words
-    BitUtil::SetBitTo(buf_->mutable_data(), 4 * kWordSize + offset, false);
-    BitUtil::SetBitTo(buf_->mutable_data(), 5 * kWordSize + offset, false);
-    BitUtil::SetBitTo(buf_->mutable_data(), 6 * kWordSize + offset, false);
-    block = scanner_->NextBlock();
-
-    ASSERT_EQ(block.length, 256);
-    ASSERT_EQ(block.popcount, 253);
-
-    BitUtil::SetBitsTo(buf_->mutable_data(), 8 * kWordSize + offset, 2 * kWordSize,
-                       false);
-
-    block = scanner_->NextBlock();
-    ASSERT_EQ(block.length, 256);
-    ASSERT_EQ(block.popcount, 128);
-
-    // Last block
-    block = scanner_->NextBlock();
-    ASSERT_EQ(block.length, 3 * kWordSize - offset - 1);
-    ASSERT_EQ(block.length, block.popcount);
-
-    // We can keep calling NextBlock safely
-    block = scanner_->NextBlock();
-    ASSERT_EQ(block.length, 0);
-    ASSERT_EQ(block.popcount, 0);
-  };
-
-  for (int64_t offset_i = 0; offset_i < 7; ++offset_i) {
-    CheckWithOffset(offset_i);
-  }
-}
-
-TEST_F(TestBitBlockCounter, RandomData) {
-  const int64_t nbytes = 1024;
-  auto buffer = *AllocateBuffer(nbytes);
-  random_bytes(nbytes, 0, buffer->mutable_data());
-
-  auto CheckWithOffset = [&](int64_t offset) {
-    BitBlockCounter scanner(buffer->data(), offset, nbytes * 8 - offset);
-    for (int64_t i = 0; i < nbytes / 32; ++i) {
-      BitBlockCounter::Block block = scanner.NextBlock();
-      ASSERT_EQ(block.popcount,
-                CountSetBits(buffer->data(), i * 256 + offset, block.length));
-    }
-  };
-  for (int64_t offset_i = 0; offset_i < 7; ++offset_i) {
-    CheckWithOffset(offset_i);
-  }
-}
-
 }  // namespace internal
 }  // namespace arrow