You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/07/31 20:45:10 UTC

[arrow-rs] branch master updated: Automatically grow parquet BitWriter (#2226) (~10% faster) (#2231)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 99ad91545 Automatically grow parquet BitWriter (#2226) (~10% faster) (#2231)
99ad91545 is described below

commit 99ad915452528d9add1427591ba23808542195c9
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sun Jul 31 21:45:05 2022 +0100

    Automatically grow parquet BitWriter (#2226) (~10% faster) (#2231)
    
    * Automatically grow parquet BitWriter (#2226)
    
    * Review feedback
---
 parquet/src/column/writer/mod.rs               |   8 +-
 parquet/src/data_type.rs                       |  16 +-
 parquet/src/encodings/encoding/dict_encoder.rs |  10 +-
 parquet/src/encodings/encoding/mod.rs          |  42 ++--
 parquet/src/encodings/levels.rs                |  89 +++------
 parquet/src/encodings/rle.rs                   |  72 +++----
 parquet/src/util/bit_util.rs                   | 255 +++++++------------------
 parquet/src/util/test_common/page_util.rs      |   6 +-
 8 files changed, 158 insertions(+), 340 deletions(-)

diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 9a371bc27..2d5556111 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -27,7 +27,7 @@ use crate::column::writer::encoder::{
 use crate::compression::{create_codec, Codec};
 use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
-use crate::encodings::levels::{max_buffer_size, LevelEncoder};
+use crate::encodings::levels::LevelEncoder;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
 use crate::file::properties::EnabledStatistics;
@@ -782,8 +782,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         levels: &[i16],
         max_level: i16,
     ) -> Result<Vec<u8>> {
-        let size = max_buffer_size(encoding, max_level, levels.len());
-        let mut encoder = LevelEncoder::v1(encoding, max_level, vec![0; size]);
+        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
         encoder.put(levels)?;
         encoder.consume()
     }
@@ -792,8 +791,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     /// Encoding is always RLE.
     #[inline]
     fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Result<Vec<u8>> {
-        let size = max_buffer_size(Encoding::RLE, max_level, levels.len());
-        let mut encoder = LevelEncoder::v2(max_level, vec![0; size]);
+        let mut encoder = LevelEncoder::v2(max_level, levels.len());
         encoder.put(levels)?;
         encoder.consume()
     }
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 1d0b5b231..43c9a4238 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -565,7 +565,7 @@ impl AsBytes for str {
 
 pub(crate) mod private {
     use crate::encodings::decoding::PlainDecoderDetails;
-    use crate::util::bit_util::{round_upto_power_of_2, BitReader, BitWriter};
+    use crate::util::bit_util::{BitReader, BitWriter};
     use crate::util::memory::ByteBufferPtr;
 
     use crate::basic::Type;
@@ -658,20 +658,8 @@ pub(crate) mod private {
             _: &mut W,
             bit_writer: &mut BitWriter,
         ) -> Result<()> {
-            if bit_writer.bytes_written() + values.len() / 8 >= bit_writer.capacity() {
-                let bits_available =
-                    (bit_writer.capacity() - bit_writer.bytes_written()) * 8;
-                let bits_needed = values.len() - bits_available;
-                let bytes_needed = (bits_needed + 7) / 8;
-                let bytes_needed = round_upto_power_of_2(bytes_needed, 256);
-                bit_writer.extend(bytes_needed);
-            }
             for value in values {
-                if !bit_writer.put_value(*value as u64, 1) {
-                    return Err(ParquetError::EOF(
-                        "unable to put boolean value".to_string(),
-                    ));
-                }
+                bit_writer.put_value(*value as u64, 1)
             }
             Ok(())
         }
diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs
index 7bf983254..1b386448b 100644
--- a/parquet/src/encodings/encoding/dict_encoder.rs
+++ b/parquet/src/encodings/encoding/dict_encoder.rs
@@ -20,7 +20,7 @@
 
 use crate::basic::{Encoding, Type};
 use crate::data_type::private::ParquetValueType;
-use crate::data_type::{AsBytes, DataType};
+use crate::data_type::DataType;
 use crate::encodings::encoding::{Encoder, PlainEncoder};
 use crate::encodings::rle::RleEncoder;
 use crate::errors::{ParquetError, Result};
@@ -28,7 +28,6 @@ use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::num_required_bits;
 use crate::util::interner::{Interner, Storage};
 use crate::util::memory::ByteBufferPtr;
-use std::io::Write;
 
 #[derive(Debug)]
 struct KeyStorage<T: DataType> {
@@ -127,12 +126,11 @@ impl<T: DataType> DictEncoder<T> {
     /// the result.
     pub fn write_indices(&mut self) -> Result<ByteBufferPtr> {
         let buffer_len = self.estimated_data_encoded_size();
-        let mut buffer = vec![0; buffer_len];
-        buffer[0] = self.bit_width() as u8;
+        let mut buffer = Vec::with_capacity(buffer_len);
+        buffer.push(self.bit_width() as u8);
 
         // Write bit width in the first byte
-        buffer.write_all((self.bit_width() as u8).as_bytes())?;
-        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer, 1);
+        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
         for index in &self.indices {
             if !encoder.put(*index as u64)? {
                 return Err(general_err!("Encoder doesn't have enough space"));
diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs
index 5cb94b7c0..383211f12 100644
--- a/parquet/src/encodings/encoding/mod.rs
+++ b/parquet/src/encodings/encoding/mod.rs
@@ -186,10 +186,13 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
     fn put(&mut self, values: &[T::T]) -> Result<()> {
         ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
 
-        if self.encoder.is_none() {
-            self.encoder = Some(RleEncoder::new(1, DEFAULT_RLE_BUFFER_LEN));
-        }
-        let rle_encoder = self.encoder.as_mut().unwrap();
+        let rle_encoder = self.encoder.get_or_insert_with(|| {
+            let mut buffer = Vec::with_capacity(DEFAULT_RLE_BUFFER_LEN);
+            // Reserve space for length
+            buffer.extend_from_slice(&[0; 4]);
+            RleEncoder::new_from_buf(1, buffer)
+        });
+
         for value in values {
             let value = value.as_u64()?;
             if !rle_encoder.put(value)? {
@@ -220,25 +223,18 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
         ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
         let rle_encoder = self
             .encoder
-            .as_mut()
+            .take()
             .expect("RLE value encoder is not initialized");
 
         // Flush all encoder buffers and raw values
-        let encoded_data = {
-            let buf = rle_encoder.flush_buffer()?;
-
-            // Note that buf does not have any offset, all data is encoded bytes
-            let len = (buf.len() as i32).to_le();
-            let len_bytes = len.as_bytes();
-            let mut encoded_data = vec![];
-            encoded_data.extend_from_slice(len_bytes);
-            encoded_data.extend_from_slice(buf);
-            encoded_data
-        };
-        // Reset rle encoder for the next batch
-        rle_encoder.clear();
+        let mut buf = rle_encoder.consume()?;
+        assert!(buf.len() > 4, "should have had padding inserted");
+
+        // The first 4 bytes of buf are reserved for the length; the rest is encoded data
+        let len = (buf.len() - 4) as i32;
+        buf[..4].copy_from_slice(&len.to_le_bytes());
 
-        Ok(ByteBufferPtr::new(encoded_data))
+        Ok(ByteBufferPtr::new(buf))
     }
 }
 
@@ -293,7 +289,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
         let block_size = DEFAULT_BLOCK_SIZE;
         let num_mini_blocks = DEFAULT_NUM_MINI_BLOCKS;
         let mini_block_size = block_size / num_mini_blocks;
-        assert!(mini_block_size % 8 == 0);
+        assert_eq!(mini_block_size % 8, 0);
         Self::assert_supported_type();
 
         DeltaBitPackEncoder {
@@ -346,7 +342,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
         self.bit_writer.put_zigzag_vlq_int(min_delta);
 
         // Slice to store bit width for each mini block
-        let offset = self.bit_writer.skip(self.num_mini_blocks)?;
+        let offset = self.bit_writer.skip(self.num_mini_blocks);
 
         for i in 0..self.num_mini_blocks {
             // Find how many values we need to encode - either block size or whatever
@@ -364,7 +360,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
             }
 
             // Compute the max delta in current mini block
-            let mut max_delta = i64::min_value();
+            let mut max_delta = i64::MIN;
             for j in 0..n {
                 max_delta =
                     cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]);
@@ -873,7 +869,7 @@ mod tests {
         let mut values = vec![];
         values.extend_from_slice(&[true; 16]);
         values.extend_from_slice(&[false; 16]);
-        run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 2, 0);
+        run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 6, 0);
 
         // DELTA_LENGTH_BYTE_ARRAY
         run_test::<ByteArrayType>(
diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index 28fb63881..8bdfdd3e9 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -65,22 +65,21 @@ impl LevelEncoder {
     /// Used to encode levels for Data Page v1.
     ///
     /// Panics, if encoding is not supported.
-    pub fn v1(encoding: Encoding, max_level: i16, byte_buffer: Vec<u8>) -> Self {
+    pub fn v1(encoding: Encoding, max_level: i16, capacity: usize) -> Self {
+        let capacity_bytes = max_buffer_size(encoding, max_level, capacity);
+        let mut buffer = Vec::with_capacity(capacity_bytes);
         let bit_width = num_required_bits(max_level as u64);
         match encoding {
-            Encoding::RLE => LevelEncoder::Rle(RleEncoder::new_from_buf(
-                bit_width,
-                byte_buffer,
-                mem::size_of::<i32>(),
-            )),
+            Encoding::RLE => {
+                // Reserve space for length header
+                buffer.extend_from_slice(&[0; 4]);
+                LevelEncoder::Rle(RleEncoder::new_from_buf(bit_width, buffer))
+            }
             Encoding::BIT_PACKED => {
                 // Here we set full byte buffer without adjusting for num_buffered_values,
                 // because byte buffer will already be allocated with size from
                 // `max_buffer_size()` method.
-                LevelEncoder::BitPacked(
-                    bit_width,
-                    BitWriter::new_from_buf(byte_buffer, 0),
-                )
+                LevelEncoder::BitPacked(bit_width, BitWriter::new_from_buf(buffer))
             }
             _ => panic!("Unsupported encoding type {}", encoding),
         }
@@ -88,9 +87,11 @@ impl LevelEncoder {
 
     /// Creates new level encoder based on RLE encoding. Used to encode Data Page v2
     /// repetition and definition levels.
-    pub fn v2(max_level: i16, byte_buffer: Vec<u8>) -> Self {
+    pub fn v2(max_level: i16, capacity: usize) -> Self {
+        let capacity_bytes = max_buffer_size(Encoding::RLE, max_level, capacity);
+        let buffer = Vec::with_capacity(capacity_bytes);
         let bit_width = num_required_bits(max_level as u64);
-        LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0))
+        LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, buffer))
     }
 
     /// Put/encode levels vector into this level encoder.
@@ -114,9 +115,7 @@ impl LevelEncoder {
             }
             LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
                 for value in buffer {
-                    if !encoder.put_value(*value as u64, bit_width as usize) {
-                        return Err(general_err!("Not enough bytes left"));
-                    }
+                    encoder.put_value(*value as u64, bit_width as usize);
                     num_encoded += 1;
                 }
                 encoder.flush();
@@ -283,11 +282,10 @@ mod tests {
     use crate::util::test_common::random_numbers_range;
 
     fn test_internal_roundtrip(enc: Encoding, levels: &[i16], max_level: i16, v2: bool) {
-        let size = max_buffer_size(enc, max_level, levels.len());
         let mut encoder = if v2 {
-            LevelEncoder::v2(max_level, vec![0; size])
+            LevelEncoder::v2(max_level, levels.len())
         } else {
-            LevelEncoder::v1(enc, max_level, vec![0; size])
+            LevelEncoder::v1(enc, max_level, levels.len())
         };
         encoder.put(levels).expect("put() should be OK");
         let encoded_levels = encoder.consume().expect("consume() should be OK");
@@ -315,11 +313,10 @@ mod tests {
         max_level: i16,
         v2: bool,
     ) {
-        let size = max_buffer_size(enc, max_level, levels.len());
         let mut encoder = if v2 {
-            LevelEncoder::v2(max_level, vec![0; size])
+            LevelEncoder::v2(max_level, levels.len())
         } else {
-            LevelEncoder::v1(enc, max_level, vec![0; size])
+            LevelEncoder::v1(enc, max_level, levels.len())
         };
         encoder.put(levels).expect("put() should be OK");
         let encoded_levels = encoder.consume().expect("consume() should be OK");
@@ -363,11 +360,10 @@ mod tests {
         max_level: i16,
         v2: bool,
     ) {
-        let size = max_buffer_size(enc, max_level, levels.len());
         let mut encoder = if v2 {
-            LevelEncoder::v2(max_level, vec![0; size])
+            LevelEncoder::v2(max_level, levels.len())
         } else {
-            LevelEncoder::v1(enc, max_level, vec![0; size])
+            LevelEncoder::v1(enc, max_level, levels.len())
         };
         // Encode only one value
         let num_encoded = encoder.put(&levels[0..1]).expect("put() should be OK");
@@ -391,33 +387,6 @@ mod tests {
         assert_eq!(buffer[0..num_decoded], levels[0..num_decoded]);
     }
 
-    // Tests when encoded values are larger than encoder's buffer
-    fn test_internal_roundtrip_overflow(
-        enc: Encoding,
-        levels: &[i16],
-        max_level: i16,
-        v2: bool,
-    ) {
-        let size = max_buffer_size(enc, max_level, levels.len());
-        let mut encoder = if v2 {
-            LevelEncoder::v2(max_level, vec![0; size])
-        } else {
-            LevelEncoder::v1(enc, max_level, vec![0; size])
-        };
-        let mut found_err = false;
-        // Insert a large number of values, so we run out of space
-        for _ in 0..100 {
-            if let Err(err) = encoder.put(levels) {
-                assert!(format!("{}", err).contains("Not enough bytes left"));
-                found_err = true;
-                break;
-            };
-        }
-        if !found_err {
-            panic!("Failed test: no buffer overflow");
-        }
-    }
-
     #[test]
     fn test_roundtrip_one() {
         let levels = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
@@ -470,6 +439,15 @@ mod tests {
         test_internal_roundtrip(Encoding::RLE, &levels, max_level, true);
     }
 
+    #[test]
+    fn test_rountrip_max() {
+        let levels = vec![0, i16::MAX, i16::MAX, i16::MAX, 0];
+        let max_level = i16::MAX;
+        test_internal_roundtrip(Encoding::RLE, &levels, max_level, false);
+        test_internal_roundtrip(Encoding::BIT_PACKED, &levels, max_level, false);
+        test_internal_roundtrip(Encoding::RLE, &levels, max_level, true);
+    }
+
     #[test]
     fn test_roundtrip_underflow() {
         let levels = vec![1, 1, 2, 3, 2, 1, 1, 2, 3, 1];
@@ -484,15 +462,6 @@ mod tests {
         test_internal_roundtrip_underflow(Encoding::RLE, &levels, max_level, true);
     }
 
-    #[test]
-    fn test_roundtrip_overflow() {
-        let levels = vec![1, 1, 2, 3, 2, 1, 1, 2, 3, 1];
-        let max_level = 3;
-        test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, false);
-        test_internal_roundtrip_overflow(Encoding::BIT_PACKED, &levels, max_level, false);
-        test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, true);
-    }
-
     #[test]
     fn test_rle_decoder_set_data_range() {
         // Buffer containing both repetition and definition levels
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index 8a19dd545..28ebd7d3a 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -83,21 +83,14 @@ pub struct RleEncoder {
 
 impl RleEncoder {
     pub fn new(bit_width: u8, buffer_len: usize) -> Self {
-        let buffer = vec![0; buffer_len];
-        RleEncoder::new_from_buf(bit_width, buffer, 0)
+        let buffer = Vec::with_capacity(buffer_len);
+        RleEncoder::new_from_buf(bit_width, buffer)
     }
 
-    /// Initialize the encoder from existing `buffer` and the starting offset `start`.
-    pub fn new_from_buf(bit_width: u8, buffer: Vec<u8>, start: usize) -> Self {
-        assert!(bit_width <= 64, "bit_width ({}) out of range.", bit_width);
+    /// Initialize the encoder from existing `buffer`
+    pub fn new_from_buf(bit_width: u8, buffer: Vec<u8>) -> Self {
         let max_run_byte_size = RleEncoder::min_buffer_size(bit_width);
-        assert!(
-            buffer.len() >= max_run_byte_size,
-            "buffer length {} must be greater than {}",
-            buffer.len(),
-            max_run_byte_size
-        );
-        let bit_writer = BitWriter::new_from_buf(buffer, start);
+        let bit_writer = BitWriter::new_from_buf(buffer);
         RleEncoder {
             bit_width,
             bit_writer,
@@ -244,14 +237,11 @@ impl RleEncoder {
     fn flush_rle_run(&mut self) -> Result<()> {
         assert!(self.repeat_count > 0);
         let indicator_value = self.repeat_count << 1;
-        let mut result = self.bit_writer.put_vlq_int(indicator_value as u64);
-        result &= self.bit_writer.put_aligned(
+        self.bit_writer.put_vlq_int(indicator_value as u64);
+        self.bit_writer.put_aligned(
             self.current_value,
             bit_util::ceil(self.bit_width as i64, 8) as usize,
         );
-        if !result {
-            return Err(general_err!("Failed to write RLE run"));
-        }
         self.num_buffered_values = 0;
         self.repeat_count = 0;
         Ok(())
@@ -259,13 +249,12 @@ impl RleEncoder {
 
     fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) -> Result<()> {
         if self.indicator_byte_pos < 0 {
-            self.indicator_byte_pos = self.bit_writer.skip(1)? as i64;
+            self.indicator_byte_pos = self.bit_writer.skip(1) as i64;
         }
 
         // Write all buffered values as bit-packed literals
         for i in 0..self.num_buffered_values {
-            let _ = self
-                .bit_writer
+            self.bit_writer
                 .put_value(self.buffered_values[i], self.bit_width as usize);
         }
         self.num_buffered_values = 0;
@@ -273,13 +262,11 @@ impl RleEncoder {
             // Write the indicator byte to the reserved position in `bit_writer`
             let num_groups = self.bit_packed_count / 8;
             let indicator_byte = ((num_groups << 1) | 1) as u8;
-            if !self.bit_writer.put_aligned_offset(
+            self.bit_writer.put_aligned_offset(
                 indicator_byte,
                 1,
                 self.indicator_byte_pos as usize,
-            ) {
-                return Err(general_err!("Not enough space to write indicator byte"));
-            }
+            );
             self.indicator_byte_pos = -1;
             self.bit_packed_count = 0;
         }
@@ -443,7 +430,8 @@ impl RleDecoder {
         let mut values_skipped = 0;
         while values_skipped < num_values {
             if self.rle_left > 0 {
-                let num_values = cmp::min(num_values - values_skipped, self.rle_left as usize);
+                let num_values =
+                    cmp::min(num_values - values_skipped, self.rle_left as usize);
                 self.rle_left -= num_values as u32;
                 values_skipped += num_values;
             } else if self.bit_packed_left > 0 {
@@ -452,10 +440,7 @@ impl RleDecoder {
                 let bit_reader =
                     self.bit_reader.as_mut().expect("bit_reader should be set");
 
-                num_values = bit_reader.skip(
-                    num_values,
-                    self.bit_width as usize,
-                );
+                num_values = bit_reader.skip(num_values, self.bit_width as usize);
                 if num_values == 0 {
                     // Handle writers which truncate the final block
                     self.bit_packed_left = 0;
@@ -587,7 +572,9 @@ mod tests {
         assert_eq!(skipped, 2);
 
         let mut buffer = vec![0; 6];
-        let remaining = decoder.get_batch::<i32>(&mut buffer).expect("getting remaining");
+        let remaining = decoder
+            .get_batch::<i32>(&mut buffer)
+            .expect("getting remaining");
         assert_eq!(remaining, 6);
         assert_eq!(buffer, expected);
     }
@@ -671,7 +658,9 @@ mod tests {
 
         let skipped = decoder.skip(50).expect("skipping first 50");
         assert_eq!(skipped, 50);
-        let remainder = decoder.get_batch::<bool>(&mut buffer).expect("getting remaining 50");
+        let remainder = decoder
+            .get_batch::<bool>(&mut buffer)
+            .expect("getting remaining 50");
         assert_eq!(remainder, 50);
         assert_eq!(buffer, expected);
 
@@ -687,7 +676,9 @@ mod tests {
         }
         let skipped = decoder.skip(50).expect("skipping first 50");
         assert_eq!(skipped, 50);
-        let remainder = decoder.get_batch::<bool>(&mut buffer).expect("getting remaining 50");
+        let remainder = decoder
+            .get_batch::<bool>(&mut buffer)
+            .expect("getting remaining 50");
         assert_eq!(remainder, 50);
         assert_eq!(buffer, expected);
     }
@@ -739,7 +730,9 @@ mod tests {
         let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
         let skipped = decoder.skip(2).expect("skipping two values");
         assert_eq!(skipped, 2);
-        let remainder = decoder.get_batch_with_dict::<i32>(&dict, &mut buffer, 10).expect("getting remainder");
+        let remainder = decoder
+            .get_batch_with_dict::<i32>(&dict, &mut buffer, 10)
+            .expect("getting remainder");
         assert_eq!(remainder, 10);
         assert_eq!(buffer, expected);
 
@@ -751,17 +744,12 @@ mod tests {
         let mut decoder: RleDecoder = RleDecoder::new(3);
         decoder.set_data(data);
         let mut buffer = vec![""; 8];
-        let expected = vec![
-            "eee", "fff", "ddd", "eee", "fff", "eee", "fff",
-            "fff",
-        ];
+        let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"];
         let skipped = decoder.skip(4).expect("skipping four values");
         assert_eq!(skipped, 4);
-        let remainder = decoder.get_batch_with_dict::<&str>(
-            dict.as_slice(),
-            buffer.as_mut_slice(),
-            8,
-        ).expect("getting remainder");
+        let remainder = decoder
+            .get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 8)
+            .expect("getting remainder");
         assert_eq!(remainder, 8);
         assert_eq!(buffer, expected);
     }
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index 29269c4ad..84c4d10ed 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -18,7 +18,6 @@
 use std::{cmp, mem::size_of};
 
 use crate::data_type::AsBytes;
-use crate::errors::{ParquetError, Result};
 use crate::util::{bit_packing::unpack32, memory::ByteBufferPtr};
 
 #[inline]
@@ -138,7 +137,7 @@ where
 /// This function should be removed after
 /// [`int_roundings`](https://github.com/rust-lang/rust/issues/88581) is stable.
 #[inline]
-pub fn ceil(value: i64, divisor: i64) -> i64 {
+pub fn ceil<T: num::Integer>(value: T, divisor: T) -> T {
     num::Integer::div_ceil(&value, &divisor)
 }
 
@@ -148,7 +147,7 @@ pub fn trailing_bits(v: u64, num_bits: usize) -> u64 {
     if num_bits >= 64 {
         v
     } else {
-        v & ((1<<num_bits) - 1)
+        v & ((1 << num_bits) - 1)
     }
 }
 
@@ -180,59 +179,32 @@ pub fn get_bit(data: &[u8], i: usize) -> bool {
 /// bit packed or byte aligned fashion.
 pub struct BitWriter {
     buffer: Vec<u8>,
-    max_bytes: usize,
     buffered_values: u64,
-    byte_offset: usize,
-    bit_offset: usize,
-    start: usize,
+    bit_offset: u8,
 }
 
 impl BitWriter {
     pub fn new(max_bytes: usize) -> Self {
         Self {
-            buffer: vec![0; max_bytes],
-            max_bytes,
+            buffer: Vec::with_capacity(max_bytes),
             buffered_values: 0,
-            byte_offset: 0,
             bit_offset: 0,
-            start: 0,
         }
     }
 
-    /// Initializes the writer from the existing buffer `buffer` and starting
-    /// offset `start`.
-    pub fn new_from_buf(buffer: Vec<u8>, start: usize) -> Self {
-        assert!(start < buffer.len());
-        let len = buffer.len();
+    /// Initializes the writer appending to the existing buffer `buffer`
+    pub fn new_from_buf(buffer: Vec<u8>) -> Self {
         Self {
             buffer,
-            max_bytes: len,
             buffered_values: 0,
-            byte_offset: start,
             bit_offset: 0,
-            start,
         }
     }
 
-    /// Extend buffer size by `increment` bytes
-    #[inline]
-    pub fn extend(&mut self, increment: usize) {
-        self.max_bytes += increment;
-        let extra = vec![0; increment];
-        self.buffer.extend(extra);
-    }
-
-    /// Report buffer size, in bytes
-    #[inline]
-    pub fn capacity(&mut self) -> usize {
-        self.max_bytes
-    }
-
     /// Consumes and returns the current buffer.
     #[inline]
     pub fn consume(mut self) -> Vec<u8> {
         self.flush();
-        self.buffer.truncate(self.byte_offset);
         self.buffer
     }
 
@@ -241,53 +213,37 @@ impl BitWriter {
     #[inline]
     pub fn flush_buffer(&mut self) -> &[u8] {
         self.flush();
-        &self.buffer()[0..self.byte_offset]
+        self.buffer()
     }
 
     /// Clears the internal state so the buffer can be reused.
     #[inline]
     pub fn clear(&mut self) {
+        self.buffer.clear();
         self.buffered_values = 0;
-        self.byte_offset = self.start;
         self.bit_offset = 0;
     }
 
     /// Flushes the internal buffered bits and the align the buffer to the next byte.
     #[inline]
     pub fn flush(&mut self) {
-        let num_bytes = ceil(self.bit_offset as i64, 8) as usize;
-        assert!(self.byte_offset + num_bytes <= self.max_bytes);
-        memcpy_value(
-            &self.buffered_values,
-            num_bytes,
-            &mut self.buffer[self.byte_offset..],
-        );
+        let num_bytes = ceil(self.bit_offset, 8);
+        let slice = &self.buffered_values.to_le_bytes()[..num_bytes as usize];
+        self.buffer.extend_from_slice(slice);
         self.buffered_values = 0;
         self.bit_offset = 0;
-        self.byte_offset += num_bytes;
     }
 
     /// Advances the current offset by skipping `num_bytes`, flushing the internal bit
     /// buffer first.
     /// This is useful when you want to jump over `num_bytes` bytes and come back later
     /// to fill these bytes.
-    ///
-    /// Returns error if `num_bytes` is beyond the boundary of the internal buffer.
-    /// Otherwise, returns the old offset.
     #[inline]
-    pub fn skip(&mut self, num_bytes: usize) -> Result<usize> {
+    pub fn skip(&mut self, num_bytes: usize) -> usize {
         self.flush();
-        assert!(self.byte_offset <= self.max_bytes);
-        if self.byte_offset + num_bytes > self.max_bytes {
-            return Err(general_err!(
-                "Not enough bytes left in BitWriter. Need {} but only have {}",
-                self.byte_offset + num_bytes,
-                self.max_bytes
-            ));
-        }
-        let result = self.byte_offset;
-        self.byte_offset += num_bytes;
-        Ok(result)
+        let result = self.buffer.len();
+        self.buffer.extend(std::iter::repeat(0).take(num_bytes));
+        result
     }
 
     /// Returns a slice containing the next `num_bytes` bytes starting from the current
@@ -295,32 +251,24 @@ impl BitWriter {
     /// This is useful when you want to jump over `num_bytes` bytes and come back later
     /// to fill these bytes.
     #[inline]
-    pub fn get_next_byte_ptr(&mut self, num_bytes: usize) -> Result<&mut [u8]> {
-        let offset = self.skip(num_bytes)?;
-        Ok(&mut self.buffer[offset..offset + num_bytes])
+    pub fn get_next_byte_ptr(&mut self, num_bytes: usize) -> &mut [u8] {
+        let offset = self.skip(num_bytes);
+        &mut self.buffer[offset..offset + num_bytes]
     }
 
     #[inline]
     pub fn bytes_written(&self) -> usize {
-        self.byte_offset - self.start + ceil(self.bit_offset as i64, 8) as usize
+        self.buffer.len() + ceil(self.bit_offset, 8) as usize
     }
 
     #[inline]
     pub fn buffer(&self) -> &[u8] {
-        &self.buffer[self.start..]
+        &self.buffer
     }
 
     #[inline]
     pub fn byte_offset(&self) -> usize {
-        self.byte_offset
-    }
-
-    /// Returns the internal buffer length. This is the maximum number of bytes that this
-    /// writer can write. User needs to call `consume` to consume the current buffer
-    /// before more data can be written.
-    #[inline]
-    pub fn buffer_len(&self) -> usize {
-        self.max_bytes
+        self.buffer.len()
     }
 
     /// Writes the entire byte `value` at the byte `offset`
@@ -330,53 +278,36 @@ impl BitWriter {
 
     /// Writes the `num_bits` LSB of value `v` to the internal buffer of this writer.
     /// The `num_bits` must not be greater than 64. This is bit packed.
-    ///
-    /// Returns false if there's not enough room left. True otherwise.
     #[inline]
-    pub fn put_value(&mut self, v: u64, num_bits: usize) -> bool {
+    pub fn put_value(&mut self, v: u64, num_bits: usize) {
         assert!(num_bits <= 64);
+        let num_bits = num_bits as u8;
         assert_eq!(v.checked_shr(num_bits as u32).unwrap_or(0), 0); // covers case v >> 64
 
-        if self.byte_offset * 8 + self.bit_offset + num_bits > self.max_bytes as usize * 8
-        {
-            return false;
-        }
-
+        // Add value to buffered_values
         self.buffered_values |= v << self.bit_offset;
         self.bit_offset += num_bits;
-        if self.bit_offset >= 64 {
-            memcpy_value(
-                &self.buffered_values,
-                8,
-                &mut self.buffer[self.byte_offset..],
-            );
-            self.byte_offset += 8;
-            self.bit_offset -= 64;
-            self.buffered_values = 0;
+        if let Some(remaining) = self.bit_offset.checked_sub(64) {
+            self.buffer
+                .extend_from_slice(&self.buffered_values.to_le_bytes());
+            self.bit_offset = remaining;
+
             // Perform checked right shift: v >> offset, where offset < 64, otherwise we
             // shift all bits
             self.buffered_values = v
                 .checked_shr((num_bits - self.bit_offset) as u32)
                 .unwrap_or(0);
         }
-        assert!(self.bit_offset < 64);
-        true
     }
 
     /// Writes `val` of `num_bytes` bytes to the next aligned byte. If size of `T` is
     /// larger than `num_bytes`, extra higher ordered bytes will be ignored.
-    ///
-    /// Returns false if there's not enough room left. True otherwise.
     #[inline]
-    pub fn put_aligned<T: AsBytes>(&mut self, val: T, num_bytes: usize) -> bool {
-        let result = self.get_next_byte_ptr(num_bytes);
-        if result.is_err() {
-            // TODO: should we return `Result` for this func?
-            return false;
-        }
-        let ptr = result.unwrap();
-        memcpy_value(&val, num_bytes, ptr);
-        true
+    pub fn put_aligned<T: AsBytes>(&mut self, val: T, num_bytes: usize) {
+        self.flush();
+        let slice = val.as_bytes();
+        let len = num_bytes.min(slice.len());
+        self.buffer.extend_from_slice(&slice[..len]);
     }
 
     /// Writes `val` of `num_bytes` bytes at the designated `offset`. The `offset` is the
@@ -384,49 +315,34 @@ impl BitWriter {
     /// maintains. Note that this will overwrite any existing data between `offset` and
     /// `offset + num_bytes`. Also that if size of `T` is larger than `num_bytes`, extra
     /// higher ordered bytes will be ignored.
-    ///
-    /// Returns false if there's not enough room left, or the `pos` is not valid.
-    /// True otherwise.
     #[inline]
     pub fn put_aligned_offset<T: AsBytes>(
         &mut self,
         val: T,
         num_bytes: usize,
         offset: usize,
-    ) -> bool {
-        if num_bytes + offset > self.max_bytes {
-            return false;
-        }
-        memcpy_value(
-            &val,
-            num_bytes,
-            &mut self.buffer[offset..offset + num_bytes],
-        );
-        true
+    ) {
+        let slice = val.as_bytes();
+        let len = num_bytes.min(slice.len());
+        self.buffer[offset..offset + len].copy_from_slice(&slice[..len])
     }
 
     /// Writes a VLQ encoded integer `v` to this buffer. The value is byte aligned.
-    ///
-    /// Returns false if there's not enough room left. True otherwise.
     #[inline]
-    pub fn put_vlq_int(&mut self, mut v: u64) -> bool {
-        let mut result = true;
+    pub fn put_vlq_int(&mut self, mut v: u64) {
         while v & 0xFFFFFFFFFFFFFF80 != 0 {
-            result &= self.put_aligned::<u8>(((v & 0x7F) | 0x80) as u8, 1);
+            self.put_aligned::<u8>(((v & 0x7F) | 0x80) as u8, 1);
             v >>= 7;
         }
-        result &= self.put_aligned::<u8>((v & 0x7F) as u8, 1);
-        result
+        self.put_aligned::<u8>((v & 0x7F) as u8, 1);
     }
 
     /// Writes a zigzag-VLQ encoded (in little endian order) int `v` to this buffer.
     /// Zigzag-VLQ is a variant of VLQ encoding where negative and positive
     /// numbers are encoded in a zigzag fashion.
     /// See: https://developers.google.com/protocol-buffers/docs/encoding
-    ///
-    /// Returns false if there's not enough room left. True otherwise.
     #[inline]
-    pub fn put_zigzag_vlq_int(&mut self, v: i64) -> bool {
+    pub fn put_zigzag_vlq_int(&mut self, v: i64) {
         let u: u64 = ((v << 1) ^ (v >> 63)) as u64;
         self.put_vlq_int(u)
     }
@@ -488,7 +404,7 @@ impl BitReader {
     /// Gets the current byte offset
     #[inline]
     pub fn get_byte_offset(&self) -> usize {
-        self.byte_offset + ceil(self.bit_offset as i64, 8) as usize
+        self.byte_offset + ceil(self.bit_offset, 8)
     }
 
     /// Reads a value of type `T` and of size `num_bits`.
@@ -568,7 +484,7 @@ impl BitReader {
                     .expect("expected to have more data");
                 i += 1;
             }
-            return values_to_read
+            return values_to_read;
         }
 
         // First align bit offset to byte offset
@@ -645,8 +561,7 @@ impl BitReader {
         // First align bit offset to byte offset
         if self.bit_offset != 0 {
             while values_skipped < num_values && self.bit_offset != 0 {
-                self
-                    .skip_value(num_bits);
+                self.skip_value(num_bits);
                 values_skipped += 1;
             }
         }
@@ -656,7 +571,6 @@ impl BitReader {
             values_skipped += 32;
         }
 
-
         assert!(num_values - values_skipped < 32);
 
         self.reload_buffer_values();
@@ -696,7 +610,7 @@ impl BitReader {
     /// Returns `Some` if there's enough bytes left to form a value of `T`.
     /// Otherwise `None`.
     pub fn get_aligned<T: FromBytes>(&mut self, num_bytes: usize) -> Option<T> {
-        let bytes_read = ceil(self.bit_offset as i64, 8) as usize;
+        let bytes_read = ceil(self.bit_offset, 8);
         if self.byte_offset + bytes_read + num_bytes > self.total_bytes {
             return None;
         }
@@ -792,9 +706,9 @@ mod tests {
         assert_eq!(ceil(8, 8), 1);
         assert_eq!(ceil(9, 8), 2);
         assert_eq!(ceil(9, 9), 1);
-        assert_eq!(ceil(10000000000, 10), 1000000000);
-        assert_eq!(ceil(10, 10000000000), 1);
-        assert_eq!(ceil(10000000000, 1000000000), 10);
+        assert_eq!(ceil(10000000000_u64, 10), 1000000000);
+        assert_eq!(ceil(10_u64, 10000000000), 1);
+        assert_eq!(ceil(10000000000_u64, 1000000000), 10);
     }
 
     #[test]
@@ -846,16 +760,16 @@ mod tests {
     fn test_bit_reader_skip() {
         let buffer = vec![255, 0];
         let mut bit_reader = BitReader::from(buffer);
-        let skipped = bit_reader.skip(1,1);
+        let skipped = bit_reader.skip(1, 1);
         assert_eq!(skipped, 1);
         assert_eq!(bit_reader.get_value::<i32>(1), Some(1));
-        let skipped = bit_reader.skip(2,2);
+        let skipped = bit_reader.skip(2, 2);
         assert_eq!(skipped, 2);
         assert_eq!(bit_reader.get_value::<i32>(2), Some(3));
-        let skipped = bit_reader.skip(4,1);
+        let skipped = bit_reader.skip(4, 1);
         assert_eq!(skipped, 4);
         assert_eq!(bit_reader.get_value::<i32>(4), Some(0));
-        let skipped = bit_reader.skip(1,1);
+        let skipped = bit_reader.skip(1, 1);
         assert_eq!(skipped, 0);
     }
 
@@ -973,7 +887,7 @@ mod tests {
     #[test]
     fn test_skip() {
         let mut writer = BitWriter::new(5);
-        let old_offset = writer.skip(1).expect("skip() should return OK");
+        let old_offset = writer.skip(1);
         writer.put_aligned(42, 4);
         writer.put_aligned_offset(0x10, 1, old_offset);
         let result = writer.consume();
@@ -981,16 +895,15 @@ mod tests {
 
         writer = BitWriter::new(4);
         let result = writer.skip(5);
-        assert!(result.is_err());
+        assert_eq!(result, 0);
+        assert_eq!(writer.buffer(), &[0; 5])
     }
 
     #[test]
     fn test_get_next_byte_ptr() {
         let mut writer = BitWriter::new(5);
         {
-            let first_byte = writer
-                .get_next_byte_ptr(1)
-                .expect("get_next_byte_ptr() should return OK");
+            let first_byte = writer.get_next_byte_ptr(1);
             first_byte[0] = 0x10;
         }
         writer.put_aligned(42, 4);
@@ -1017,8 +930,7 @@ mod tests {
         let mut writer = BitWriter::new(len);
 
         for i in 0..8 {
-            let result = writer.put_value(i % 2, 1);
-            assert!(result);
+            writer.put_value(i % 2, 1);
         }
 
         writer.flush();
@@ -1029,11 +941,10 @@ mod tests {
 
         // Write 00110011
         for i in 0..8 {
-            let result = match i {
+            match i {
                 0 | 1 | 4 | 5 => writer.put_value(false as u64, 1),
                 _ => writer.put_value(true as u64, 1),
-            };
-            assert!(result);
+            }
         }
         writer.flush();
         {
@@ -1078,19 +989,13 @@ mod tests {
 
     fn test_put_value_rand_numbers(total: usize, num_bits: usize) {
         assert!(num_bits < 64);
-        let num_bytes = ceil(num_bits as i64, 8);
+        let num_bytes = ceil(num_bits, 8);
         let mut writer = BitWriter::new(num_bytes as usize * total);
         let values: Vec<u64> = random_numbers::<u64>(total)
             .iter()
             .map(|v| v & ((1 << num_bits) - 1))
             .collect();
-        (0..total).for_each(|i| {
-            assert!(
-                writer.put_value(values[i] as u64, num_bits),
-                "[{}]: put_value() failed",
-                i
-            );
-        });
+        (0..total).for_each(|i| writer.put_value(values[i] as u64, num_bits));
 
         let mut reader = BitReader::from(writer.consume());
         (0..total).for_each(|i| {
@@ -1124,7 +1029,7 @@ mod tests {
         T: FromBytes + Default + Clone + Debug + Eq,
     {
         assert!(num_bits <= 32);
-        let num_bytes = ceil(num_bits as i64, 8);
+        let num_bytes = ceil(num_bits, 8);
         let mut writer = BitWriter::new(num_bytes as usize * total);
 
         let values: Vec<u32> = random_numbers::<u32>(total)
@@ -1136,9 +1041,7 @@ mod tests {
         let expected_values: Vec<T> =
             values.iter().map(|v| from_ne_slice(v.as_bytes())).collect();
 
-        (0..total).for_each(|i| {
-            assert!(writer.put_value(values[i] as u64, num_bits));
-        });
+        (0..total).for_each(|i| writer.put_value(values[i] as u64, num_bits));
 
         let buf = writer.consume();
         let mut reader = BitReader::from(buf);
@@ -1175,7 +1078,7 @@ mod tests {
         assert!(total % 2 == 0);
 
         let aligned_value_byte_width = std::mem::size_of::<T>();
-        let value_byte_width = ceil(num_bits as i64, 8) as usize;
+        let value_byte_width = ceil(num_bits, 8);
         let mut writer =
             BitWriter::new((total / 2) * (aligned_value_byte_width + value_byte_width));
         let values: Vec<u32> = random_numbers::<u32>(total / 2)
@@ -1187,17 +1090,9 @@ mod tests {
         for i in 0..total {
             let j = i / 2;
             if i % 2 == 0 {
-                assert!(
-                    writer.put_value(values[j] as u64, num_bits),
-                    "[{}]: put_value() failed",
-                    i
-                );
+                writer.put_value(values[j] as u64, num_bits);
             } else {
-                assert!(
-                    writer.put_aligned::<T>(aligned_values[j], aligned_value_byte_width),
-                    "[{}]: put_aligned() failed",
-                    i
-                );
+                writer.put_aligned::<T>(aligned_values[j], aligned_value_byte_width)
             }
         }
 
@@ -1231,13 +1126,7 @@ mod tests {
         let total = 64;
         let mut writer = BitWriter::new(total * 32);
         let values = random_numbers::<u32>(total);
-        (0..total).for_each(|i| {
-            assert!(
-                writer.put_vlq_int(values[i] as u64),
-                "[{}]; put_vlq_int() failed",
-                i
-            );
-        });
+        (0..total).for_each(|i| writer.put_vlq_int(values[i] as u64));
 
         let mut reader = BitReader::from(writer.consume());
         (0..total).for_each(|i| {
@@ -1257,13 +1146,7 @@ mod tests {
         let total = 64;
         let mut writer = BitWriter::new(total * 32);
         let values = random_numbers::<i32>(total);
-        (0..total).for_each(|i| {
-            assert!(
-                writer.put_zigzag_vlq_int(values[i] as i64),
-                "[{}]; put_zigzag_vlq_int() failed",
-                i
-            );
-        });
+        (0..total).for_each(|i| writer.put_zigzag_vlq_int(values[i] as i64));
 
         let mut reader = BitReader::from(writer.consume());
         (0..total).for_each(|i| {
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index f56eaf85e..d7653d4e5 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -16,11 +16,10 @@
 // under the License.
 
 use crate::basic::Encoding;
-use crate::column::page::{PageMetadata, PageReader};
 use crate::column::page::{Page, PageIterator};
+use crate::column::page::{PageMetadata, PageReader};
 use crate::data_type::DataType;
 use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
-use crate::encodings::levels::max_buffer_size;
 use crate::encodings::levels::LevelEncoder;
 use crate::errors::Result;
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
@@ -75,8 +74,7 @@ impl DataPageBuilderImpl {
         if max_level <= 0 {
             return 0;
         }
-        let size = max_buffer_size(Encoding::RLE, max_level, levels.len());
-        let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, vec![0; size]);
+        let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, levels.len());
         level_encoder.put(levels).expect("put() should be OK");
         let encoded_levels = level_encoder.consume().expect("consume() should be OK");
         // Actual encoded bytes (without length offset)