Posted to commits@arrow.apache.org by al...@apache.org on 2022/02/02 20:37:18 UTC
[arrow-rs] branch master updated: Faster bitmask iteration (#1228)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new f055fb0 Faster bitmask iteration (#1228)
f055fb0 is described below
commit f055fb04e98664499ac97e6966e6cef51ebfe2f4
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Feb 2 20:37:10 2022 +0000
Faster bitmask iteration (#1228)
* Add UnalignedBitChunks (#1227)
* Clippy
* Fix flaky test
* Improve test legibility
* Fix SlicesIterator offset direction
* Format
* Fix byte-aligned termination
* Test edge-cases
* More tests
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* Review feedback
* Make UnalignedBitChunkIterator crate local
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
arrow/src/buffer/immutable.rs | 8 +-
arrow/src/compute/kernels/filter.rs | 295 +++++++++++++++-------------
arrow/src/util/bit_chunk_iterator.rs | 361 ++++++++++++++++++++++++++++++++++-
parquet/src/arrow/bit_util.rs | 49 +++--
4 files changed, 547 insertions(+), 166 deletions(-)
diff --git a/arrow/src/buffer/immutable.rs b/arrow/src/buffer/immutable.rs
index a3de0d4..b918c0d 100644
--- a/arrow/src/buffer/immutable.rs
+++ b/arrow/src/buffer/immutable.rs
@@ -21,7 +21,7 @@ use std::ptr::NonNull;
use std::sync::Arc;
use std::{convert::AsRef, usize};
-use crate::util::bit_chunk_iterator::BitChunks;
+use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::{
bytes::{Bytes, Deallocation},
datatypes::ArrowNativeType,
@@ -205,11 +205,7 @@ impl Buffer {
/// Returns the number of 1-bits in this buffer, starting from `offset`, with `len` bits
/// inspected. Note that both `offset` and `len` are measured in bits.
pub fn count_set_bits_offset(&self, offset: usize, len: usize) -> usize {
- let chunks = self.bit_chunks(offset, len);
- let mut count = chunks.iter().map(|c| c.count_ones() as usize).sum();
- count += chunks.remainder_bits().count_ones() as usize;
-
- count
+ UnalignedBitChunk::new(self.as_slice(), offset, len).count_ones()
}
}
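The new implementation delegates bit counting entirely to `UnalignedBitChunk::count_ones`. For reference, a naive bit-by-bit sketch of what `count_set_bits_offset` computes (hypothetical helper, not Arrow code; assumes Arrow's LSB-first bit layout):

    // Count the 1-bits in `bytes`, starting `offset` bits in, inspecting `len` bits.
    fn count_set_bits_ref(bytes: &[u8], offset: usize, len: usize) -> usize {
        (offset..offset + len)
            .filter(|&i| (bytes[i / 8] & (1 << (i % 8))) != 0)
            .count()
    }

    fn main() {
        // The high nibble of 0b1111_0000 holds four 1-bits at bit offsets 4..8
        assert_eq!(count_set_bits_ref(&[0b1111_0000], 4, 4), 4);
        assert_eq!(count_set_bits_ref(&[0b1111_0000], 0, 4), 0);
    }

The `UnalignedBitChunk` version computes the same result one `u64` at a time rather than one bit at a time, which is where the speedup comes from.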
diff --git a/arrow/src/compute/kernels/filter.rs b/arrow/src/compute/kernels/filter.rs
index 3ced928..0418263 100644
--- a/arrow/src/compute/kernels/filter.rs
+++ b/arrow/src/compute/kernels/filter.rs
@@ -17,129 +17,61 @@
//! Defines miscellaneous array kernels.
+use crate::array::*;
use crate::buffer::buffer_bin_and;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::record_batch::RecordBatch;
-use crate::{array::*, util::bit_chunk_iterator::BitChunkIterator};
-use std::iter::Enumerate;
+use crate::util::bit_chunk_iterator::{UnalignedBitChunk, UnalignedBitChunkIterator};
/// Function that can filter arbitrary arrays
pub type Filter<'a> = Box<dyn Fn(&ArrayData) -> ArrayData + 'a>;
-/// Internal state of [SlicesIterator]
-#[derive(Debug, PartialEq)]
-enum State {
- // it is iterating over bits of a mask (`u64`, steps of size of 1 slot)
- Bits(u64),
- // it is iterating over chunks (steps of size of 64 slots)
- Chunks,
- // it is iterating over the remaining bits (steps of size of 1 slot)
- Remainder,
- // nothing more to iterate.
- Finish,
-}
-
/// An iterator of `(usize, usize)` pairs, each representing an interval `[start, end)`
/// whose slots of a [BooleanArray] are true. Each interval corresponds to a contiguous
/// region of memory to be "taken" from an array to be filtered.
#[derive(Debug)]
pub struct SlicesIterator<'a> {
- iter: Enumerate<BitChunkIterator<'a>>,
- state: State,
- filter: &'a BooleanArray,
- remainder_mask: u64,
- remainder_len: usize,
- chunk_len: usize,
+ iter: UnalignedBitChunkIterator<'a>,
len: usize,
- start: usize,
- on_region: bool,
- current_chunk: usize,
- current_bit: usize,
+ chunk_end_offset: usize,
+ current_chunk: u64,
}
impl<'a> SlicesIterator<'a> {
pub fn new(filter: &'a BooleanArray) -> Self {
let values = &filter.data_ref().buffers()[0];
- let chunks = values.bit_chunks(filter.offset(), filter.len());
+ let len = filter.len();
+ let chunk = UnalignedBitChunk::new(values.as_slice(), filter.offset(), len);
+ let mut iter = chunk.iter();
+
+ let chunk_end_offset = 64 - chunk.lead_padding();
+ let current_chunk = iter.next().unwrap_or(0);
Self {
- iter: chunks.iter().enumerate(),
- state: State::Chunks,
- filter,
- remainder_len: chunks.remainder_len(),
- chunk_len: chunks.chunk_len(),
- remainder_mask: chunks.remainder_bits(),
- len: 0,
- start: 0,
- on_region: false,
- current_chunk: 0,
- current_bit: 0,
+ iter,
+ len,
+ chunk_end_offset,
+ current_chunk,
}
}
- /// Counts the number of set bits in the filter array.
- fn filter_count(&self) -> usize {
- let values = self.filter.values();
- values.count_set_bits_offset(self.filter.offset(), self.filter.len())
- }
-
- #[inline]
- fn current_start(&self) -> usize {
- self.current_chunk * 64 + self.current_bit
- }
-
- #[inline]
- fn iterate_bits(&mut self, mask: u64, max: usize) -> Option<(usize, usize)> {
- while self.current_bit < max {
- if (mask & (1 << self.current_bit)) != 0 {
- if !self.on_region {
- self.start = self.current_start();
- self.on_region = true;
- }
- self.len += 1;
- } else if self.on_region {
- let result = (self.start, self.start + self.len);
- self.len = 0;
- self.on_region = false;
- self.current_bit += 1;
- return Some(result);
+ /// Returns `Some((chunk_offset, bit_offset))` for the next chunk that has at
+ /// least one bit set, or `None` if there is no such chunk.
+ ///
+ /// Here `chunk_offset` is the bit offset to the end of the current `u64` chunk
+ /// and `bit_offset` is the offset of the first `1` bit in that chunk
+ fn advance_to_set_bit(&mut self) -> Option<(usize, u32)> {
+ loop {
+ if self.current_chunk != 0 {
+ // Find the index of the first 1
+ let bit_pos = self.current_chunk.trailing_zeros();
+ return Some((self.chunk_end_offset, bit_pos));
}
- self.current_bit += 1;
- }
- self.current_bit = 0;
- None
- }
- /// iterates over chunks.
- #[inline]
- fn iterate_chunks(&mut self) -> Option<(usize, usize)> {
- while let Some((i, mask)) = self.iter.next() {
- self.current_chunk = i;
- if mask == 0 {
- if self.on_region {
- let result = (self.start, self.start + self.len);
- self.len = 0;
- self.on_region = false;
- return Some(result);
- }
- } else if mask == 18446744073709551615u64 {
- // = !0u64
- if !self.on_region {
- self.start = self.current_start();
- self.on_region = true;
- }
- self.len += 64;
- } else {
- // there is a chunk that has a non-trivial mask => iterate over bits.
- self.state = State::Bits(mask);
- return None;
- }
+ self.current_chunk = self.iter.next()?;
+ self.chunk_end_offset += 64;
}
- // no more chunks => start iterating over the remainder
- self.current_chunk = self.chunk_len;
- self.state = State::Remainder;
- None
}
}
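The loop in `advance_to_set_bit` skips whole all-zero chunks and locates the first set bit of a non-zero chunk with `u64::trailing_zeros`. A standalone sketch of the same idea (hypothetical helper, not the Arrow API):

    // Return the index of the first set bit across a slice of LSB-first u64 chunks.
    fn first_set_bit(chunks: &[u64]) -> Option<usize> {
        for (i, &chunk) in chunks.iter().enumerate() {
            if chunk != 0 {
                // trailing_zeros is the index of the lowest 1 bit
                return Some(i * 64 + chunk.trailing_zeros() as usize);
            }
        }
        None
    }

    fn main() {
        assert_eq!(first_set_bit(&[0, 0b1000]), Some(64 + 3));
        assert_eq!(first_set_bit(&[0, 0]), None);
    }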
@@ -147,46 +79,52 @@ impl<'a> Iterator for SlicesIterator<'a> {
type Item = (usize, usize);
fn next(&mut self) -> Option<Self::Item> {
- match self.state {
- State::Chunks => {
- match self.iterate_chunks() {
- None => {
- // iterating over chunks does not yield any new slice => continue to the next
- self.current_bit = 0;
- self.next()
- }
- other => other,
- }
+ // Used as termination condition
+ if self.len == 0 {
+ return None;
+ }
+
+ let (start_chunk, start_bit) = self.advance_to_set_bit()?;
+
+ // Set bits up to start
+ self.current_chunk |= (1 << start_bit) - 1;
+
+ loop {
+ if self.current_chunk != u64::MAX {
+ // Find the index of the first 0
+ let end_bit = self.current_chunk.trailing_ones();
+
+ // Zero out up to end_bit
+ self.current_chunk &= !((1 << end_bit) - 1);
+
+ return Some((
+ start_chunk + start_bit as usize - 64,
+ self.chunk_end_offset + end_bit as usize - 64,
+ ));
}
- State::Bits(mask) => {
- match self.iterate_bits(mask, 64) {
- None => {
- // iterating over bits does not yield any new slice => change back
- // to chunks and continue to the next
- self.state = State::Chunks;
- self.next()
- }
- other => other,
+
+ match self.iter.next() {
+ Some(next) => {
+ self.current_chunk = next;
+ self.chunk_end_offset += 64;
}
- }
- State::Remainder => {
- match self.iterate_bits(self.remainder_mask, self.remainder_len) {
- None => {
- self.state = State::Finish;
- if self.on_region {
- Some((self.start, self.start + self.len))
- } else {
- None
- }
- }
- other => other,
+ None => {
+ return Some((
+ start_chunk + start_bit as usize - 64,
+ std::mem::replace(&mut self.len, 0),
+ ));
}
}
- State::Finish => None,
}
}
}
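The two mask manipulations above are the core of the fast path: filling the bits below `start_bit` lets `trailing_ones` measure through the end of the current run of set bits, and clearing everything below `end_bit` consumes the run. A worked example in plain Rust (mirroring, not calling, the iterator internals):

    fn main() {
        // Suppose the current chunk is 0b0001_1100: a run of set bits at [2, 5)
        let mut chunk: u64 = 0b0001_1100;
        let start_bit = chunk.trailing_zeros(); // 2

        // Fill the bits below the run, so trailing_ones spans the run itself
        chunk |= (1 << start_bit) - 1; // chunk is now 0b0001_1111
        let end_bit = chunk.trailing_ones(); // 5

        // Consume the run by zeroing everything below end_bit
        // (the real code first checks chunk != u64::MAX, so this shift cannot overflow)
        chunk &= !((1u64 << end_bit) - 1);

        assert_eq!((start_bit, end_bit), (2, 5)); // yields the slice (2, 5)
        assert_eq!(chunk, 0); // nothing left in this chunk
    }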
+fn filter_count(filter: &BooleanArray) -> usize {
+ filter
+ .values()
+ .count_set_bits_offset(filter.offset(), filter.len())
+}
+
/// Returns a prepared function optimized to filter multiple arrays.
/// Creating this function requires time, but using it is faster than [filter] when the
/// same filter needs to be applied to multiple arrays (e.g. a multi-column `RecordBatch`).
@@ -194,7 +132,7 @@ impl<'a> Iterator for SlicesIterator<'a> {
/// Therefore, it is considered undefined behavior to pass `filter` with null values.
pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
let iter = SlicesIterator::new(filter);
- let filter_count = iter.filter_count();
+ let filter_count = filter_count(filter);
let chunks = iter.collect::<Vec<_>>();
Ok(Box::new(move |array: &ArrayData| {
@@ -255,8 +193,8 @@ pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> {
return filter(array, &predicate);
}
- let iter = SlicesIterator::new(predicate);
- let filter_count = iter.filter_count();
+ let filter_count = filter_count(predicate);
+
match filter_count {
0 => {
// return empty
@@ -271,7 +209,10 @@ pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> {
// actually filter
let mut mutable =
MutableArrayData::new(vec![array.data_ref()], false, filter_count);
+
+ let iter = SlicesIterator::new(predicate);
iter.for_each(|(start, end)| mutable.extend(0, start, end));
+
let data = mutable.freeze();
Ok(make_array(data))
}
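As the doc comment above notes, `build_filter` pays the slice computation once and reuses it across arrays. A usage sketch (not part of the diff; assumes the public paths `arrow::compute::kernels::filter::build_filter` and the `data_ref`/`make_array` calls seen above):

    use arrow::array::{make_array, Array, BooleanArray, Int32Array};
    use arrow::compute::kernels::filter::build_filter;
    use arrow::error::Result;

    fn main() -> Result<()> {
        let predicate = BooleanArray::from(vec![true, false, true, true]);
        // Pay the slice-computation cost once...
        let filter = build_filter(&predicate)?;

        // ...then apply it to any number of equal-length arrays
        let a = Int32Array::from(vec![1, 2, 3, 4]);
        let b = Int32Array::from(vec![10, 20, 30, 40]);
        let filtered_a = make_array(filter(a.data_ref()));
        let filtered_b = make_array(filter(b.data_ref()));

        assert_eq!(filtered_a.len(), 3);
        assert_eq!(filtered_b.len(), 3);
        Ok(())
    }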
@@ -316,6 +257,7 @@ mod tests {
buffer::Buffer,
datatypes::{DataType, Field},
};
+ use rand::prelude::*;
macro_rules! def_temporal_test {
($test:ident, $array_type: ident, $data: expr) => {
@@ -614,9 +556,9 @@ mod tests {
fn test_slice_iterator_bits() {
let filter_values = (0..64).map(|i| i == 1).collect::<Vec<bool>>();
let filter = BooleanArray::from(filter_values);
+ let filter_count = filter_count(&filter);
let iter = SlicesIterator::new(&filter);
- let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();
assert_eq!(chunks, vec![(1, 2)]);
@@ -627,9 +569,9 @@ mod tests {
fn test_slice_iterator_bits1() {
let filter_values = (0..64).map(|i| i != 1).collect::<Vec<bool>>();
let filter = BooleanArray::from(filter_values);
+ let filter_count = filter_count(&filter);
let iter = SlicesIterator::new(&filter);
- let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();
assert_eq!(chunks, vec![(0, 1), (2, 64)]);
@@ -640,9 +582,9 @@ mod tests {
fn test_slice_iterator_chunk_and_bits() {
let filter_values = (0..130).map(|i| i % 62 != 0).collect::<Vec<bool>>();
let filter = BooleanArray::from(filter_values);
+ let filter_count = filter_count(&filter);
let iter = SlicesIterator::new(&filter);
- let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();
assert_eq!(chunks, vec![(1, 62), (63, 124), (125, 130)]);
@@ -693,4 +635,89 @@ mod tests {
assert_eq!(out.data_type(), &DataType::Int64);
Ok(())
}
+
+ #[test]
+ fn test_slices() {
+ // takes up 2 u64s
+ let bools = std::iter::repeat(true)
+ .take(10)
+ .chain(std::iter::repeat(false).take(30))
+ .chain(std::iter::repeat(true).take(20))
+ .chain(std::iter::repeat(false).take(17))
+ .chain(std::iter::repeat(true).take(4));
+
+ let bool_array: BooleanArray = bools.map(Some).collect();
+
+ let slices: Vec<_> = SlicesIterator::new(&bool_array).collect();
+ let expected = vec![(0, 10), (40, 60), (77, 81)];
+ assert_eq!(slices, expected);
+
+ // slice with offset and truncated len
+ let len = bool_array.len();
+ let sliced_array = bool_array.slice(7, len - 10);
+ let sliced_array = sliced_array
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .unwrap();
+ let slices: Vec<_> = SlicesIterator::new(sliced_array).collect();
+ let expected = vec![(0, 3), (33, 53), (70, 71)];
+ assert_eq!(slices, expected);
+ }
+
+ fn test_slices_fuzz(mask_len: usize, offset: usize, truncate: usize) {
+ let mut rng = thread_rng();
+
+ let bools: Vec<bool> = std::iter::from_fn(|| Some(rng.gen()))
+ .take(mask_len)
+ .collect();
+
+ let buffer = Buffer::from_iter(bools.iter().cloned());
+
+ let truncated_length = mask_len - offset - truncate;
+
+ let data = ArrayDataBuilder::new(DataType::Boolean)
+ .len(truncated_length)
+ .offset(offset)
+ .add_buffer(buffer)
+ .build()
+ .unwrap();
+
+ let bool_array = BooleanArray::from(data);
+
+ let bits: Vec<_> = SlicesIterator::new(&bool_array)
+ .flat_map(|(start, end)| start..end)
+ .collect();
+
+ let expected_bits: Vec<_> = bools
+ .iter()
+ .skip(offset)
+ .take(truncated_length)
+ .enumerate()
+ .flat_map(|(idx, v)| v.then(|| idx))
+ .collect();
+
+ assert_eq!(bits, expected_bits);
+ }
+
+ #[test]
+ fn fuzz_test_slices_iterator() {
+ let mut rng = thread_rng();
+
+ for _ in 0..100 {
+ let mask_len = rng.gen_range(0..1024);
+ let max_offset = 64.min(mask_len);
+ let offset = rng.gen::<usize>().checked_rem(max_offset).unwrap_or(0);
+
+ let max_truncate = 128.min(mask_len - offset);
+ let truncate = rng.gen::<usize>().checked_rem(max_truncate).unwrap_or(0);
+
+ test_slices_fuzz(mask_len, offset, truncate);
+ }
+
+ test_slices_fuzz(64, 0, 0);
+ test_slices_fuzz(64, 8, 0);
+ test_slices_fuzz(64, 8, 8);
+ test_slices_fuzz(32, 8, 8);
+ test_slices_fuzz(32, 5, 9);
+ }
}
diff --git a/arrow/src/util/bit_chunk_iterator.rs b/arrow/src/util/bit_chunk_iterator.rs
index ea9280c..3de2e9b 100644
--- a/arrow/src/util/bit_chunk_iterator.rs
+++ b/arrow/src/util/bit_chunk_iterator.rs
@@ -1,3 +1,5 @@
+use std::fmt::Debug;
+
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,8 +17,192 @@
// specific language governing permissions and limitations
// under the License.
use crate::util::bit_util::ceil;
-use std::fmt::Debug;
+/// Iterates over an arbitrarily aligned byte buffer
+///
+/// Yields an iterator of aligned u64, along with the leading and trailing
+/// u64 necessary to align the buffer to an 8-byte boundary
+///
+/// This is unlike [`BitChunkIterator`] which only exposes a trailing u64,
+/// and consequently has to perform more work for each read
+#[derive(Debug)]
+pub struct UnalignedBitChunk<'a> {
+ lead_padding: usize,
+ trailing_padding: usize,
+
+ prefix: Option<u64>,
+ chunks: &'a [u64],
+ suffix: Option<u64>,
+}
+
+impl<'a> UnalignedBitChunk<'a> {
+ /// Create an [`UnalignedBitChunk`] from a byte array, an offset and a length in bits
+ pub fn new(buffer: &'a [u8], offset: usize, len: usize) -> Self {
+ if len == 0 {
+ return Self {
+ lead_padding: 0,
+ trailing_padding: 0,
+ prefix: None,
+ chunks: &[],
+ suffix: None,
+ };
+ }
+
+ let byte_offset = offset / 8;
+ let offset_padding = offset % 8;
+
+ let bytes_len = (len + offset_padding + 7) / 8;
+ let buffer = &buffer[byte_offset..byte_offset + bytes_len];
+
+ let prefix_mask = compute_prefix_mask(offset_padding);
+
+ // If at most 8 bytes, read into prefix
+ if buffer.len() <= 8 {
+ let (suffix_mask, trailing_padding) =
+ compute_suffix_mask(len, offset_padding);
+ let prefix = read_u64(buffer) & suffix_mask & prefix_mask;
+
+ return Self {
+ lead_padding: offset_padding,
+ trailing_padding,
+ prefix: Some(prefix),
+ chunks: &[],
+ suffix: None,
+ };
+ }
+
+ // If at most 16 bytes, read into prefix and suffix
+ if buffer.len() <= 16 {
+ let (suffix_mask, trailing_padding) =
+ compute_suffix_mask(len, offset_padding);
+ let prefix = read_u64(&buffer[..8]) & prefix_mask;
+ let suffix = read_u64(&buffer[8..]) & suffix_mask;
+
+ return Self {
+ lead_padding: offset_padding,
+ trailing_padding,
+ prefix: Some(prefix),
+ chunks: &[],
+ suffix: Some(suffix),
+ };
+ }
+
+ // Read into prefix and suffix as needed
+ let (prefix, mut chunks, suffix) = unsafe { buffer.align_to::<u64>() };
+ assert!(
+ prefix.len() < 8 && suffix.len() < 8,
+ "align_to did not return largest possible aligned slice"
+ );
+
+ let (alignment_padding, prefix) = match (offset_padding, prefix.is_empty()) {
+ (0, true) => (0, None),
+ (_, true) => {
+ let prefix = chunks[0] & prefix_mask;
+ chunks = &chunks[1..];
+ (0, Some(prefix))
+ }
+ (_, false) => {
+ let alignment_padding = (8 - prefix.len()) * 8;
+
+ let prefix = (read_u64(prefix) & prefix_mask) << alignment_padding;
+ (alignment_padding, Some(prefix))
+ }
+ };
+
+ let lead_padding = offset_padding + alignment_padding;
+ let (suffix_mask, trailing_padding) = compute_suffix_mask(len, lead_padding);
+
+ let suffix = match (trailing_padding, suffix.is_empty()) {
+ (0, _) => None,
+ (_, true) => {
+ let suffix = chunks[chunks.len() - 1] & suffix_mask;
+ chunks = &chunks[..chunks.len() - 1];
+ Some(suffix)
+ }
+ (_, false) => Some(read_u64(suffix) & suffix_mask),
+ };
+
+ Self {
+ lead_padding,
+ trailing_padding,
+ prefix,
+ chunks,
+ suffix,
+ }
+ }
+
+ pub fn lead_padding(&self) -> usize {
+ self.lead_padding
+ }
+
+ pub fn trailing_padding(&self) -> usize {
+ self.trailing_padding
+ }
+
+ pub fn prefix(&self) -> Option<u64> {
+ self.prefix
+ }
+
+ pub fn suffix(&self) -> Option<u64> {
+ self.suffix
+ }
+
+ pub fn chunks(&self) -> &'a [u64] {
+ self.chunks
+ }
+
+ pub(crate) fn iter(&self) -> UnalignedBitChunkIterator<'a> {
+ self.prefix
+ .into_iter()
+ .chain(self.chunks.iter().cloned())
+ .chain(self.suffix.into_iter())
+ }
+
+ /// Counts the number of ones
+ pub fn count_ones(&self) -> usize {
+ self.iter().map(|x| x.count_ones() as usize).sum()
+ }
+}
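`UnalignedBitChunk` therefore normalizes any (buffer, offset, length) triple into at most one masked prefix word, a 64-bit-aligned body, and one masked suffix word, so consumers only ever operate on whole `u64`s. A small usage sketch against the API added above (asserted values follow from the masking logic, assuming the LSB-first layout):

    use arrow::util::bit_chunk_iterator::UnalignedBitChunk;

    fn main() {
        // 3 bytes of ones; skip 4 bits, inspect 12: everything fits in the prefix word
        let bytes = [0xFF_u8, 0xFF, 0xFF];
        let chunk = UnalignedBitChunk::new(&bytes, 4, 12);

        assert_eq!(chunk.lead_padding(), 4); // the 4 skipped bits are masked to zero
        assert_eq!(chunk.prefix(), Some(0xFFF0)); // 12 ones above 4 zeros
        assert!(chunk.chunks().is_empty()); // short inputs never touch the aligned body
        assert_eq!(chunk.count_ones(), 12);
    }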
+
+pub(crate) type UnalignedBitChunkIterator<'a> = std::iter::Chain<
+ std::iter::Chain<
+ std::option::IntoIter<u64>,
+ std::iter::Cloned<std::slice::Iter<'a, u64>>,
+ >,
+ std::option::IntoIter<u64>,
+>;
+
+#[inline]
+fn read_u64(input: &[u8]) -> u64 {
+ let len = input.len().min(8);
+ let mut buf = [0_u8; 8];
+ (&mut buf[..len]).copy_from_slice(input);
+ u64::from_le_bytes(buf)
+}
+
+#[inline]
+fn compute_prefix_mask(lead_padding: usize) -> u64 {
+ !((1 << lead_padding) - 1)
+}
+
+#[inline]
+fn compute_suffix_mask(len: usize, lead_padding: usize) -> (u64, usize) {
+ let trailing_bits = (len + lead_padding) % 64;
+
+ if trailing_bits == 0 {
+ return (u64::MAX, 0);
+ }
+
+ let trailing_padding = 64 - trailing_bits;
+ let suffix_mask = (1 << trailing_bits) - 1;
+ (suffix_mask, trailing_padding)
+}
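Both masks are shifted-ones patterns. A worked example of the arithmetic (plain Rust mirroring the helpers above, since they are private):

    fn main() {
        // compute_prefix_mask(3): zero the 3 low padding bits, keep the rest
        let prefix_mask: u64 = !((1u64 << 3) - 1);
        assert_eq!(prefix_mask & 0xFF, 0b1111_1000);

        // compute_suffix_mask(70, 3): 73 bits total leaves 9 live bits in the
        // last word, so keep the 9 low bits and report 55 bits of trailing padding
        let trailing_bits = (70 + 3) % 64; // 9
        let suffix_mask = (1u64 << trailing_bits) - 1;
        assert_eq!(suffix_mask, 0b1_1111_1111);
        assert_eq!(64 - trailing_bits, 55);
    }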
+
+/// Iterates over an arbitrarily aligned byte buffer
+///
+/// Yields an iterator of u64, and a remainder. The first byte in the buffer
+/// will be the least significant byte in the output u64
+///
#[derive(Debug)]
pub struct BitChunks<'a> {
buffer: &'a [u8],
@@ -174,7 +360,11 @@ impl ExactSizeIterator for BitChunkIterator<'_> {
#[cfg(test)]
mod tests {
+ use rand::prelude::*;
+
+ use crate::alloc::ALIGNMENT;
use crate::buffer::Buffer;
+ use crate::util::bit_chunk_iterator::UnalignedBitChunk;
#[test]
fn test_iter_aligned() {
@@ -272,4 +462,173 @@ mod tests {
assert_eq!(u64::MAX, bitchunks.iter().last().unwrap());
assert_eq!(0x7F, bitchunks.remainder_bits());
}
+
+ #[test]
+ #[allow(clippy::assertions_on_constants)]
+ fn test_unaligned_bit_chunk_iterator() {
+ // This test exploits the fact Buffer is at least 64-byte aligned
+ assert!(ALIGNMENT > 64);
+
+ let buffer = Buffer::from(&[0xFF; 5]);
+ let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 0, 40);
+
+ assert!(unaligned.chunks().is_empty()); // Less than 128 elements
+ assert_eq!(unaligned.lead_padding(), 0);
+ assert_eq!(unaligned.trailing_padding(), 24);
+ // 40x 1 bit then 24x 0 bits
+ assert_eq!(
+ unaligned.prefix(),
+ Some(0b0000000000000000000000001111111111111111111111111111111111111111)
+ );
+ assert_eq!(unaligned.suffix(), None);
+
+ let buffer = buffer.slice(1);
+ let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 0, 32);
+
+ assert!(unaligned.chunks().is_empty()); // Less than 128 elements
+ assert_eq!(unaligned.lead_padding(), 0);
+ assert_eq!(unaligned.trailing_padding(), 32);
+ // 32x 1 bit then 32x 0 bits
+ assert_eq!(
+ unaligned.prefix(),
+ Some(0b0000000000000000000000000000000011111111111111111111111111111111)
+ );
+ assert_eq!(unaligned.suffix(), None);
+
+ let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 5, 27);
+
+ assert!(unaligned.chunks().is_empty()); // Less than 128 elements
+ assert_eq!(unaligned.lead_padding(), 5); // 5 % 8 == 5
+ assert_eq!(unaligned.trailing_padding(), 32);
+ // 5x 0 bit, 27x 1 bit then 32x 0 bits
+ assert_eq!(
+ unaligned.prefix(),
+ Some(0b0000000000000000000000000000000011111111111111111111111111100000)
+ );
+ assert_eq!(unaligned.suffix(), None);
+
+ let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 12, 20);
+
+ assert!(unaligned.chunks().is_empty()); // Less than 128 elements
+ assert_eq!(unaligned.lead_padding(), 4); // 12 % 8 == 4
+ assert_eq!(unaligned.trailing_padding(), 40);
+ // 4x 0 bit, 20x 1 bit then 40x 0 bits
+ assert_eq!(
+ unaligned.prefix(),
+ Some(0b0000000000000000000000000000000000000000111111111111111111110000)
+ );
+ assert_eq!(unaligned.suffix(), None);
+
+ let buffer = Buffer::from(&[0xFF; 14]);
+ let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 0, 112);
+
+ assert!(unaligned.chunks().is_empty()); // Less than 128 elements
+ assert_eq!(unaligned.lead_padding(), 0); // No offset and buffer aligned on 64-bit boundary
+ assert_eq!(unaligned.trailing_padding(), 16);
+ assert_eq!(unaligned.prefix(), Some(u64::MAX));
+ assert_eq!(unaligned.suffix(), Some((1 << 48) - 1));
+
+ let buffer = Buffer::from(&[0xFF; 16]);
+ let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 0, 128);
+
+ assert_eq!(unaligned.prefix(), Some(u64::MAX));
+ assert_eq!(unaligned.suffix(), Some(u64::MAX));
+ assert!(unaligned.chunks().is_empty()); // Exactly 128 elements
+
+ let buffer = Buffer::from(&[0xFF; 64]);
+ let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 0, 512);
+
+ // Buffer is completely aligned and larger than 128 elements -> all in chunks array
+ assert_eq!(unaligned.suffix(), None);
+ assert_eq!(unaligned.prefix(), None);
+ assert_eq!(unaligned.chunks(), [u64::MAX; 8].as_slice());
+ assert_eq!(unaligned.lead_padding(), 0);
+ assert_eq!(unaligned.trailing_padding(), 0);
+
+ let buffer = buffer.slice(1); // Offset buffer 1 byte off 64-bit alignment
+ let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 0, 504);
+
+ // Need a prefix with 1 byte of lead padding to bring the buffer into alignment
+ assert_eq!(unaligned.prefix(), Some(u64::MAX - 0xFF));
+ assert_eq!(unaligned.suffix(), None);
+ assert_eq!(unaligned.chunks(), [u64::MAX; 7].as_slice());
+ assert_eq!(unaligned.lead_padding(), 8);
+ assert_eq!(unaligned.trailing_padding(), 0);
+
+ let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 17, 300);
+
+ // Out of 64-bit alignment by 8 bits from buffer, and 17 bits from provided offset
+ // => need 8 + 17 = 25 bits of lead padding + 39 bits in prefix
+ //
+ // This leaves 300 - 39 = 261 bits remaining
+ // => 4x 64-bit aligned 64-bit chunks + 5 remaining bits
+ // => trailing padding of 59 bits
+ assert_eq!(unaligned.lead_padding(), 25);
+ assert_eq!(unaligned.trailing_padding(), 59);
+ assert_eq!(unaligned.prefix(), Some(u64::MAX - (1 << 25) + 1));
+ assert_eq!(unaligned.suffix(), Some(0b11111));
+ assert_eq!(unaligned.chunks(), [u64::MAX; 4].as_slice());
+
+ let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 17, 0);
+
+ assert_eq!(unaligned.prefix(), None);
+ assert_eq!(unaligned.suffix(), None);
+ assert!(unaligned.chunks().is_empty());
+ assert_eq!(unaligned.lead_padding(), 0);
+ assert_eq!(unaligned.trailing_padding(), 0);
+
+ let unaligned = UnalignedBitChunk::new(buffer.as_slice(), 17, 1);
+
+ assert_eq!(unaligned.prefix(), Some(2));
+ assert_eq!(unaligned.suffix(), None);
+ assert!(unaligned.chunks().is_empty());
+ assert_eq!(unaligned.lead_padding(), 1);
+ assert_eq!(unaligned.trailing_padding(), 62);
+ }
+
+ #[test]
+ fn fuzz_unaligned_bit_chunk_iterator() {
+ let mut rng = thread_rng();
+
+ for _ in 0..100 {
+ let mask_len = rng.gen_range(0..1024);
+ let bools: Vec<_> = std::iter::from_fn(|| Some(rng.gen()))
+ .take(mask_len)
+ .collect();
+
+ let buffer = Buffer::from_iter(bools.iter().cloned());
+
+ let max_offset = 64.min(mask_len);
+ let offset = rng.gen::<usize>().checked_rem(max_offset).unwrap_or(0);
+
+ let max_truncate = 128.min(mask_len - offset);
+ let truncate = rng.gen::<usize>().checked_rem(max_truncate).unwrap_or(0);
+
+ let unaligned = UnalignedBitChunk::new(
+ buffer.as_slice(),
+ offset,
+ mask_len - offset - truncate,
+ );
+
+ let bool_slice = &bools[offset..mask_len - truncate];
+
+ let count = unaligned.count_ones();
+ let expected_count = bool_slice.iter().filter(|x| **x).count();
+
+ assert_eq!(count, expected_count);
+
+ let collected: Vec<u64> = unaligned.iter().collect();
+
+ let get_bit = |idx: usize| -> bool {
+ let padded_index = idx + unaligned.lead_padding();
+ let chunk_idx = padded_index / 64;
+ let bit_idx = padded_index % 64;
+ (collected[chunk_idx] & (1 << bit_idx)) != 0
+ };
+
+ for (idx, b) in bool_slice.iter().enumerate() {
+ assert_eq!(*b, get_bit(idx))
+ }
+ }
+ }
}
diff --git a/parquet/src/arrow/bit_util.rs b/parquet/src/arrow/bit_util.rs
index 881c67d..192ab4b 100644
--- a/parquet/src/arrow/bit_util.rs
+++ b/parquet/src/arrow/bit_util.rs
@@ -15,40 +15,39 @@
// specific language governing permissions and limitations
// under the License.
-use arrow::util::bit_chunk_iterator::BitChunks;
+use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
use std::ops::Range;
/// Counts the number of set bits in the provided range
pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
- let mut count = 0_usize;
- let chunks = BitChunks::new(bytes, range.start, range.end - range.start);
- chunks.iter().for_each(|chunk| {
- count += chunk.count_ones() as usize;
- });
- count += chunks.remainder_bits().count_ones() as usize;
- count
+ let unaligned = UnalignedBitChunk::new(bytes, range.start, range.end - range.start);
+ unaligned.count_ones()
}
/// Iterates through the set bit positions in `bytes` in reverse order
pub fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
- let (mut byte_idx, mut in_progress) = match bytes.len() {
- 0 => (0, 0),
- len => (len - 1, bytes[len - 1]),
- };
+ let bit_length = bytes.len() * 8;
+ let unaligned = UnalignedBitChunk::new(bytes, 0, bit_length);
+ let mut chunk_end_idx =
+ bit_length + unaligned.lead_padding() + unaligned.trailing_padding();
- std::iter::from_fn(move || loop {
- if in_progress != 0 {
- let bit_pos = 7 - in_progress.leading_zeros();
- in_progress ^= 1 << bit_pos;
- return Some((byte_idx << 3) + (bit_pos as usize));
- }
-
- if byte_idx == 0 {
- return None;
- }
+ let iter = unaligned
+ .prefix()
+ .into_iter()
+ .chain(unaligned.chunks().iter().cloned())
+ .chain(unaligned.suffix().into_iter());
- byte_idx -= 1;
- in_progress = bytes[byte_idx];
+ iter.rev().flat_map(move |mut chunk| {
+ let chunk_idx = chunk_end_idx - 64;
+ chunk_end_idx = chunk_idx;
+ std::iter::from_fn(move || {
+ if chunk != 0 {
+ let bit_pos = 63 - chunk.leading_zeros();
+ chunk ^= 1 << bit_pos;
+ return Some(chunk_idx + (bit_pos as usize));
+ }
+ None
+ })
})
}
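The rewritten `iter_set_bits_rev` walks the `u64` words back to front and, within each word, repeatedly clears the highest set bit found via `leading_zeros`. A standalone sketch of the inner trick (hypothetical helper, not the parquet API):

    // Yield the set-bit positions of a single u64 word, highest first.
    fn set_bits_rev(mut word: u64) -> impl Iterator<Item = usize> {
        std::iter::from_fn(move || {
            if word == 0 {
                return None;
            }
            // 63 - leading_zeros is the index of the highest 1 bit
            let bit_pos = 63 - word.leading_zeros();
            word ^= 1 << bit_pos; // clear it so the next call finds the next-highest
            Some(bit_pos as usize)
        })
    }

    fn main() {
        let bits: Vec<_> = set_bits_rev(0b1001_0010).collect();
        assert_eq!(bits, vec![7, 4, 1]);
    }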
@@ -61,7 +60,7 @@ mod tests {
#[test]
fn test_bit_fns() {
let mut rng = thread_rng();
- let mask_length = rng.gen_range(1..20);
+ let mask_length = rng.gen_range(1..1024);
let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 0))
.take(mask_length)
.collect();