You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/02/02 20:37:18 UTC

[arrow-rs] branch master updated: Faster bitmask iteration (#1228)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new f055fb0  Faster bitmask iteration (#1228)
f055fb0 is described below

commit f055fb04e98664499ac97e6966e6cef51ebfe2f4
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Feb 2 20:37:10 2022 +0000

    Faster bitmask iteration (#1228)
    
    * Add UnalignedBitChunks (#1227)
    
    * Clippy
    
    * Fix flaky test
    
    * Improve test legibility
    
    * Fix SlicesIterator offset direction
    
    * Format
    
    * Fix byte-aligned termination
    
    * Test edge-cases
    
    * More tests
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Review feedback
    
    * Make UnalignedBitChunkIterator crate local
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 arrow/src/buffer/immutable.rs        |   8 +-
 arrow/src/compute/kernels/filter.rs  | 295 +++++++++++++++-------------
 arrow/src/util/bit_chunk_iterator.rs | 361 ++++++++++++++++++++++++++++++++++-
 parquet/src/arrow/bit_util.rs        |  49 +++--
 4 files changed, 547 insertions(+), 166 deletions(-)

diff --git a/arrow/src/buffer/immutable.rs b/arrow/src/buffer/immutable.rs
index a3de0d4..b918c0d 100644
--- a/arrow/src/buffer/immutable.rs
+++ b/arrow/src/buffer/immutable.rs
@@ -21,7 +21,7 @@ use std::ptr::NonNull;
 use std::sync::Arc;
 use std::{convert::AsRef, usize};
 
-use crate::util::bit_chunk_iterator::BitChunks;
+use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
 use crate::{
     bytes::{Bytes, Deallocation},
     datatypes::ArrowNativeType,
@@ -205,11 +205,7 @@ impl Buffer {
     /// Returns the number of 1-bits in this buffer, starting from `offset` with `length` bits
     /// inspected. Note that both `offset` and `length` are measured in bits.
     pub fn count_set_bits_offset(&self, offset: usize, len: usize) -> usize {
-        let chunks = self.bit_chunks(offset, len);
-        let mut count = chunks.iter().map(|c| c.count_ones() as usize).sum();
-        count += chunks.remainder_bits().count_ones() as usize;
-
-        count
+        UnalignedBitChunk::new(self.as_slice(), offset, len).count_ones()
     }
 }
 
diff --git a/arrow/src/compute/kernels/filter.rs b/arrow/src/compute/kernels/filter.rs
index 3ced928..0418263 100644
--- a/arrow/src/compute/kernels/filter.rs
+++ b/arrow/src/compute/kernels/filter.rs
@@ -17,129 +17,61 @@
 
 //! Defines miscellaneous array kernels.
 
+use crate::array::*;
 use crate::buffer::buffer_bin_and;
 use crate::datatypes::DataType;
 use crate::error::Result;
 use crate::record_batch::RecordBatch;
-use crate::{array::*, util::bit_chunk_iterator::BitChunkIterator};
-use std::iter::Enumerate;
+use crate::util::bit_chunk_iterator::{UnalignedBitChunk, UnalignedBitChunkIterator};
 
 /// Function that can filter arbitrary arrays
 pub type Filter<'a> = Box<dyn Fn(&ArrayData) -> ArrayData + 'a>;
 
-/// Internal state of [SlicesIterator]
-#[derive(Debug, PartialEq)]
-enum State {
-    // it is iterating over bits of a mask (`u64`, steps of size of 1 slot)
-    Bits(u64),
-    // it is iterating over chunks (steps of size of 64 slots)
-    Chunks,
-    // it is iterating over the remaining bits (steps of size of 1 slot)
-    Remainder,
-    // nothing more to iterate.
-    Finish,
-}
-
 /// An iterator of `(usize, usize)` each representing an interval `[start,end[` whose
 /// slots of a [BooleanArray] are true. Each interval corresponds to a contiguous region of memory to be
 /// "taken" from an array to be filtered.
 #[derive(Debug)]
 pub struct SlicesIterator<'a> {
-    iter: Enumerate<BitChunkIterator<'a>>,
-    state: State,
-    filter: &'a BooleanArray,
-    remainder_mask: u64,
-    remainder_len: usize,
-    chunk_len: usize,
+    iter: UnalignedBitChunkIterator<'a>,
     len: usize,
-    start: usize,
-    on_region: bool,
-    current_chunk: usize,
-    current_bit: usize,
+    chunk_end_offset: usize,
+    current_chunk: u64,
 }
 
 impl<'a> SlicesIterator<'a> {
     pub fn new(filter: &'a BooleanArray) -> Self {
         let values = &filter.data_ref().buffers()[0];
-        let chunks = values.bit_chunks(filter.offset(), filter.len());
+        let len = filter.len();
+        let chunk = UnalignedBitChunk::new(values.as_slice(), filter.offset(), len);
+        let mut iter = chunk.iter();
+
+        let chunk_end_offset = 64 - chunk.lead_padding();
+        let current_chunk = iter.next().unwrap_or(0);
 
         Self {
-            iter: chunks.iter().enumerate(),
-            state: State::Chunks,
-            filter,
-            remainder_len: chunks.remainder_len(),
-            chunk_len: chunks.chunk_len(),
-            remainder_mask: chunks.remainder_bits(),
-            len: 0,
-            start: 0,
-            on_region: false,
-            current_chunk: 0,
-            current_bit: 0,
+            iter,
+            len,
+            chunk_end_offset,
+            current_chunk,
         }
     }
 
-    /// Counts the number of set bits in the filter array.
-    fn filter_count(&self) -> usize {
-        let values = self.filter.values();
-        values.count_set_bits_offset(self.filter.offset(), self.filter.len())
-    }
-
-    #[inline]
-    fn current_start(&self) -> usize {
-        self.current_chunk * 64 + self.current_bit
-    }
-
-    #[inline]
-    fn iterate_bits(&mut self, mask: u64, max: usize) -> Option<(usize, usize)> {
-        while self.current_bit < max {
-            if (mask & (1 << self.current_bit)) != 0 {
-                if !self.on_region {
-                    self.start = self.current_start();
-                    self.on_region = true;
-                }
-                self.len += 1;
-            } else if self.on_region {
-                let result = (self.start, self.start + self.len);
-                self.len = 0;
-                self.on_region = false;
-                self.current_bit += 1;
-                return Some(result);
+    /// Returns `Some((chunk_offset, bit_offset))` for the next chunk that has at
+    /// least one bit set, or None if there is no such chunk.
+    ///
+    /// Where `chunk_offset` is the bit offset of the end of the current `u64` chunk
+    /// and `bit_offset` is the offset of the first `1` bit in that chunk
+    fn advance_to_set_bit(&mut self) -> Option<(usize, u32)> {
+        loop {
+            if self.current_chunk != 0 {
+                // Find the index of the first 1
+                let bit_pos = self.current_chunk.trailing_zeros();
+                return Some((self.chunk_end_offset, bit_pos));
             }
-            self.current_bit += 1;
-        }
-        self.current_bit = 0;
-        None
-    }
 
-    /// iterates over chunks.
-    #[inline]
-    fn iterate_chunks(&mut self) -> Option<(usize, usize)> {
-        while let Some((i, mask)) = self.iter.next() {
-            self.current_chunk = i;
-            if mask == 0 {
-                if self.on_region {
-                    let result = (self.start, self.start + self.len);
-                    self.len = 0;
-                    self.on_region = false;
-                    return Some(result);
-                }
-            } else if mask == 18446744073709551615u64 {
-                // = !0u64
-                if !self.on_region {
-                    self.start = self.current_start();
-                    self.on_region = true;
-                }
-                self.len += 64;
-            } else {
-                // there is a chunk that has a non-trivial mask => iterate over bits.
-                self.state = State::Bits(mask);
-                return None;
-            }
+            self.current_chunk = self.iter.next()?;
+            self.chunk_end_offset += 64;
         }
-        // no more chunks => start iterating over the remainder
-        self.current_chunk = self.chunk_len;
-        self.state = State::Remainder;
-        None
     }
 }
 
@@ -147,46 +79,52 @@ impl<'a> Iterator for SlicesIterator<'a> {
     type Item = (usize, usize);
 
     fn next(&mut self) -> Option<Self::Item> {
-        match self.state {
-            State::Chunks => {
-                match self.iterate_chunks() {
-                    None => {
-                        // iterating over chunks does not yield any new slice => continue to the next
-                        self.current_bit = 0;
-                        self.next()
-                    }
-                    other => other,
-                }
+        // Used as termination condition
+        if self.len == 0 {
+            return None;
+        }
+
+        let (start_chunk, start_bit) = self.advance_to_set_bit()?;
+
+        // Set bits up to start
+        self.current_chunk |= (1 << start_bit) - 1;
+
+        loop {
+            if self.current_chunk != u64::MAX {
+                // Find the index of the first 0
+                let end_bit = self.current_chunk.trailing_ones();
+
+                // Zero out up to end_bit
+                self.current_chunk &= !((1 << end_bit) - 1);
+
+                return Some((
+                    start_chunk + start_bit as usize - 64,
+                    self.chunk_end_offset + end_bit as usize - 64,
+                ));
             }
-            State::Bits(mask) => {
-                match self.iterate_bits(mask, 64) {
-                    None => {
-                        // iterating over bits does not yield any new slice => change back
-                        // to chunks and continue to the next
-                        self.state = State::Chunks;
-                        self.next()
-                    }
-                    other => other,
+
+            match self.iter.next() {
+                Some(next) => {
+                    self.current_chunk = next;
+                    self.chunk_end_offset += 64;
                 }
-            }
-            State::Remainder => {
-                match self.iterate_bits(self.remainder_mask, self.remainder_len) {
-                    None => {
-                        self.state = State::Finish;
-                        if self.on_region {
-                            Some((self.start, self.start + self.len))
-                        } else {
-                            None
-                        }
-                    }
-                    other => other,
+                None => {
+                    return Some((
+                        start_chunk + start_bit as usize - 64,
+                        std::mem::replace(&mut self.len, 0),
+                    ));
                 }
             }
-            State::Finish => None,
         }
     }
 }
 
+fn filter_count(filter: &BooleanArray) -> usize {
+    filter
+        .values()
+        .count_set_bits_offset(filter.offset(), filter.len())
+}
+
 /// Returns a prepared function optimized to filter multiple arrays.
 /// Creating this function requires time, but using it is faster than [filter] when the
 /// same filter needs to be applied to multiple arrays (e.g. a multi-column `RecordBatch`).
@@ -194,7 +132,7 @@ impl<'a> Iterator for SlicesIterator<'a> {
 /// Therefore, it is considered undefined behavior to pass `filter` with null values.
 pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
     let iter = SlicesIterator::new(filter);
-    let filter_count = iter.filter_count();
+    let filter_count = filter_count(filter);
     let chunks = iter.collect::<Vec<_>>();
 
     Ok(Box::new(move |array: &ArrayData| {
@@ -255,8 +193,8 @@ pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> {
         return filter(array, &predicate);
     }
 
-    let iter = SlicesIterator::new(predicate);
-    let filter_count = iter.filter_count();
+    let filter_count = filter_count(predicate);
+
     match filter_count {
         0 => {
             // return empty
@@ -271,7 +209,10 @@ pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> {
             // actually filter
             let mut mutable =
                 MutableArrayData::new(vec![array.data_ref()], false, filter_count);
+
+            let iter = SlicesIterator::new(predicate);
             iter.for_each(|(start, end)| mutable.extend(0, start, end));
+
             let data = mutable.freeze();
             Ok(make_array(data))
         }
@@ -316,6 +257,7 @@ mod tests {
         buffer::Buffer,
         datatypes::{DataType, Field},
     };
+    use rand::prelude::*;
 
     macro_rules! def_temporal_test {
         ($test:ident, $array_type: ident, $data: expr) => {
@@ -614,9 +556,9 @@ mod tests {
     fn test_slice_iterator_bits() {
         let filter_values = (0..64).map(|i| i == 1).collect::<Vec<bool>>();
         let filter = BooleanArray::from(filter_values);
+        let filter_count = filter_count(&filter);
 
         let iter = SlicesIterator::new(&filter);
-        let filter_count = iter.filter_count();
         let chunks = iter.collect::<Vec<_>>();
 
         assert_eq!(chunks, vec![(1, 2)]);
@@ -627,9 +569,9 @@ mod tests {
     fn test_slice_iterator_bits1() {
         let filter_values = (0..64).map(|i| i != 1).collect::<Vec<bool>>();
         let filter = BooleanArray::from(filter_values);
+        let filter_count = filter_count(&filter);
 
         let iter = SlicesIterator::new(&filter);
-        let filter_count = iter.filter_count();
         let chunks = iter.collect::<Vec<_>>();
 
         assert_eq!(chunks, vec![(0, 1), (2, 64)]);
@@ -640,9 +582,9 @@ mod tests {
     fn test_slice_iterator_chunk_and_bits() {
         let filter_values = (0..130).map(|i| i % 62 != 0).collect::<Vec<bool>>();
         let filter = BooleanArray::from(filter_values);
+        let filter_count = filter_count(&filter);
 
         let iter = SlicesIterator::new(&filter);
-        let filter_count = iter.filter_count();
         let chunks = iter.collect::<Vec<_>>();
 
         assert_eq!(chunks, vec![(1, 62), (63, 124), (125, 130)]);
@@ -693,4 +635,89 @@ mod tests {
         assert_eq!(out.data_type(), &DataType::Int64);
         Ok(())
     }
+
+    #[test]
+    fn test_slices() {
+        // takes up 2 u64s
+        let bools = std::iter::repeat(true)
+            .take(10)
+            .chain(std::iter::repeat(false).take(30))
+            .chain(std::iter::repeat(true).take(20))
+            .chain(std::iter::repeat(false).take(17))
+            .chain(std::iter::repeat(true).take(4));
+
+        let bool_array: BooleanArray = bools.map(Some).collect();
+
+        let slices: Vec<_> = SlicesIterator::new(&bool_array).collect();
+        let expected = vec![(0, 10), (40, 60), (77, 81)];
+        assert_eq!(slices, expected);
+
+        // slice with offset and truncated len
+        let len = bool_array.len();
+        let sliced_array = bool_array.slice(7, len - 10);
+        let sliced_array = sliced_array
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .unwrap();
+        let slices: Vec<_> = SlicesIterator::new(sliced_array).collect();
+        let expected = vec![(0, 3), (33, 53), (70, 71)];
+        assert_eq!(slices, expected);
+    }
+
+    fn test_slices_fuzz(mask_len: usize, offset: usize, truncate: usize) {
+        let mut rng = thread_rng();
+
+        let bools: Vec<bool> = std::iter::from_fn(|| Some(rng.gen()))
+            .take(mask_len)
+            .collect();
+
+        let buffer = Buffer::from_iter(bools.iter().cloned());
+
+        let truncated_length = mask_len - offset - truncate;
+
+        let data = ArrayDataBuilder::new(DataType::Boolean)
+            .len(truncated_length)
+            .offset(offset)
+            .add_buffer(buffer)
+            .build()
+            .unwrap();
+
+        let bool_array = BooleanArray::from(data);
+
+        let bits: Vec<_> = SlicesIterator::new(&bool_array)
+            .flat_map(|(start, end)| start..end)
+            .collect();
+
+        let expected_bits: Vec<_> = bools
+            .iter()
+            .skip(offset)
+            .take(truncated_length)
+            .enumerate()
+            .flat_map(|(idx, v)| v.then(|| idx))
+            .collect();
+
+        assert_eq!(bits, expected_bits);
+    }
+
+    #[test]
+    fn fuzz_test_slices_iterator() {
+        let mut rng = thread_rng();
+
+        for _ in 0..100 {
+            let mask_len = rng.gen_range(0..1024);
+            let max_offset = 64.min(mask_len);
+            let offset = rng.gen::<usize>().checked_rem(max_offset).unwrap_or(0);
+
+            let max_truncate = 128.min(mask_len - offset);
+            let truncate = rng.gen::<usize>().checked_rem(max_truncate).unwrap_or(0);
+
+            test_slices_fuzz(mask_len, offset, truncate);
+        }
+
+        test_slices_fuzz(64, 0, 0);
+        test_slices_fuzz(64, 8, 0);
+        test_slices_fuzz(64, 8, 8);
+        test_slices_fuzz(32, 8, 8);
+        test_slices_fuzz(32, 5, 9);
+    }
 }
diff --git a/arrow/src/util/bit_chunk_iterator.rs b/arrow/src/util/bit_chunk_iterator.rs
index ea9280c..3de2e9b 100644
--- a/arrow/src/util/bit_chunk_iterator.rs
+++ b/arrow/src/util/bit_chunk_iterator.rs
@@ -1,3 +1,5 @@
+use std::fmt::Debug;
+
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,8 +17,192 @@
 // specific language governing permissions and limitations
 // under the License.
 use crate::util::bit_util::ceil;
-use std::fmt::Debug;
 
+/// Iterates over an arbitrarily aligned byte buffer
+///
+/// Yields an iterator of aligned u64, along with the leading and trailing
+/// u64 necessary to align the buffer to an 8-byte boundary
+///
+/// This is unlike [`BitChunkIterator`] which only exposes a trailing u64,
+/// and consequently has to perform more work for each read
+#[derive(Debug)]
+pub struct UnalignedBitChunk<'a> {
+    lead_padding: usize,
+    trailing_padding: usize,
+
+    prefix: Option<u64>,
+    chunks: &'a [u64],
+    suffix: Option<u64>,
+}
+
+impl<'a> UnalignedBitChunk<'a> {
+    /// Create a new [`UnalignedBitChunk`] from a byte array, and an offset and length in bits
+    pub fn new(buffer: &'a [u8], offset: usize, len: usize) -> Self {
+        if len == 0 {
+            return Self {
+                lead_padding: 0,
+                trailing_padding: 0,
+                prefix: None,
+                chunks: &[],
+                suffix: None,
+            };
+        }
+
+        let byte_offset = offset / 8;
+        let offset_padding = offset % 8;
+
+        let bytes_len = (len + offset_padding + 7) / 8;
+        let buffer = &buffer[byte_offset..byte_offset + bytes_len];
+
+        let prefix_mask = compute_prefix_mask(offset_padding);
+
+        // If at most 8 bytes, read into prefix
+        if buffer.len() <= 8 {
+            let (suffix_mask, trailing_padding) =
+                compute_suffix_mask(len, offset_padding);
+            let prefix = read_u64(buffer) & suffix_mask & prefix_mask;
+
+            return Self {
+                lead_padding: offset_padding,
+                trailing_padding,
+                prefix: Some(prefix),
+                chunks: &[],
+                suffix: None,
+            };
+        }
+
+        // If at most 16 bytes, read into prefix and suffix
+        if buffer.len() <= 16 {
+            let (suffix_mask, trailing_padding) =
+                compute_suffix_mask(len, offset_padding);
+            let prefix = read_u64(&buffer[..8]) & prefix_mask;
+            let suffix = read_u64(&buffer[8..]) & suffix_mask;
+
+            return Self {
+                lead_padding: offset_padding,
+                trailing_padding,
+                prefix: Some(prefix),
+                chunks: &[],
+                suffix: Some(suffix),
+            };
+        }
+
+        // Read into prefix and suffix as needed
+        let (prefix, mut chunks, suffix) = unsafe { buffer.align_to::<u64>() };
+        assert!(
+            prefix.len() < 8 && suffix.len() < 8,
+            "align_to did not return largest possible aligned slice"
+        );
+
+        let (alignment_padding, prefix) = match (offset_padding, prefix.is_empty()) {
+            (0, true) => (0, None),
+            (_, true) => {
+                let prefix = chunks[0] & prefix_mask;
+                chunks = &chunks[1..];
+                (0, Some(prefix))
+            }
+            (_, false) => {
+                let alignment_padding = (8 - prefix.len()) * 8;
+
+                let prefix = (read_u64(prefix) & prefix_mask) << alignment_padding;
+                (alignment_padding, Some(prefix))
+            }
+        };
+
+        let lead_padding = offset_padding + alignment_padding;
+        let (suffix_mask, trailing_padding) = compute_suffix_mask(len, lead_padding);
+
+        let suffix = match (trailing_padding, suffix.is_empty()) {
+            (0, _) => None,
+            (_, true) => {
+                let suffix = chunks[chunks.len() - 1] & suffix_mask;
+                chunks = &chunks[..chunks.len() - 1];
+                Some(suffix)
+            }
+            (_, false) => Some(read_u64(suffix) & suffix_mask),
+        };
+
+        Self {
+            lead_padding,
+            trailing_padding,
+            prefix,
+            chunks,
+            suffix,
+        }
+    }
+
+    pub fn lead_padding(&self) -> usize {
+        self.lead_padding
+    }
+
+    pub fn trailing_padding(&self) -> usize {
+        self.trailing_padding
+    }
+
+    pub fn prefix(&self) -> Option<u64> {
+        self.prefix
+    }
+
+    pub fn suffix(&self) -> Option<u64> {
+        self.suffix
+    }
+
+    pub fn chunks(&self) -> &'a [u64] {
+        self.chunks
+    }
+
+    pub(crate) fn iter(&self) -> UnalignedBitChunkIterator<'a> {
+        self.prefix
+            .into_iter()
+            .chain(self.chunks.iter().cloned())
+            .chain(self.suffix.into_iter())
+    }
+
+    /// Counts the number of ones
+    pub fn count_ones(&self) -> usize {
+        self.iter().map(|x| x.count_ones() as usize).sum()
+    }
+}
+
+pub(crate) type UnalignedBitChunkIterator<'a> = std::iter::Chain<
+    std::iter::Chain<
+        std::option::IntoIter<u64>,
+        std::iter::Cloned<std::slice::Iter<'a, u64>>,
+    >,
+    std::option::IntoIter<u64>,
+>;
+
+#[inline]
+fn read_u64(input: &[u8]) -> u64 {
+    let len = input.len().min(8);
+    let mut buf = [0_u8; 8];
+    (&mut buf[..len]).copy_from_slice(input);
+    u64::from_le_bytes(buf)
+}
+
+#[inline]
+fn compute_prefix_mask(lead_padding: usize) -> u64 {
+    !((1 << lead_padding) - 1)
+}
+
+#[inline]
+fn compute_suffix_mask(len: usize, lead_padding: usize) -> (u64, usize) {
+    let trailing_bits = (len + lead_padding) % 64;
+
+    if trailing_bits == 0 {
+        return (u64::MAX, 0);
+    }
+
+    let trailing_padding = 64 - trailing_bits;
+    let suffix_mask = (1 << trailing_bits) - 1;
+    (suffix_mask, trailing_padding)
+}
+
+/// Iterates over an arbitrarily aligned byte buffer
+///
+/// Yields an iterator of u64, and a remainder. The first byte in the buffer
+/// will be the least significant byte in output u64
+///
 #[derive(Debug)]
 pub struct BitChunks<'a> {
     buffer: &'a [u8],
@@ -174,7 +360,11 @@ impl ExactSizeIterator for BitChunkIterator<'_> {
 
 #[cfg(test)]
 mod tests {
+    use rand::prelude::*;
+
+    use crate::alloc::ALIGNMENT;
     use crate::buffer::Buffer;
+    use crate::util::bit_chunk_iterator::UnalignedBitChunk;
 
     #[test]
     fn test_iter_aligned() {
@@ -272,4 +462,173 @@ mod tests {
         assert_eq!(u64::MAX, bitchunks.iter().last().unwrap());
         assert_eq!(0x7F, bitchunks.remainder_bits());
     }
+
+    #[test]
+    #[allow(clippy::assertions_on_constants)]
+    fn test_unaligned_bit_chunk_iterator() {
+        // This test exploits the fact Buffer is at least 64-byte aligned
+        assert!(ALIGNMENT > 64);
+
+        let buffer = Buffer::from(&[0xFF; 5]);
+        let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 0, 40);
+
+        assert!(unaligned.chunks().is_empty()); // Less than 128 elements
+        assert_eq!(unaligned.lead_padding(), 0);
+        assert_eq!(unaligned.trailing_padding(), 24);
+        // 40x 1 bit then 24x 0 bits
+        assert_eq!(
+            unaligned.prefix(),
+            Some(0b0000000000000000000000001111111111111111111111111111111111111111)
+        );
+        assert_eq!(unaligned.suffix(), None);
+
+        let buffer = buffer.slice(1);
+        let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 0, 32);
+
+        assert!(unaligned.chunks().is_empty()); // Less than 128 elements
+        assert_eq!(unaligned.lead_padding(), 0);
+        assert_eq!(unaligned.trailing_padding(), 32);
+        // 32x 1 bit then 32x 0 bits
+        assert_eq!(
+            unaligned.prefix(),
+            Some(0b0000000000000000000000000000000011111111111111111111111111111111)
+        );
+        assert_eq!(unaligned.suffix(), None);
+
+        let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 5, 27);
+
+        assert!(unaligned.chunks().is_empty()); // Less than 128 elements
+        assert_eq!(unaligned.lead_padding(), 5); // 5 % 8 == 5
+        assert_eq!(unaligned.trailing_padding(), 32);
+        // 5x 0 bit, 27x 1 bit then 32x 0 bits
+        assert_eq!(
+            unaligned.prefix(),
+            Some(0b0000000000000000000000000000000011111111111111111111111111100000)
+        );
+        assert_eq!(unaligned.suffix(), None);
+
+        let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 12, 20);
+
+        assert!(unaligned.chunks().is_empty()); // Less than 128 elements
+        assert_eq!(unaligned.lead_padding(), 4); // 12 % 8 == 4
+        assert_eq!(unaligned.trailing_padding(), 40);
+        // 4x 0 bit, 20x 1 bit then 40x 0 bits
+        assert_eq!(
+            unaligned.prefix(),
+            Some(0b0000000000000000000000000000000000000000111111111111111111110000)
+        );
+        assert_eq!(unaligned.suffix(), None);
+
+        let buffer = Buffer::from(&[0xFF; 14]);
+        let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 0, 112);
+
+        assert!(unaligned.chunks().is_empty()); // Less than 128 elements
+        assert_eq!(unaligned.lead_padding(), 0); // No offset and buffer aligned on 64-bit boundary
+        assert_eq!(unaligned.trailing_padding(), 16);
+        assert_eq!(unaligned.prefix(), Some(u64::MAX));
+        assert_eq!(unaligned.suffix(), Some((1 << 48) - 1));
+
+        let buffer = Buffer::from(&[0xFF; 16]);
+        let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 0, 128);
+
+        assert_eq!(unaligned.prefix(), Some(u64::MAX));
+        assert_eq!(unaligned.suffix(), Some(u64::MAX));
+        assert!(unaligned.chunks().is_empty()); // Exactly 128 elements
+
+        let buffer = Buffer::from(&[0xFF; 64]);
+        let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 0, 512);
+
+        // Buffer is completely aligned and larger than 128 elements -> all in chunks array
+        assert_eq!(unaligned.suffix(), None);
+        assert_eq!(unaligned.prefix(), None);
+        assert_eq!(unaligned.chunks(), [u64::MAX; 8].as_slice());
+        assert_eq!(unaligned.lead_padding(), 0);
+        assert_eq!(unaligned.trailing_padding(), 0);
+
+        let buffer = buffer.slice(1); // Offset buffer 1 byte off 64-bit alignment
+        let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 0, 504);
+
+        // Need a prefix with 1 byte of lead padding to bring the buffer into alignment
+        assert_eq!(unaligned.prefix(), Some(u64::MAX - 0xFF));
+        assert_eq!(unaligned.suffix(), None);
+        assert_eq!(unaligned.chunks(), [u64::MAX; 7].as_slice());
+        assert_eq!(unaligned.lead_padding(), 8);
+        assert_eq!(unaligned.trailing_padding(), 0);
+
+        let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 17, 300);
+
+        // Out of 64-bit alignment by 8 bits from buffer, and 17 bits from provided offset
+        //   => need 8 + 17 = 25 bits of lead padding + 39 bits in prefix
+        //
+        // This leaves 300 - 39 = 261 bits remaining
+        //   => 4x 64-bit aligned 64-bit chunks + 5 remaining bits
+        //   => trailing padding of 59 bits
+        assert_eq!(unaligned.lead_padding(), 25);
+        assert_eq!(unaligned.trailing_padding(), 59);
+        assert_eq!(unaligned.prefix(), Some(u64::MAX - (1 << 25) + 1));
+        assert_eq!(unaligned.suffix(), Some(0b11111));
+        assert_eq!(unaligned.chunks(), [u64::MAX; 4].as_slice());
+
+        let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 17, 0);
+
+        assert_eq!(unaligned.prefix(), None);
+        assert_eq!(unaligned.suffix(), None);
+        assert!(unaligned.chunks().is_empty());
+        assert_eq!(unaligned.lead_padding(), 0);
+        assert_eq!(unaligned.trailing_padding(), 0);
+
+        let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 17, 1);
+
+        assert_eq!(unaligned.prefix(), Some(2));
+        assert_eq!(unaligned.suffix(), None);
+        assert!(unaligned.chunks().is_empty());
+        assert_eq!(unaligned.lead_padding(), 1);
+        assert_eq!(unaligned.trailing_padding(), 62);
+    }
+
+    #[test]
+    fn fuzz_unaligned_bit_chunk_iterator() {
+        let mut rng = thread_rng();
+
+        for _ in 0..100 {
+            let mask_len = rng.gen_range(0..1024);
+            let bools: Vec<_> = std::iter::from_fn(|| Some(rng.gen()))
+                .take(mask_len)
+                .collect();
+
+            let buffer = Buffer::from_iter(bools.iter().cloned());
+
+            let max_offset = 64.min(mask_len);
+            let offset = rng.gen::<usize>().checked_rem(max_offset).unwrap_or(0);
+
+            let max_truncate = 128.min(mask_len - offset);
+            let truncate = rng.gen::<usize>().checked_rem(max_truncate).unwrap_or(0);
+
+            let unaligned = UnalignedBitChunk::new(
+                buffer.as_slice(),
+                offset,
+                mask_len - offset - truncate,
+            );
+
+            let bool_slice = &bools[offset..mask_len - truncate];
+
+            let count = unaligned.count_ones();
+            let expected_count = bool_slice.iter().filter(|x| **x).count();
+
+            assert_eq!(count, expected_count);
+
+            let collected: Vec<u64> = unaligned.iter().collect();
+
+            let get_bit = |idx: usize| -> bool {
+                let padded_index = idx + unaligned.lead_padding();
+                let byte_idx = padded_index / 64;
+                let bit_idx = padded_index % 64;
+                (collected[byte_idx] & (1 << bit_idx)) != 0
+            };
+
+            for (idx, b) in bool_slice.iter().enumerate() {
+                assert_eq!(*b, get_bit(idx))
+            }
+        }
+    }
 }
diff --git a/parquet/src/arrow/bit_util.rs b/parquet/src/arrow/bit_util.rs
index 881c67d..192ab4b 100644
--- a/parquet/src/arrow/bit_util.rs
+++ b/parquet/src/arrow/bit_util.rs
@@ -15,40 +15,39 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::util::bit_chunk_iterator::BitChunks;
+use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
 use std::ops::Range;
 
 /// Counts the number of set bits in the provided range
 pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
-    let mut count = 0_usize;
-    let chunks = BitChunks::new(bytes, range.start, range.end - range.start);
-    chunks.iter().for_each(|chunk| {
-        count += chunk.count_ones() as usize;
-    });
-    count += chunks.remainder_bits().count_ones() as usize;
-    count
+    let unaligned = UnalignedBitChunk::new(bytes, range.start, range.end - range.start);
+    unaligned.count_ones()
 }
 
 /// Iterates through the set bit positions in `bytes` in reverse order
 pub fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
-    let (mut byte_idx, mut in_progress) = match bytes.len() {
-        0 => (0, 0),
-        len => (len - 1, bytes[len - 1]),
-    };
+    let bit_length = bytes.len() * 8;
+    let unaligned = UnalignedBitChunk::new(bytes, 0, bit_length);
+    let mut chunk_end_idx =
+        bit_length + unaligned.lead_padding() + unaligned.trailing_padding();
 
-    std::iter::from_fn(move || loop {
-        if in_progress != 0 {
-            let bit_pos = 7 - in_progress.leading_zeros();
-            in_progress ^= 1 << bit_pos;
-            return Some((byte_idx << 3) + (bit_pos as usize));
-        }
-
-        if byte_idx == 0 {
-            return None;
-        }
+    let iter = unaligned
+        .prefix()
+        .into_iter()
+        .chain(unaligned.chunks().iter().cloned())
+        .chain(unaligned.suffix().into_iter());
 
-        byte_idx -= 1;
-        in_progress = bytes[byte_idx];
+    iter.rev().flat_map(move |mut chunk| {
+        let chunk_idx = chunk_end_idx - 64;
+        chunk_end_idx = chunk_idx;
+        std::iter::from_fn(move || {
+            if chunk != 0 {
+                let bit_pos = 63 - chunk.leading_zeros();
+                chunk ^= 1 << bit_pos;
+                return Some(chunk_idx + (bit_pos as usize));
+            }
+            None
+        })
     })
 }
 
@@ -61,7 +60,7 @@ mod tests {
     #[test]
     fn test_bit_fns() {
         let mut rng = thread_rng();
-        let mask_length = rng.gen_range(1..20);
+        let mask_length = rng.gen_range(1..1024);
         let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 0))
             .take(mask_length)
             .collect();