You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/07 11:31:26 UTC

[GitHub] [arrow-rs] tustvold opened a new pull request #1283: Refactor BitReader to contain explicit state (#1282)

tustvold opened a new pull request #1283:
URL: https://github.com/apache/arrow-rs/pull/1283


   # Which issue does this PR close?
   
   Closes #1282.
   
   # Rationale for this change
    
   I think it makes the logic easier to follow, although this may be a subjective assessment.
   
   # What changes are included in this PR?
   
   Refactors BitReader to explicitly separate the aligned and unaligned operations
   
   # Are there any user-facing changes?
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1283: Refactor BitReader to contain explicit state (#1282)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1283:
URL: https://github.com/apache/arrow-rs/pull/1283#discussion_r801453816



##########
File path: parquet/src/util/bit_util.rs
##########
@@ -445,158 +445,307 @@ impl BitWriter {
 /// MAX_VLQ_BYTE_LEN = 5 for i32, and MAX_VLQ_BYTE_LEN = 10 for i64
 pub const MAX_VLQ_BYTE_LEN: usize = 10;
 
-pub struct BitReader {
-    // The byte buffer to read from, passed in by client
+/// A struct storing the state for reading individual bits from a byte array
+struct BitReaderUnaligned {
+    /// The byte buffer to read from, passed in by client
     buffer: ByteBufferPtr,
 
-    // Bytes are memcpy'd from `buffer` and values are read from this variable.
-    // This is faster than reading values byte by byte directly from `buffer`
-    buffered_values: u64,
-
-    //
-    // End                                         Start
-    // |............|B|B|B|B|B|B|B|B|..............|
-    //                   ^          ^
-    //                 bit_offset   byte_offset
-    //
-    // Current byte offset in `buffer`
+    ///
+    /// End                                         Start
+    /// |............|B|B|B|B|B|B|B|B|..............|
+    ///                   ^          ^
+    ///                 bit_offset   byte_offset
+    ///
+    /// Current byte offset in `buffer`
     byte_offset: usize,
 
-    // Current bit offset in `buffered_values`
+    /// Current bit offset in `buffered_values`
     bit_offset: usize,
 
-    // Total number of bytes in `buffer`
-    total_bytes: usize,
+    /// Bytes are memcpy'd from `buffer` and values are read from this variable.
+    /// This is faster than reading values byte by byte directly from `buffer`
+    buffered_values: u64,
 }
 
-/// Utility class to read bit/byte stream. This class can read bits or bytes that are
-/// either byte aligned or not.
-impl BitReader {
-    pub fn new(buffer: ByteBufferPtr) -> Self {
-        let total_bytes = buffer.len();
-        let num_bytes = cmp::min(8, total_bytes);
-        let buffered_values = read_num_bytes!(u64, num_bytes, buffer.as_ref());
-        BitReader {
-            buffer,
-            buffered_values,
-            byte_offset: 0,
-            bit_offset: 0,
-            total_bytes,
-        }
-    }
-
-    pub fn reset(&mut self, buffer: ByteBufferPtr) {
-        self.buffer = buffer;
-        self.total_bytes = self.buffer.len();
-        let num_bytes = cmp::min(8, self.total_bytes);
-        self.buffered_values = read_num_bytes!(u64, num_bytes, self.buffer.as_ref());
-        self.byte_offset = 0;
-        self.bit_offset = 0;
-    }
-
-    /// Gets the current byte offset
-    #[inline]
-    pub fn get_byte_offset(&self) -> usize {
-        self.byte_offset + ceil(self.bit_offset as i64, 8) as usize
+impl BitReaderUnaligned {
+    fn reload_buffer_values(&mut self) {
+        let bytes_to_read = cmp::min(self.buffer.len() - self.byte_offset, 8);
+        self.buffered_values =
+            read_num_bytes!(u64, bytes_to_read, self.buffer.data()[self.byte_offset..]);
     }
 
-    /// Reads a value of type `T` and of size `num_bits`.
-    ///
-    /// Returns `None` if there's not enough data available. `Some` otherwise.
-    pub fn get_value<T: FromBytes>(&mut self, num_bits: usize) -> Option<T> {
-        assert!(num_bits <= 64);
-        assert!(num_bits <= size_of::<T>() * 8);
+    fn get<T: FromBytes>(&mut self, bit_width: usize) -> Option<T> {
+        assert!(bit_width <= 64);
+        assert!(bit_width <= size_of::<T>() * 8);
 
-        if self.byte_offset * 8 + self.bit_offset + num_bits > self.total_bytes * 8 {
+        if self.byte_offset * 8 + self.bit_offset + bit_width > self.buffer.len() * 8 {
             return None;
         }
 
-        let mut v = trailing_bits(self.buffered_values, self.bit_offset + num_bits)
+        let mut v = trailing_bits(self.buffered_values, self.bit_offset + bit_width)
             >> self.bit_offset;
-        self.bit_offset += num_bits;
+        self.bit_offset += bit_width;
 
         if self.bit_offset >= 64 {
             self.byte_offset += 8;
             self.bit_offset -= 64;
 
             self.reload_buffer_values();
             v |= trailing_bits(self.buffered_values, self.bit_offset)
-                .wrapping_shl((num_bits - self.bit_offset) as u32);
+                .wrapping_shl((bit_width - self.bit_offset) as u32);
         }
 
         // TODO: better to avoid copying here
         Some(from_ne_slice(v.as_bytes()))
     }
 
-    pub fn get_batch<T: FromBytes>(&mut self, batch: &mut [T], num_bits: usize) -> usize {
-        assert!(num_bits <= 32);
-        assert!(num_bits <= size_of::<T>() * 8);
+    /// Gets the current byte offset
+    fn aligned_byte_offset(&self) -> usize {
+        self.byte_offset + ceil(self.bit_offset as i64, 8) as usize
+    }
+}
 
-        let mut values_to_read = batch.len();
-        let needed_bits = num_bits * values_to_read;
-        let remaining_bits = (self.total_bytes - self.byte_offset) * 8 - self.bit_offset;
-        if remaining_bits < needed_bits {
-            values_to_read = remaining_bits / num_bits;
+/// A struct for storing the state for reading whole bytes from a byte stream
+struct BitReaderAligned {
+    /// The byte buffer to read from, passed in by client
+    buffer: ByteBufferPtr,
+    /// The current offset in `buffer`
+    byte_offset: usize,
+}
+
+impl BitReaderAligned {
+    fn get<T: FromBytes>(&mut self, num_bytes: usize) -> Option<T> {
+        if self.byte_offset + num_bytes > self.buffer.len() {
+            return None;
         }
 
-        let mut i = 0;
+        let v = read_num_bytes!(T, num_bytes, self.buffer.data()[self.byte_offset..]);
+        self.byte_offset += num_bytes;
 
-        // First align bit offset to byte offset
-        if self.bit_offset != 0 {
-            while i < values_to_read && self.bit_offset != 0 {
-                batch[i] = self
-                    .get_value(num_bits)
-                    .expect("expected to have more data");
-                i += 1;
-            }
-        }
+        Some(v)
+    }
 
-        unsafe {
-            let in_buf = &self.buffer.data()[self.byte_offset..];
-            let mut in_ptr = in_buf as *const [u8] as *const u8 as *const u32;
-            if size_of::<T>() == 4 {
-                while values_to_read - i >= 32 {
-                    let out_ptr = &mut batch[i..] as *mut [T] as *mut T as *mut u32;
-                    in_ptr = unpack32(in_ptr, out_ptr, num_bits);
-                    self.byte_offset += 4 * num_bits;
-                    i += 32;
-                }
-            } else {
-                let mut out_buf = [0u32; 32];
-                let out_ptr = &mut out_buf as &mut [u32] as *mut [u32] as *mut u32;
-                while values_to_read - i >= 32 {
-                    in_ptr = unpack32(in_ptr, out_ptr, num_bits);
-                    self.byte_offset += 4 * num_bits;
-                    for n in 0..32 {
-                        // We need to copy from smaller size to bigger size to avoid
-                        // overwriting other memory regions.
-                        if size_of::<T>() > size_of::<u32>() {
+    /// Unpacks whole groups of 32 values of width `bit_width` from the packed buffer into
+    /// `batch`, reading at most `to_read` values, and returns the number of values read
+    ///
+    /// # Panics
+    ///
+    /// This function panics if
+    /// * `bit_width` is greater than 32
+    /// * the buffer holds fewer than `to_read` values of width `bit_width`
+    fn get_batch_x32<T: FromBytes>(
+        &mut self,
+        batch: &mut [T],
+        to_read: usize,
+        bit_width: usize,
+    ) -> usize {
+        assert!(bit_width <= 32);
+
+        let mut values_read = 0;
+        let in_buf = &self.buffer.data()[self.byte_offset..];
+        assert!(in_buf.len() * 8 >= to_read * bit_width);
+
+        let mut in_ptr = in_buf as *const [u8] as *const u8 as *const u32;
+        if size_of::<T>() == 4 {
+            while to_read - values_read >= 32 {
+                let out_ptr = &mut batch[values_read..] as *mut [T] as *mut T as *mut u32;
+                in_ptr = unsafe { unpack32(in_ptr, out_ptr, bit_width) };
+                self.byte_offset += 4 * bit_width;
+                values_read += 32;
+            }
+        } else {
+            let mut out_buf = [0u32; 32];
+            let out_ptr = &mut out_buf as &mut [u32] as *mut [u32] as *mut u32;
+            while to_read - values_read >= 32 {
+                in_ptr = unsafe { unpack32(in_ptr, out_ptr, bit_width) };
+                self.byte_offset += 4 * bit_width;
+                for n in 0..32 {
+                    // We need to copy from smaller size to bigger size to avoid
+                    // overwriting other memory regions.
+                    if size_of::<T>() > size_of::<u32>() {
+                        unsafe {
                             std::ptr::copy_nonoverlapping(
                                 out_buf[n..].as_ptr() as *const u32,
-                                &mut batch[i] as *mut T as *mut u32,
+                                &mut batch[values_read] as *mut T as *mut u32,
                                 1,
                             );
-                        } else {
+                        }
+                    } else {
+                        unsafe {
                             std::ptr::copy_nonoverlapping(
                                 out_buf[n..].as_ptr() as *const T,
-                                &mut batch[i] as *mut T,
+                                &mut batch[values_read] as *mut T,
                                 1,
                             );
                         }
-                        i += 1;
                     }
+                    values_read += 1;
                 }
             }
         }
+        values_read
+    }
+}
+
+/// Combines [`BitReaderAligned`] and [`BitReaderUnaligned`] providing conversions between them
+enum BitReaderState {
+    Unaligned(BitReaderUnaligned),
+    Aligned(BitReaderAligned),
+}
 
-        assert!(values_to_read - i < 32);
+impl BitReaderState {
+    fn new(buffer: ByteBufferPtr, byte_offset: usize) -> Self {
+        BitReaderState::Aligned(BitReaderAligned {
+            buffer,
+            byte_offset,
+        })
+    }
 
-        self.reload_buffer_values();
-        while i < values_to_read {
-            batch[i] = self
-                .get_value(num_bits)
-                .expect("expected to have more data");
-            i += 1;
+    /// Returns the number of remaining bits
+    fn remaining_bits(&self) -> usize {
+        match &self {
+            BitReaderState::Unaligned(s) => {
+                (s.buffer.len() - s.byte_offset) * 8 - s.bit_offset
+            }
+            BitReaderState::Aligned(s) => (s.buffer.len() - s.byte_offset) * 8,
+        }
+    }
+
+    /// Returns the current byte offset, rounds up to the nearest whole byte
+    fn get_byte_offset(&self) -> usize {
+        match self {
+            BitReaderState::Unaligned(s) => s.aligned_byte_offset(),
+            BitReaderState::Aligned(s) => s.byte_offset,
+        }
+    }
+
+    /// Converts this to a [`BitReaderAligned`] advancing to the next whole byte
+    fn as_aligned(&mut self) -> &mut BitReaderAligned {
+        match self {
+            BitReaderState::Unaligned(s) => {
+                let offset = s.aligned_byte_offset();
+                *self = BitReaderState::new(s.buffer.clone(), offset);
+
+                match self {
+                    BitReaderState::Aligned(s) => s,
+                    _ => unreachable!(),
+                }
+            }
+            BitReaderState::Aligned(s) => s,
+        }
+    }
+
+    /// Converts this to a [`BitReaderUnaligned`]
+    fn as_unaligned(&mut self) -> &mut BitReaderUnaligned {
+        match self {
+            BitReaderState::Unaligned(s) => s,
+            BitReaderState::Aligned(s) => {
+                let mut scan = BitReaderUnaligned {
+                    buffer: s.buffer.clone(),
+                    byte_offset: s.byte_offset,
+                    bit_offset: 0,
+                    buffered_values: 0,
+                };
+
+                scan.reload_buffer_values();
+                *self = BitReaderState::Unaligned(scan);
+
+                match self {
+                    BitReaderState::Unaligned(s) => s,
+                    _ => unreachable!(),
+                }
+            }
+        }
+    }
+}
+
+pub struct BitReader {
+    /// The state of the bit reader, can use simpler logic
+    /// when reading aligned data
+    state: BitReaderState,
+}
+
+/// Utility class to read bit/byte stream. This class can read bits or bytes that are
+/// either byte aligned or not.
+impl BitReader {
+    pub fn new(buffer: ByteBufferPtr) -> Self {
+        BitReader {
+            state: BitReaderState::new(buffer, 0),
+        }
+    }
+
+    pub fn reset(&mut self, buffer: ByteBufferPtr) {
+        self.state = BitReaderState::new(buffer, 0)
+    }
+
+    /// Gets the current byte offset, rounding up to the nearest whole byte
+    #[inline]
+    pub fn get_byte_offset(&self) -> usize {
+        self.state.get_byte_offset()
+    }
+
+    /// Reads a value of type `T` and of size `num_bits`.
+    ///
+    /// Returns `None` if there's not enough data available. `Some` otherwise.
+    pub fn get_value<T: FromBytes>(&mut self, num_bits: usize) -> Option<T> {
+        self.state.as_unaligned().get(num_bits)

Review comment:
       Running the benchmarks, if anything I see a teeny performance improvement. This is likely because, whilst there is a branch here, it isn't typically called in a loop.
   
   `DeltaBitPackDecoder` is an exception here, but that's what #1284 is for :smile: 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] sunchao commented on a change in pull request #1283: Refactor BitReader to contain explicit state (#1282)

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #1283:
URL: https://github.com/apache/arrow-rs/pull/1283#discussion_r801176472



##########
File path: parquet/src/util/bit_util.rs
##########
@@ -445,158 +445,307 @@ impl BitWriter {
 /// MAX_VLQ_BYTE_LEN = 5 for i32, and MAX_VLQ_BYTE_LEN = 10 for i64
 pub const MAX_VLQ_BYTE_LEN: usize = 10;
 
-pub struct BitReader {
-    // The byte buffer to read from, passed in by client
+/// A struct storing the state for reading individual bits from a byte array
+struct BitReaderUnaligned {
+    /// The byte buffer to read from, passed in by client
     buffer: ByteBufferPtr,
 
-    // Bytes are memcpy'd from `buffer` and values are read from this variable.
-    // This is faster than reading values byte by byte directly from `buffer`
-    buffered_values: u64,
-
-    //
-    // End                                         Start
-    // |............|B|B|B|B|B|B|B|B|..............|
-    //                   ^          ^
-    //                 bit_offset   byte_offset
-    //
-    // Current byte offset in `buffer`
+    ///
+    /// End                                         Start
+    /// |............|B|B|B|B|B|B|B|B|..............|
+    ///                   ^          ^
+    ///                 bit_offset   byte_offset
+    ///
+    /// Current byte offset in `buffer`
     byte_offset: usize,
 
-    // Current bit offset in `buffered_values`
+    /// Current bit offset in `buffered_values`
     bit_offset: usize,
 
-    // Total number of bytes in `buffer`
-    total_bytes: usize,
+    /// Bytes are memcpy'd from `buffer` and values are read from this variable.
+    /// This is faster than reading values byte by byte directly from `buffer`
+    buffered_values: u64,
 }
 
-/// Utility class to read bit/byte stream. This class can read bits or bytes that are
-/// either byte aligned or not.
-impl BitReader {
-    pub fn new(buffer: ByteBufferPtr) -> Self {
-        let total_bytes = buffer.len();
-        let num_bytes = cmp::min(8, total_bytes);
-        let buffered_values = read_num_bytes!(u64, num_bytes, buffer.as_ref());
-        BitReader {
-            buffer,
-            buffered_values,
-            byte_offset: 0,
-            bit_offset: 0,
-            total_bytes,
-        }
-    }
-
-    pub fn reset(&mut self, buffer: ByteBufferPtr) {
-        self.buffer = buffer;
-        self.total_bytes = self.buffer.len();
-        let num_bytes = cmp::min(8, self.total_bytes);
-        self.buffered_values = read_num_bytes!(u64, num_bytes, self.buffer.as_ref());
-        self.byte_offset = 0;
-        self.bit_offset = 0;
-    }
-
-    /// Gets the current byte offset
-    #[inline]
-    pub fn get_byte_offset(&self) -> usize {
-        self.byte_offset + ceil(self.bit_offset as i64, 8) as usize
+impl BitReaderUnaligned {
+    fn reload_buffer_values(&mut self) {
+        let bytes_to_read = cmp::min(self.buffer.len() - self.byte_offset, 8);
+        self.buffered_values =
+            read_num_bytes!(u64, bytes_to_read, self.buffer.data()[self.byte_offset..]);
     }
 
-    /// Reads a value of type `T` and of size `num_bits`.
-    ///
-    /// Returns `None` if there's not enough data available. `Some` otherwise.
-    pub fn get_value<T: FromBytes>(&mut self, num_bits: usize) -> Option<T> {
-        assert!(num_bits <= 64);
-        assert!(num_bits <= size_of::<T>() * 8);
+    fn get<T: FromBytes>(&mut self, bit_width: usize) -> Option<T> {
+        assert!(bit_width <= 64);
+        assert!(bit_width <= size_of::<T>() * 8);
 
-        if self.byte_offset * 8 + self.bit_offset + num_bits > self.total_bytes * 8 {
+        if self.byte_offset * 8 + self.bit_offset + bit_width > self.buffer.len() * 8 {
             return None;
         }
 
-        let mut v = trailing_bits(self.buffered_values, self.bit_offset + num_bits)
+        let mut v = trailing_bits(self.buffered_values, self.bit_offset + bit_width)
             >> self.bit_offset;
-        self.bit_offset += num_bits;
+        self.bit_offset += bit_width;
 
         if self.bit_offset >= 64 {
             self.byte_offset += 8;
             self.bit_offset -= 64;
 
             self.reload_buffer_values();
             v |= trailing_bits(self.buffered_values, self.bit_offset)
-                .wrapping_shl((num_bits - self.bit_offset) as u32);
+                .wrapping_shl((bit_width - self.bit_offset) as u32);
         }
 
         // TODO: better to avoid copying here
         Some(from_ne_slice(v.as_bytes()))
     }
 
-    pub fn get_batch<T: FromBytes>(&mut self, batch: &mut [T], num_bits: usize) -> usize {
-        assert!(num_bits <= 32);
-        assert!(num_bits <= size_of::<T>() * 8);
+    /// Gets the current byte offset
+    fn aligned_byte_offset(&self) -> usize {
+        self.byte_offset + ceil(self.bit_offset as i64, 8) as usize
+    }
+}
 
-        let mut values_to_read = batch.len();
-        let needed_bits = num_bits * values_to_read;
-        let remaining_bits = (self.total_bytes - self.byte_offset) * 8 - self.bit_offset;
-        if remaining_bits < needed_bits {
-            values_to_read = remaining_bits / num_bits;
+/// A struct for storing the state for reading whole bytes from a byte stream
+struct BitReaderAligned {
+    /// The byte buffer to read from, passed in by client
+    buffer: ByteBufferPtr,
+    /// The current offset in `buffer`
+    byte_offset: usize,
+}
+
+impl BitReaderAligned {
+    fn get<T: FromBytes>(&mut self, num_bytes: usize) -> Option<T> {
+        if self.byte_offset + num_bytes > self.buffer.len() {
+            return None;
         }
 
-        let mut i = 0;
+        let v = read_num_bytes!(T, num_bytes, self.buffer.data()[self.byte_offset..]);
+        self.byte_offset += num_bytes;
 
-        // First align bit offset to byte offset
-        if self.bit_offset != 0 {
-            while i < values_to_read && self.bit_offset != 0 {
-                batch[i] = self
-                    .get_value(num_bits)
-                    .expect("expected to have more data");
-                i += 1;
-            }
-        }
+        Some(v)
+    }
 
-        unsafe {
-            let in_buf = &self.buffer.data()[self.byte_offset..];
-            let mut in_ptr = in_buf as *const [u8] as *const u8 as *const u32;
-            if size_of::<T>() == 4 {
-                while values_to_read - i >= 32 {
-                    let out_ptr = &mut batch[i..] as *mut [T] as *mut T as *mut u32;
-                    in_ptr = unpack32(in_ptr, out_ptr, num_bits);
-                    self.byte_offset += 4 * num_bits;
-                    i += 32;
-                }
-            } else {
-                let mut out_buf = [0u32; 32];
-                let out_ptr = &mut out_buf as &mut [u32] as *mut [u32] as *mut u32;
-                while values_to_read - i >= 32 {
-                    in_ptr = unpack32(in_ptr, out_ptr, num_bits);
-                    self.byte_offset += 4 * num_bits;
-                    for n in 0..32 {
-                        // We need to copy from smaller size to bigger size to avoid
-                        // overwriting other memory regions.
-                        if size_of::<T>() > size_of::<u32>() {
+    /// Unpacks whole groups of 32 values of width `bit_width` from the packed buffer into
+    /// `batch`, reading at most `to_read` values, and returns the number of values read
+    ///
+    /// # Panics
+    ///
+    /// This function panics if
+    /// * `bit_width` is greater than 32
+    /// * the buffer holds fewer than `to_read` values of width `bit_width`
+    fn get_batch_x32<T: FromBytes>(
+        &mut self,
+        batch: &mut [T],
+        to_read: usize,
+        bit_width: usize,
+    ) -> usize {
+        assert!(bit_width <= 32);
+
+        let mut values_read = 0;
+        let in_buf = &self.buffer.data()[self.byte_offset..];
+        assert!(in_buf.len() * 8 >= to_read * bit_width);
+
+        let mut in_ptr = in_buf as *const [u8] as *const u8 as *const u32;
+        if size_of::<T>() == 4 {
+            while to_read - values_read >= 32 {
+                let out_ptr = &mut batch[values_read..] as *mut [T] as *mut T as *mut u32;
+                in_ptr = unsafe { unpack32(in_ptr, out_ptr, bit_width) };
+                self.byte_offset += 4 * bit_width;
+                values_read += 32;
+            }
+        } else {
+            let mut out_buf = [0u32; 32];
+            let out_ptr = &mut out_buf as &mut [u32] as *mut [u32] as *mut u32;
+            while to_read - values_read >= 32 {
+                in_ptr = unsafe { unpack32(in_ptr, out_ptr, bit_width) };
+                self.byte_offset += 4 * bit_width;
+                for n in 0..32 {
+                    // We need to copy from smaller size to bigger size to avoid
+                    // overwriting other memory regions.
+                    if size_of::<T>() > size_of::<u32>() {
+                        unsafe {
                             std::ptr::copy_nonoverlapping(
                                 out_buf[n..].as_ptr() as *const u32,
-                                &mut batch[i] as *mut T as *mut u32,
+                                &mut batch[values_read] as *mut T as *mut u32,
                                 1,
                             );
-                        } else {
+                        }
+                    } else {
+                        unsafe {
                             std::ptr::copy_nonoverlapping(
                                 out_buf[n..].as_ptr() as *const T,
-                                &mut batch[i] as *mut T,
+                                &mut batch[values_read] as *mut T,
                                 1,
                             );
                         }
-                        i += 1;
                     }
+                    values_read += 1;
                 }
             }
         }
+        values_read
+    }
+}
+
+/// Combines [`BitReaderAligned`] and [`BitReaderUnaligned`] providing conversions between them
+enum BitReaderState {
+    Unaligned(BitReaderUnaligned),
+    Aligned(BitReaderAligned),
+}
 
-        assert!(values_to_read - i < 32);
+impl BitReaderState {
+    fn new(buffer: ByteBufferPtr, byte_offset: usize) -> Self {
+        BitReaderState::Aligned(BitReaderAligned {
+            buffer,
+            byte_offset,
+        })
+    }
 
-        self.reload_buffer_values();
-        while i < values_to_read {
-            batch[i] = self
-                .get_value(num_bits)
-                .expect("expected to have more data");
-            i += 1;
+    /// Returns the number of remaining bits
+    fn remaining_bits(&self) -> usize {
+        match &self {
+            BitReaderState::Unaligned(s) => {
+                (s.buffer.len() - s.byte_offset) * 8 - s.bit_offset
+            }
+            BitReaderState::Aligned(s) => (s.buffer.len() - s.byte_offset) * 8,
+        }
+    }
+
+    /// Returns the current byte offset, rounds up to the nearest whole byte
+    fn get_byte_offset(&self) -> usize {
+        match self {
+            BitReaderState::Unaligned(s) => s.aligned_byte_offset(),
+            BitReaderState::Aligned(s) => s.byte_offset,
+        }
+    }
+
+    /// Converts this to a [`BitReaderAligned`] advancing to the next whole byte
+    fn as_aligned(&mut self) -> &mut BitReaderAligned {
+        match self {
+            BitReaderState::Unaligned(s) => {
+                let offset = s.aligned_byte_offset();
+                *self = BitReaderState::new(s.buffer.clone(), offset);
+
+                match self {
+                    BitReaderState::Aligned(s) => s,
+                    _ => unreachable!(),
+                }
+            }
+            BitReaderState::Aligned(s) => s,
+        }
+    }
+
+    /// Converts this to a [`BitReaderUnaligned`]
+    fn as_unaligned(&mut self) -> &mut BitReaderUnaligned {
+        match self {
+            BitReaderState::Unaligned(s) => s,
+            BitReaderState::Aligned(s) => {
+                let mut scan = BitReaderUnaligned {
+                    buffer: s.buffer.clone(),
+                    byte_offset: s.byte_offset,
+                    bit_offset: 0,
+                    buffered_values: 0,
+                };
+
+                scan.reload_buffer_values();
+                *self = BitReaderState::Unaligned(scan);
+
+                match self {
+                    BitReaderState::Unaligned(s) => s,
+                    _ => unreachable!(),
+                }
+            }
+        }
+    }
+}
+
+pub struct BitReader {
+    /// The state of the bit reader, can use simpler logic
+    /// when reading aligned data
+    state: BitReaderState,
+}
+
+/// Utility class to read bit/byte stream. This class can read bits or bytes that are
+/// either byte aligned or not.
+impl BitReader {
+    pub fn new(buffer: ByteBufferPtr) -> Self {
+        BitReader {
+            state: BitReaderState::new(buffer, 0),
+        }
+    }
+
+    pub fn reset(&mut self, buffer: ByteBufferPtr) {
+        self.state = BitReaderState::new(buffer, 0)
+    }
+
+    /// Gets the current byte offset, rounding up to the nearest whole byte
+    #[inline]
+    pub fn get_byte_offset(&self) -> usize {
+        self.state.get_byte_offset()
+    }
+
+    /// Reads a value of type `T` and of size `num_bits`.
+    ///
+    /// Returns `None` if there's not enough data available. `Some` otherwise.
+    pub fn get_value<T: FromBytes>(&mut self, num_bits: usize) -> Option<T> {
+        self.state.as_unaligned().get(num_bits)

Review comment:
       Not sure whether there's any performance implication here since it now requires some extra compare & jump instructions. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on pull request #1283: Refactor BitReader to contain explicit state (#1282)

Posted by GitBox <gi...@apache.org>.
tustvold commented on pull request #1283:
URL: https://github.com/apache/arrow-rs/pull/1283#issuecomment-1037241480


   Going to close this, as it wasn't needed for #1284.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold closed pull request #1283: Refactor BitReader to contain explicit state (#1282)

Posted by GitBox <gi...@apache.org>.
tustvold closed pull request #1283:
URL: https://github.com/apache/arrow-rs/pull/1283


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1283: Refactor BitReader to contain explicit state (#1282)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1283:
URL: https://github.com/apache/arrow-rs/pull/1283#discussion_r800565191



##########
File path: parquet/src/util/bit_util.rs
##########
@@ -445,158 +445,307 @@ impl BitWriter {
 /// MAX_VLQ_BYTE_LEN = 5 for i32, and MAX_VLQ_BYTE_LEN = 10 for i64
 pub const MAX_VLQ_BYTE_LEN: usize = 10;
 
-pub struct BitReader {
-    // The byte buffer to read from, passed in by client
+/// A struct storing the state for reading individual bits from a byte array
+struct BitReaderUnaligned {
+    /// The byte buffer to read from, passed in by client
     buffer: ByteBufferPtr,
 
-    // Bytes are memcpy'd from `buffer` and values are read from this variable.
-    // This is faster than reading values byte by byte directly from `buffer`
-    buffered_values: u64,
-
-    //
-    // End                                         Start
-    // |............|B|B|B|B|B|B|B|B|..............|
-    //                   ^          ^
-    //                 bit_offset   byte_offset
-    //
-    // Current byte offset in `buffer`
+    ///
+    /// End                                         Start
+    /// |............|B|B|B|B|B|B|B|B|..............|
+    ///                   ^          ^
+    ///                 bit_offset   byte_offset
+    ///
+    /// Current byte offset in `buffer`
     byte_offset: usize,
 
-    // Current bit offset in `buffered_values`
+    /// Current bit offset in `buffered_values`
     bit_offset: usize,
 
-    // Total number of bytes in `buffer`
-    total_bytes: usize,
+    /// Bytes are memcpy'd from `buffer` and values are read from this variable.
+    /// This is faster than reading values byte by byte directly from `buffer`
+    buffered_values: u64,
 }
 
-/// Utility class to read bit/byte stream. This class can read bits or bytes that are
-/// either byte aligned or not.
-impl BitReader {
-    pub fn new(buffer: ByteBufferPtr) -> Self {
-        let total_bytes = buffer.len();
-        let num_bytes = cmp::min(8, total_bytes);
-        let buffered_values = read_num_bytes!(u64, num_bytes, buffer.as_ref());
-        BitReader {
-            buffer,
-            buffered_values,
-            byte_offset: 0,
-            bit_offset: 0,
-            total_bytes,
-        }
-    }
-
-    pub fn reset(&mut self, buffer: ByteBufferPtr) {
-        self.buffer = buffer;
-        self.total_bytes = self.buffer.len();
-        let num_bytes = cmp::min(8, self.total_bytes);
-        self.buffered_values = read_num_bytes!(u64, num_bytes, self.buffer.as_ref());
-        self.byte_offset = 0;
-        self.bit_offset = 0;
-    }
-
-    /// Gets the current byte offset
-    #[inline]
-    pub fn get_byte_offset(&self) -> usize {
-        self.byte_offset + ceil(self.bit_offset as i64, 8) as usize
+impl BitReaderUnaligned {
+    fn reload_buffer_values(&mut self) {
+        let bytes_to_read = cmp::min(self.buffer.len() - self.byte_offset, 8);
+        self.buffered_values =
+            read_num_bytes!(u64, bytes_to_read, self.buffer.data()[self.byte_offset..]);
     }
 
-    /// Reads a value of type `T` and of size `num_bits`.
-    ///
-    /// Returns `None` if there's not enough data available. `Some` otherwise.
-    pub fn get_value<T: FromBytes>(&mut self, num_bits: usize) -> Option<T> {
-        assert!(num_bits <= 64);
-        assert!(num_bits <= size_of::<T>() * 8);
+    fn get<T: FromBytes>(&mut self, bit_width: usize) -> Option<T> {
+        assert!(bit_width <= 64);
+        assert!(bit_width <= size_of::<T>() * 8);
 
-        if self.byte_offset * 8 + self.bit_offset + num_bits > self.total_bytes * 8 {
+        if self.byte_offset * 8 + self.bit_offset + bit_width > self.buffer.len() * 8 {
             return None;
         }
 
-        let mut v = trailing_bits(self.buffered_values, self.bit_offset + num_bits)
+        let mut v = trailing_bits(self.buffered_values, self.bit_offset + bit_width)
             >> self.bit_offset;
-        self.bit_offset += num_bits;
+        self.bit_offset += bit_width;
 
         if self.bit_offset >= 64 {
             self.byte_offset += 8;
             self.bit_offset -= 64;
 
             self.reload_buffer_values();
             v |= trailing_bits(self.buffered_values, self.bit_offset)
-                .wrapping_shl((num_bits - self.bit_offset) as u32);
+                .wrapping_shl((bit_width - self.bit_offset) as u32);
         }
 
         // TODO: better to avoid copying here
         Some(from_ne_slice(v.as_bytes()))
     }
 
-    pub fn get_batch<T: FromBytes>(&mut self, batch: &mut [T], num_bits: usize) -> usize {
-        assert!(num_bits <= 32);
-        assert!(num_bits <= size_of::<T>() * 8);
+    /// Gets the current byte offset
+    fn aligned_byte_offset(&self) -> usize {
+        self.byte_offset + ceil(self.bit_offset as i64, 8) as usize
+    }
+}
 
-        let mut values_to_read = batch.len();
-        let needed_bits = num_bits * values_to_read;
-        let remaining_bits = (self.total_bytes - self.byte_offset) * 8 - self.bit_offset;
-        if remaining_bits < needed_bits {
-            values_to_read = remaining_bits / num_bits;
+/// A struct for storing the state for reading whole bytes from a byte stream
+struct BitReaderAligned {
+    /// The byte buffer to read from, passed in by client
+    buffer: ByteBufferPtr,
+    /// The current offset in `buffer`
+    byte_offset: usize,
+}
+
+impl BitReaderAligned {
+    fn get<T: FromBytes>(&mut self, num_bytes: usize) -> Option<T> {
+        if self.byte_offset + num_bytes > self.buffer.len() {
+            return None;
         }
 
-        let mut i = 0;
+        let v = read_num_bytes!(T, num_bytes, self.buffer.data()[self.byte_offset..]);
+        self.byte_offset += num_bytes;
 
-        // First align bit offset to byte offset
-        if self.bit_offset != 0 {
-            while i < values_to_read && self.bit_offset != 0 {
-                batch[i] = self
-                    .get_value(num_bits)
-                    .expect("expected to have more data");
-                i += 1;
-            }
-        }
+        Some(v)
+    }
 
-        unsafe {
-            let in_buf = &self.buffer.data()[self.byte_offset..];
-            let mut in_ptr = in_buf as *const [u8] as *const u8 as *const u32;
-            if size_of::<T>() == 4 {
-                while values_to_read - i >= 32 {
-                    let out_ptr = &mut batch[i..] as *mut [T] as *mut T as *mut u32;
-                    in_ptr = unpack32(in_ptr, out_ptr, num_bits);
-                    self.byte_offset += 4 * num_bits;
-                    i += 32;
-                }
-            } else {
-                let mut out_buf = [0u32; 32];
-                let out_ptr = &mut out_buf as &mut [u32] as *mut [u32] as *mut u32;
-                while values_to_read - i >= 32 {
-                    in_ptr = unpack32(in_ptr, out_ptr, num_bits);
-                    self.byte_offset += 4 * num_bits;
-                    for n in 0..32 {
-                        // We need to copy from smaller size to bigger size to avoid
-                        // overwriting other memory regions.
-                        if size_of::<T>() > size_of::<u32>() {
+    /// Read up to `to_read` values from a packed buffer `batch` with bit width `num_bits`
+    /// in batches of 32, returning the number of values read
+    ///
+    /// # Panics
+    ///
+    /// This function panics if
+    /// * `bit_width` is greater than 32
+    /// * less than `to_read` values in the buffer
+    fn get_batch_x32<T: FromBytes>(
+        &mut self,
+        batch: &mut [T],
+        to_read: usize,
+        bit_width: usize,
+    ) -> usize {
+        assert!(bit_width <= 32);
+
+        let mut values_read = 0;
+        let in_buf = &self.buffer.data()[self.byte_offset..];
+        assert!(in_buf.len() * 8 >= to_read * bit_width);
+
+        let mut in_ptr = in_buf as *const [u8] as *const u8 as *const u32;

Review comment:
       This block is simply moved, with some slightly more verbose naming




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold closed pull request #1283: Refactor BitReader to contain explicit state (#1282)

Posted by GitBox <gi...@apache.org>.
tustvold closed pull request #1283:
URL: https://github.com/apache/arrow-rs/pull/1283


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] codecov-commenter commented on pull request #1283: Refactor BitReader to contain explicit state (#1282)

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #1283:
URL: https://github.com/apache/arrow-rs/pull/1283#issuecomment-1031375927


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1283?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1283](https://codecov.io/gh/apache/arrow-rs/pull/1283?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (9b456fa) into [master](https://codecov.io/gh/apache/arrow-rs/commit/ce15d0cd7bad127799f3f0d5845e2a0267e670a9?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ce15d0c) will **increase** coverage by `0.00%`.
   > The diff coverage is `94.59%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/1283/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/1283?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master    #1283   +/-   ##
   =======================================
     Coverage   83.02%   83.02%           
   =======================================
     Files         180      180           
     Lines       52269    52306   +37     
   =======================================
   + Hits        43394    43427   +33     
   - Misses       8875     8879    +4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1283?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/util/bit\_util.rs](https://codecov.io/gh/apache/arrow-rs/pull/1283/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9iaXRfdXRpbC5ycw==) | `93.10% <94.59%> (-0.09%)` | :arrow_down: |
   | [arrow/src/datatypes/field.rs](https://codecov.io/gh/apache/arrow-rs/pull/1283/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2RhdGF0eXBlcy9maWVsZC5ycw==) | `53.79% <0.00%> (-0.31%)` | :arrow_down: |
   | [arrow/src/array/transform/mod.rs](https://codecov.io/gh/apache/arrow-rs/pull/1283/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2FycmF5L3RyYW5zZm9ybS9tb2QucnM=) | `84.51% <0.00%> (-0.13%)` | :arrow_down: |
   | [parquet\_derive/src/parquet\_field.rs](https://codecov.io/gh/apache/arrow-rs/pull/1283/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldF9kZXJpdmUvc3JjL3BhcnF1ZXRfZmllbGQucnM=) | `66.21% <0.00%> (+0.22%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1283?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1283?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [ce15d0c...9b456fa](https://codecov.io/gh/apache/arrow-rs/pull/1283?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1283: Refactor BitReader to contain explicit state (#1282)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1283:
URL: https://github.com/apache/arrow-rs/pull/1283#discussion_r800564813



##########
File path: parquet/src/util/bit_util.rs
##########
@@ -445,158 +445,307 @@ impl BitWriter {
 /// MAX_VLQ_BYTE_LEN = 5 for i32, and MAX_VLQ_BYTE_LEN = 10 for i64
 pub const MAX_VLQ_BYTE_LEN: usize = 10;
 
-pub struct BitReader {
-    // The byte buffer to read from, passed in by client
+/// A struct storing the state for reading individual bits from a byte array
+struct BitReaderUnaligned {
+    /// The byte buffer to read from, passed in by client
     buffer: ByteBufferPtr,
 
-    // Bytes are memcpy'd from `buffer` and values are read from this variable.
-    // This is faster than reading values byte by byte directly from `buffer`
-    buffered_values: u64,
-
-    //
-    // End                                         Start
-    // |............|B|B|B|B|B|B|B|B|..............|
-    //                   ^          ^
-    //                 bit_offset   byte_offset
-    //
-    // Current byte offset in `buffer`
+    ///
+    /// End                                         Start
+    /// |............|B|B|B|B|B|B|B|B|..............|
+    ///                   ^          ^
+    ///                 bit_offset   byte_offset
+    ///
+    /// Current byte offset in `buffer`
     byte_offset: usize,
 
-    // Current bit offset in `buffered_values`
+    /// Current bit offset in `buffered_values`
     bit_offset: usize,
 
-    // Total number of bytes in `buffer`
-    total_bytes: usize,
+    /// Bytes are memcpy'd from `buffer` and values are read from this variable.
+    /// This is faster than reading values byte by byte directly from `buffer`
+    buffered_values: u64,
 }
 
-/// Utility class to read bit/byte stream. This class can read bits or bytes that are
-/// either byte aligned or not.
-impl BitReader {
-    pub fn new(buffer: ByteBufferPtr) -> Self {
-        let total_bytes = buffer.len();
-        let num_bytes = cmp::min(8, total_bytes);
-        let buffered_values = read_num_bytes!(u64, num_bytes, buffer.as_ref());
-        BitReader {
-            buffer,
-            buffered_values,
-            byte_offset: 0,
-            bit_offset: 0,
-            total_bytes,
-        }
-    }
-
-    pub fn reset(&mut self, buffer: ByteBufferPtr) {
-        self.buffer = buffer;
-        self.total_bytes = self.buffer.len();
-        let num_bytes = cmp::min(8, self.total_bytes);
-        self.buffered_values = read_num_bytes!(u64, num_bytes, self.buffer.as_ref());
-        self.byte_offset = 0;
-        self.bit_offset = 0;
-    }
-
-    /// Gets the current byte offset
-    #[inline]
-    pub fn get_byte_offset(&self) -> usize {
-        self.byte_offset + ceil(self.bit_offset as i64, 8) as usize
+impl BitReaderUnaligned {
+    fn reload_buffer_values(&mut self) {
+        let bytes_to_read = cmp::min(self.buffer.len() - self.byte_offset, 8);
+        self.buffered_values =
+            read_num_bytes!(u64, bytes_to_read, self.buffer.data()[self.byte_offset..]);
     }
 
-    /// Reads a value of type `T` and of size `num_bits`.
-    ///
-    /// Returns `None` if there's not enough data available. `Some` otherwise.
-    pub fn get_value<T: FromBytes>(&mut self, num_bits: usize) -> Option<T> {
-        assert!(num_bits <= 64);
-        assert!(num_bits <= size_of::<T>() * 8);
+    fn get<T: FromBytes>(&mut self, bit_width: usize) -> Option<T> {
+        assert!(bit_width <= 64);
+        assert!(bit_width <= size_of::<T>() * 8);
 
-        if self.byte_offset * 8 + self.bit_offset + num_bits > self.total_bytes * 8 {
+        if self.byte_offset * 8 + self.bit_offset + bit_width > self.buffer.len() * 8 {
             return None;
         }
 
-        let mut v = trailing_bits(self.buffered_values, self.bit_offset + num_bits)
+        let mut v = trailing_bits(self.buffered_values, self.bit_offset + bit_width)
             >> self.bit_offset;
-        self.bit_offset += num_bits;
+        self.bit_offset += bit_width;
 
         if self.bit_offset >= 64 {
             self.byte_offset += 8;
             self.bit_offset -= 64;
 
             self.reload_buffer_values();
             v |= trailing_bits(self.buffered_values, self.bit_offset)
-                .wrapping_shl((num_bits - self.bit_offset) as u32);
+                .wrapping_shl((bit_width - self.bit_offset) as u32);
         }
 
         // TODO: better to avoid copying here
         Some(from_ne_slice(v.as_bytes()))
     }
 
-    pub fn get_batch<T: FromBytes>(&mut self, batch: &mut [T], num_bits: usize) -> usize {
-        assert!(num_bits <= 32);
-        assert!(num_bits <= size_of::<T>() * 8);
+    /// Gets the current byte offset
+    fn aligned_byte_offset(&self) -> usize {
+        self.byte_offset + ceil(self.bit_offset as i64, 8) as usize
+    }
+}
 
-        let mut values_to_read = batch.len();
-        let needed_bits = num_bits * values_to_read;
-        let remaining_bits = (self.total_bytes - self.byte_offset) * 8 - self.bit_offset;
-        if remaining_bits < needed_bits {
-            values_to_read = remaining_bits / num_bits;
+/// A struct for storing the state for reading whole bytes from a byte stream
+struct BitReaderAligned {
+    /// The byte buffer to read from, passed in by client
+    buffer: ByteBufferPtr,
+    /// The current offset in `buffer`
+    byte_offset: usize,
+}
+
+impl BitReaderAligned {
+    fn get<T: FromBytes>(&mut self, num_bytes: usize) -> Option<T> {
+        if self.byte_offset + num_bytes > self.buffer.len() {
+            return None;
         }
 
-        let mut i = 0;
+        let v = read_num_bytes!(T, num_bytes, self.buffer.data()[self.byte_offset..]);
+        self.byte_offset += num_bytes;
 
-        // First align bit offset to byte offset
-        if self.bit_offset != 0 {
-            while i < values_to_read && self.bit_offset != 0 {
-                batch[i] = self
-                    .get_value(num_bits)
-                    .expect("expected to have more data");
-                i += 1;
-            }
-        }
+        Some(v)
+    }
 
-        unsafe {
-            let in_buf = &self.buffer.data()[self.byte_offset..];
-            let mut in_ptr = in_buf as *const [u8] as *const u8 as *const u32;
-            if size_of::<T>() == 4 {
-                while values_to_read - i >= 32 {
-                    let out_ptr = &mut batch[i..] as *mut [T] as *mut T as *mut u32;
-                    in_ptr = unpack32(in_ptr, out_ptr, num_bits);
-                    self.byte_offset += 4 * num_bits;
-                    i += 32;
-                }
-            } else {
-                let mut out_buf = [0u32; 32];
-                let out_ptr = &mut out_buf as &mut [u32] as *mut [u32] as *mut u32;
-                while values_to_read - i >= 32 {
-                    in_ptr = unpack32(in_ptr, out_ptr, num_bits);
-                    self.byte_offset += 4 * num_bits;
-                    for n in 0..32 {
-                        // We need to copy from smaller size to bigger size to avoid
-                        // overwriting other memory regions.
-                        if size_of::<T>() > size_of::<u32>() {
+    /// Read up to `to_read` values from a packed buffer `batch` with bit width `num_bits`
+    /// in batches of 32, returning the number of values read
+    ///
+    /// # Panics
+    ///
+    /// This function panics if
+    /// * `bit_width` is greater than 32
+    /// * less than `to_read` values in the buffer
+    fn get_batch_x32<T: FromBytes>(
+        &mut self,
+        batch: &mut [T],
+        to_read: usize,
+        bit_width: usize,
+    ) -> usize {
+        assert!(bit_width <= 32);
+
+        let mut values_read = 0;
+        let in_buf = &self.buffer.data()[self.byte_offset..];
+        assert!(in_buf.len() * 8 >= to_read * bit_width);
+
+        let mut in_ptr = in_buf as *const [u8] as *const u8 as *const u32;
+        if size_of::<T>() == 4 {
+            while to_read - values_read >= 32 {
+                let out_ptr = &mut batch[values_read..] as *mut [T] as *mut T as *mut u32;
+                in_ptr = unsafe { unpack32(in_ptr, out_ptr, bit_width) };
+                self.byte_offset += 4 * bit_width;
+                values_read += 32;
+            }
+        } else {
+            let mut out_buf = [0u32; 32];
+            let out_ptr = &mut out_buf as &mut [u32] as *mut [u32] as *mut u32;
+            while to_read - values_read >= 32 {
+                in_ptr = unsafe { unpack32(in_ptr, out_ptr, bit_width) };
+                self.byte_offset += 4 * bit_width;
+                for n in 0..32 {
+                    // We need to copy from smaller size to bigger size to avoid
+                    // overwriting other memory regions.
+                    if size_of::<T>() > size_of::<u32>() {
+                        unsafe {
                             std::ptr::copy_nonoverlapping(
                                 out_buf[n..].as_ptr() as *const u32,
-                                &mut batch[i] as *mut T as *mut u32,
+                                &mut batch[values_read] as *mut T as *mut u32,
                                 1,
                             );
-                        } else {
+                        }
+                    } else {
+                        unsafe {
                             std::ptr::copy_nonoverlapping(
                                 out_buf[n..].as_ptr() as *const T,
-                                &mut batch[i] as *mut T,
+                                &mut batch[values_read] as *mut T,
                                 1,
                             );
                         }
-                        i += 1;
                     }
+                    values_read += 1;
                 }
             }
         }
+        values_read
+    }
+}
+
+/// Combines [`BitReaderAligned`] and [`BitReaderUnaligned`] providing conversions between them
+enum BitReaderState {
+    Unaligned(BitReaderUnaligned),
+    Aligned(BitReaderAligned),
+}
 
-        assert!(values_to_read - i < 32);
+impl BitReaderState {
+    fn new(buffer: ByteBufferPtr, byte_offset: usize) -> Self {
+        BitReaderState::Aligned(BitReaderAligned {
+            buffer,
+            byte_offset,
+        })
+    }
 
-        self.reload_buffer_values();
-        while i < values_to_read {
-            batch[i] = self
-                .get_value(num_bits)
-                .expect("expected to have more data");
-            i += 1;
+    /// Returns the number of remaining bits
+    fn remaining_bits(&self) -> usize {
+        match &self {
+            BitReaderState::Unaligned(s) => {
+                (s.buffer.len() - s.byte_offset) * 8 - s.bit_offset
+            }
+            BitReaderState::Aligned(s) => (s.buffer.len() - s.byte_offset) * 8,
+        }
+    }
+
+    /// Returns the current byte offset, rounds up to the nearest whole byte
+    fn get_byte_offset(&self) -> usize {
+        match self {
+            BitReaderState::Unaligned(s) => s.aligned_byte_offset(),
+            BitReaderState::Aligned(s) => s.byte_offset,
+        }
+    }
+
+    /// Converts this to a [`BitReaderAligned`] advancing to the next whole byte
+    fn as_aligned(&mut self) -> &mut BitReaderAligned {
+        match self {
+            BitReaderState::Unaligned(s) => {
+                let offset = s.aligned_byte_offset();
+                *self = BitReaderState::new(s.buffer.clone(), offset);
+
+                match self {
+                    BitReaderState::Aligned(s) => s,
+                    _ => unreachable!(),
+                }
+            }
+            BitReaderState::Aligned(s) => s,
+        }
+    }
+
+    /// Converts this to a [`BitReaderUnaligned`]
+    fn as_unaligned(&mut self) -> &mut BitReaderUnaligned {
+        match self {
+            BitReaderState::Unaligned(s) => s,
+            BitReaderState::Aligned(s) => {
+                let mut scan = BitReaderUnaligned {
+                    buffer: s.buffer.clone(),
+                    byte_offset: s.byte_offset,
+                    bit_offset: 0,
+                    buffered_values: 0,
+                };
+
+                scan.reload_buffer_values();
+                *self = BitReaderState::Unaligned(scan);
+
+                match self {
+                    BitReaderState::Unaligned(s) => s,
+                    _ => unreachable!(),
+                }
+            }
+        }
+    }
+}
+
+pub struct BitReader {
+    /// The state of the bit reader, can use simpler logic
+    /// when reading aligned data
+    state: BitReaderState,
+}
+
+/// Utility class to read bit/byte stream. This class can read bits or bytes that are
+/// either byte aligned or not.
+impl BitReader {
+    pub fn new(buffer: ByteBufferPtr) -> Self {
+        BitReader {
+            state: BitReaderState::new(buffer, 0),
+        }
+    }
+
+    pub fn reset(&mut self, buffer: ByteBufferPtr) {
+        self.state = BitReaderState::new(buffer, 0)
+    }
+
+    /// Gets the current byte offset, rounding up to the nearest whole byte
+    #[inline]
+    pub fn get_byte_offset(&self) -> usize {
+        self.state.get_byte_offset()
+    }
+
+    /// Reads a value of type `T` and of size `num_bits`.
+    ///
+    /// Returns `None` if there's not enough data available. `Some` otherwise.
+    pub fn get_value<T: FromBytes>(&mut self, num_bits: usize) -> Option<T> {
+        self.state.as_unaligned().get(num_bits)
+    }
+
+    /// Read multiple values from their packed representation
+    ///
+    /// # Panics
+    ///
+    /// This function panics if
+    /// - `bit_width > 32`
+    /// - `bit_width` is larger than the bit-capacity of `T`
+    ///
+    pub fn get_batch<T: FromBytes>(
+        &mut self,
+        batch: &mut [T],
+        bit_width: usize,
+    ) -> usize {
+        assert!(bit_width <= size_of::<T>() * 8);
+
+        let mut values_to_read = batch.len();
+        let needed_bits = bit_width * values_to_read;
+        let remaining_bits = self.state.remaining_bits();
+        if remaining_bits < needed_bits {
+            values_to_read = remaining_bits / bit_width;
+        }
+
+        let mut values_read = 0;
+
+        // First align bit offset to byte offset
+        if let BitReaderState::Unaligned(unaligned) = &mut self.state {
+            while values_to_read != values_read && (unaligned.bit_offset % 8) != 0 {

Review comment:
       There is a slight change here: we now align to the next byte boundary rather than to a `u64` boundary. I think the original `u64` alignment was unintentional, as the comment said `byte offset`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on pull request #1283: Refactor BitReader to contain explicit state (#1282)

Posted by GitBox <gi...@apache.org>.
tustvold commented on pull request #1283:
URL: https://github.com/apache/arrow-rs/pull/1283#issuecomment-1037241480


   Going to close this as it wasn't needed for #1284


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on pull request #1283: Refactor BitReader to contain explicit state (#1282)

Posted by GitBox <gi...@apache.org>.
tustvold commented on pull request #1283:
URL: https://github.com/apache/arrow-rs/pull/1283#issuecomment-1032430104


   Going to get #1284 in first and then revisit this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1283: Refactor BitReader to contain explicit state (#1282)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1283:
URL: https://github.com/apache/arrow-rs/pull/1283#discussion_r800564813



##########
File path: parquet/src/util/bit_util.rs
##########
@@ -445,158 +445,307 @@ impl BitWriter {
 /// MAX_VLQ_BYTE_LEN = 5 for i32, and MAX_VLQ_BYTE_LEN = 10 for i64
 pub const MAX_VLQ_BYTE_LEN: usize = 10;
 
-pub struct BitReader {
-    // The byte buffer to read from, passed in by client
+/// A struct storing the state for reading individual bits from a byte array
+struct BitReaderUnaligned {
+    /// The byte buffer to read from, passed in by client
     buffer: ByteBufferPtr,
 
-    // Bytes are memcpy'd from `buffer` and values are read from this variable.
-    // This is faster than reading values byte by byte directly from `buffer`
-    buffered_values: u64,
-
-    //
-    // End                                         Start
-    // |............|B|B|B|B|B|B|B|B|..............|
-    //                   ^          ^
-    //                 bit_offset   byte_offset
-    //
-    // Current byte offset in `buffer`
+    ///
+    /// End                                         Start
+    /// |............|B|B|B|B|B|B|B|B|..............|
+    ///                   ^          ^
+    ///                 bit_offset   byte_offset
+    ///
+    /// Current byte offset in `buffer`
     byte_offset: usize,
 
-    // Current bit offset in `buffered_values`
+    /// Current bit offset in `buffered_values`
     bit_offset: usize,
 
-    // Total number of bytes in `buffer`
-    total_bytes: usize,
+    /// Bytes are memcpy'd from `buffer` and values are read from this variable.
+    /// This is faster than reading values byte by byte directly from `buffer`
+    buffered_values: u64,
 }
 
-/// Utility class to read bit/byte stream. This class can read bits or bytes that are
-/// either byte aligned or not.
-impl BitReader {
-    pub fn new(buffer: ByteBufferPtr) -> Self {
-        let total_bytes = buffer.len();
-        let num_bytes = cmp::min(8, total_bytes);
-        let buffered_values = read_num_bytes!(u64, num_bytes, buffer.as_ref());
-        BitReader {
-            buffer,
-            buffered_values,
-            byte_offset: 0,
-            bit_offset: 0,
-            total_bytes,
-        }
-    }
-
-    pub fn reset(&mut self, buffer: ByteBufferPtr) {
-        self.buffer = buffer;
-        self.total_bytes = self.buffer.len();
-        let num_bytes = cmp::min(8, self.total_bytes);
-        self.buffered_values = read_num_bytes!(u64, num_bytes, self.buffer.as_ref());
-        self.byte_offset = 0;
-        self.bit_offset = 0;
-    }
-
-    /// Gets the current byte offset
-    #[inline]
-    pub fn get_byte_offset(&self) -> usize {
-        self.byte_offset + ceil(self.bit_offset as i64, 8) as usize
+impl BitReaderUnaligned {
+    fn reload_buffer_values(&mut self) {
+        let bytes_to_read = cmp::min(self.buffer.len() - self.byte_offset, 8);
+        self.buffered_values =
+            read_num_bytes!(u64, bytes_to_read, self.buffer.data()[self.byte_offset..]);
     }
 
-    /// Reads a value of type `T` and of size `num_bits`.
-    ///
-    /// Returns `None` if there's not enough data available. `Some` otherwise.
-    pub fn get_value<T: FromBytes>(&mut self, num_bits: usize) -> Option<T> {
-        assert!(num_bits <= 64);
-        assert!(num_bits <= size_of::<T>() * 8);
+    fn get<T: FromBytes>(&mut self, bit_width: usize) -> Option<T> {
+        assert!(bit_width <= 64);
+        assert!(bit_width <= size_of::<T>() * 8);
 
-        if self.byte_offset * 8 + self.bit_offset + num_bits > self.total_bytes * 8 {
+        if self.byte_offset * 8 + self.bit_offset + bit_width > self.buffer.len() * 8 {
             return None;
         }
 
-        let mut v = trailing_bits(self.buffered_values, self.bit_offset + num_bits)
+        let mut v = trailing_bits(self.buffered_values, self.bit_offset + bit_width)
             >> self.bit_offset;
-        self.bit_offset += num_bits;
+        self.bit_offset += bit_width;
 
         if self.bit_offset >= 64 {
             self.byte_offset += 8;
             self.bit_offset -= 64;
 
             self.reload_buffer_values();
             v |= trailing_bits(self.buffered_values, self.bit_offset)
-                .wrapping_shl((num_bits - self.bit_offset) as u32);
+                .wrapping_shl((bit_width - self.bit_offset) as u32);
         }
 
         // TODO: better to avoid copying here
         Some(from_ne_slice(v.as_bytes()))
     }
 
-    pub fn get_batch<T: FromBytes>(&mut self, batch: &mut [T], num_bits: usize) -> usize {
-        assert!(num_bits <= 32);
-        assert!(num_bits <= size_of::<T>() * 8);
+    /// Gets the current byte offset
+    fn aligned_byte_offset(&self) -> usize {
+        self.byte_offset + ceil(self.bit_offset as i64, 8) as usize
+    }
+}
 
-        let mut values_to_read = batch.len();
-        let needed_bits = num_bits * values_to_read;
-        let remaining_bits = (self.total_bytes - self.byte_offset) * 8 - self.bit_offset;
-        if remaining_bits < needed_bits {
-            values_to_read = remaining_bits / num_bits;
+/// A struct for storing the state for reading whole bytes from a byte stream
+struct BitReaderAligned {
+    /// The byte buffer to read from, passed in by client
+    buffer: ByteBufferPtr,
+    /// The current offset in `buffer`
+    byte_offset: usize,
+}
+
+impl BitReaderAligned {
+    fn get<T: FromBytes>(&mut self, num_bytes: usize) -> Option<T> {
+        if self.byte_offset + num_bytes > self.buffer.len() {
+            return None;
         }
 
-        let mut i = 0;
+        let v = read_num_bytes!(T, num_bytes, self.buffer.data()[self.byte_offset..]);
+        self.byte_offset += num_bytes;
 
-        // First align bit offset to byte offset
-        if self.bit_offset != 0 {
-            while i < values_to_read && self.bit_offset != 0 {
-                batch[i] = self
-                    .get_value(num_bits)
-                    .expect("expected to have more data");
-                i += 1;
-            }
-        }
+        Some(v)
+    }
 
-        unsafe {
-            let in_buf = &self.buffer.data()[self.byte_offset..];
-            let mut in_ptr = in_buf as *const [u8] as *const u8 as *const u32;
-            if size_of::<T>() == 4 {
-                while values_to_read - i >= 32 {
-                    let out_ptr = &mut batch[i..] as *mut [T] as *mut T as *mut u32;
-                    in_ptr = unpack32(in_ptr, out_ptr, num_bits);
-                    self.byte_offset += 4 * num_bits;
-                    i += 32;
-                }
-            } else {
-                let mut out_buf = [0u32; 32];
-                let out_ptr = &mut out_buf as &mut [u32] as *mut [u32] as *mut u32;
-                while values_to_read - i >= 32 {
-                    in_ptr = unpack32(in_ptr, out_ptr, num_bits);
-                    self.byte_offset += 4 * num_bits;
-                    for n in 0..32 {
-                        // We need to copy from smaller size to bigger size to avoid
-                        // overwriting other memory regions.
-                        if size_of::<T>() > size_of::<u32>() {
+    /// Read up to `to_read` values from a packed buffer `batch` with bit width `num_bits`
+    /// in batches of 32, returning the number of values read
+    ///
+    /// # Panics
+    ///
+    /// This function panics if
+    /// * `bit_width` is greater than 32
+    /// * less than `to_read` values in the buffer
+    fn get_batch_x32<T: FromBytes>(
+        &mut self,
+        batch: &mut [T],
+        to_read: usize,
+        bit_width: usize,
+    ) -> usize {
+        assert!(bit_width <= 32);
+
+        let mut values_read = 0;
+        let in_buf = &self.buffer.data()[self.byte_offset..];
+        assert!(in_buf.len() * 8 >= to_read * bit_width);
+
+        let mut in_ptr = in_buf as *const [u8] as *const u8 as *const u32;
+        if size_of::<T>() == 4 {
+            while to_read - values_read >= 32 {
+                let out_ptr = &mut batch[values_read..] as *mut [T] as *mut T as *mut u32;
+                in_ptr = unsafe { unpack32(in_ptr, out_ptr, bit_width) };
+                self.byte_offset += 4 * bit_width;
+                values_read += 32;
+            }
+        } else {
+            let mut out_buf = [0u32; 32];
+            let out_ptr = &mut out_buf as &mut [u32] as *mut [u32] as *mut u32;
+            while to_read - values_read >= 32 {
+                in_ptr = unsafe { unpack32(in_ptr, out_ptr, bit_width) };
+                self.byte_offset += 4 * bit_width;
+                for n in 0..32 {
+                    // We need to copy from smaller size to bigger size to avoid
+                    // overwriting other memory regions.
+                    if size_of::<T>() > size_of::<u32>() {
+                        unsafe {
                             std::ptr::copy_nonoverlapping(
                                 out_buf[n..].as_ptr() as *const u32,
-                                &mut batch[i] as *mut T as *mut u32,
+                                &mut batch[values_read] as *mut T as *mut u32,
                                 1,
                             );
-                        } else {
+                        }
+                    } else {
+                        unsafe {
                             std::ptr::copy_nonoverlapping(
                                 out_buf[n..].as_ptr() as *const T,
-                                &mut batch[i] as *mut T,
+                                &mut batch[values_read] as *mut T,
                                 1,
                             );
                         }
-                        i += 1;
                     }
+                    values_read += 1;
                 }
             }
         }
+        values_read
+    }
+}
+
+/// Combines [`BitReaderAligned`] and [`BitReaderUnaligned`] providing conversions between them
+enum BitReaderState {
+    Unaligned(BitReaderUnaligned),
+    Aligned(BitReaderAligned),
+}
 
-        assert!(values_to_read - i < 32);
+impl BitReaderState {
+    fn new(buffer: ByteBufferPtr, byte_offset: usize) -> Self {
+        BitReaderState::Aligned(BitReaderAligned {
+            buffer,
+            byte_offset,
+        })
+    }
 
-        self.reload_buffer_values();
-        while i < values_to_read {
-            batch[i] = self
-                .get_value(num_bits)
-                .expect("expected to have more data");
-            i += 1;
+    /// Returns the number of remaining bits
+    fn remaining_bits(&self) -> usize {
+        match &self {
+            BitReaderState::Unaligned(s) => {
+                (s.buffer.len() - s.byte_offset) * 8 - s.bit_offset
+            }
+            BitReaderState::Aligned(s) => (s.buffer.len() - s.byte_offset) * 8,
+        }
+    }
+
+    /// Returns the current byte offset, rounds up to the nearest whole byte
+    fn get_byte_offset(&self) -> usize {
+        match self {
+            BitReaderState::Unaligned(s) => s.aligned_byte_offset(),
+            BitReaderState::Aligned(s) => s.byte_offset,
+        }
+    }
+
+    /// Converts this to a [`BitReaderAligned`] advancing to the next whole byte
+    fn as_aligned(&mut self) -> &mut BitReaderAligned {
+        match self {
+            BitReaderState::Unaligned(s) => {
+                let offset = s.aligned_byte_offset();
+                *self = BitReaderState::new(s.buffer.clone(), offset);
+
+                match self {
+                    BitReaderState::Aligned(s) => s,
+                    _ => unreachable!(),
+                }
+            }
+            BitReaderState::Aligned(s) => s,
+        }
+    }
+
+    /// Converts this to a [`BitReaderUnaligned`]
+    fn as_unaligned(&mut self) -> &mut BitReaderUnaligned {
+        match self {
+            BitReaderState::Unaligned(s) => s,
+            BitReaderState::Aligned(s) => {
+                let mut scan = BitReaderUnaligned {
+                    buffer: s.buffer.clone(),
+                    byte_offset: s.byte_offset,
+                    bit_offset: 0,
+                    buffered_values: 0,
+                };
+
+                scan.reload_buffer_values();
+                *self = BitReaderState::Unaligned(scan);
+
+                match self {
+                    BitReaderState::Unaligned(s) => s,
+                    _ => unreachable!(),
+                }
+            }
+        }
+    }
+}
+
+pub struct BitReader {
+    /// The state of the bit reader, can use simpler logic
+    /// when reading aligned data
+    state: BitReaderState,
+}
+
+/// Utility class to read bit/byte stream. This class can read bits or bytes that are
+/// either byte aligned or not.
+impl BitReader {
+    pub fn new(buffer: ByteBufferPtr) -> Self {
+        BitReader {
+            state: BitReaderState::new(buffer, 0),
+        }
+    }
+
+    pub fn reset(&mut self, buffer: ByteBufferPtr) {
+        self.state = BitReaderState::new(buffer, 0)
+    }
+
+    /// Gets the current byte offset, rounding up to the nearest whole byte
+    #[inline]
+    pub fn get_byte_offset(&self) -> usize {
+        self.state.get_byte_offset()
+    }
+
+    /// Reads a value of type `T` and of size `num_bits`.
+    ///
+    /// Returns `None` if there's not enough data available. `Some` otherwise.
+    pub fn get_value<T: FromBytes>(&mut self, num_bits: usize) -> Option<T> {
+        self.state.as_unaligned().get(num_bits)
+    }
+
+    /// Read multiple values from their packed representation
+    ///
+    /// # Panics
+    ///
+    /// This function panics if
+    /// - `bit_width > 32`
+    /// - `bit_width` is larger than the bit-capacity of `T`
+    ///
+    pub fn get_batch<T: FromBytes>(
+        &mut self,
+        batch: &mut [T],
+        bit_width: usize,
+    ) -> usize {
+        assert!(bit_width <= size_of::<T>() * 8);
+
+        let mut values_to_read = batch.len();
+        let needed_bits = bit_width * values_to_read;
+        let remaining_bits = self.state.remaining_bits();
+        if remaining_bits < needed_bits {
+            values_to_read = remaining_bits / bit_width;
+        }
+
+        let mut values_read = 0;
+
+        // First align bit offset to byte offset
+        if let BitReaderState::Unaligned(unaligned) = &mut self.state {
+            while values_to_read != values_read && (unaligned.bit_offset % 8) != 0 {

Review comment:
       There is a slight change here: we now align to the next byte, not to a `u64` boundary. I think the original `u64` alignment was unintentional, as the comment said `byte offset`, and there is no reason to align to `u64`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org