You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/07/31 20:45:10 UTC
[arrow-rs] branch master updated: Automatically grow parquet BitWriter (#2226) (~10% faster) (#2231)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 99ad91545 Automatically grow parquet BitWriter (#2226) (~10% faster) (#2231)
99ad91545 is described below
commit 99ad915452528d9add1427591ba23808542195c9
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sun Jul 31 21:45:05 2022 +0100
Automatically grow parquet BitWriter (#2226) (~10% faster) (#2231)
* Automatically grow parquet BitWriter (#2226)
* Review feedback
---
parquet/src/column/writer/mod.rs | 8 +-
parquet/src/data_type.rs | 16 +-
parquet/src/encodings/encoding/dict_encoder.rs | 10 +-
parquet/src/encodings/encoding/mod.rs | 42 ++--
parquet/src/encodings/levels.rs | 89 +++------
parquet/src/encodings/rle.rs | 72 +++----
parquet/src/util/bit_util.rs | 255 +++++++------------------
parquet/src/util/test_common/page_util.rs | 6 +-
8 files changed, 158 insertions(+), 340 deletions(-)
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 9a371bc27..2d5556111 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -27,7 +27,7 @@ use crate::column::writer::encoder::{
use crate::compression::{create_codec, Codec};
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
-use crate::encodings::levels::{max_buffer_size, LevelEncoder};
+use crate::encodings::levels::LevelEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::properties::EnabledStatistics;
@@ -782,8 +782,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
levels: &[i16],
max_level: i16,
) -> Result<Vec<u8>> {
- let size = max_buffer_size(encoding, max_level, levels.len());
- let mut encoder = LevelEncoder::v1(encoding, max_level, vec![0; size]);
+ let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
encoder.put(levels)?;
encoder.consume()
}
@@ -792,8 +791,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
/// Encoding is always RLE.
#[inline]
fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Result<Vec<u8>> {
- let size = max_buffer_size(Encoding::RLE, max_level, levels.len());
- let mut encoder = LevelEncoder::v2(max_level, vec![0; size]);
+ let mut encoder = LevelEncoder::v2(max_level, levels.len());
encoder.put(levels)?;
encoder.consume()
}
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 1d0b5b231..43c9a4238 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -565,7 +565,7 @@ impl AsBytes for str {
pub(crate) mod private {
use crate::encodings::decoding::PlainDecoderDetails;
- use crate::util::bit_util::{round_upto_power_of_2, BitReader, BitWriter};
+ use crate::util::bit_util::{BitReader, BitWriter};
use crate::util::memory::ByteBufferPtr;
use crate::basic::Type;
@@ -658,20 +658,8 @@ pub(crate) mod private {
_: &mut W,
bit_writer: &mut BitWriter,
) -> Result<()> {
- if bit_writer.bytes_written() + values.len() / 8 >= bit_writer.capacity() {
- let bits_available =
- (bit_writer.capacity() - bit_writer.bytes_written()) * 8;
- let bits_needed = values.len() - bits_available;
- let bytes_needed = (bits_needed + 7) / 8;
- let bytes_needed = round_upto_power_of_2(bytes_needed, 256);
- bit_writer.extend(bytes_needed);
- }
for value in values {
- if !bit_writer.put_value(*value as u64, 1) {
- return Err(ParquetError::EOF(
- "unable to put boolean value".to_string(),
- ));
- }
+ bit_writer.put_value(*value as u64, 1)
}
Ok(())
}
diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs
index 7bf983254..1b386448b 100644
--- a/parquet/src/encodings/encoding/dict_encoder.rs
+++ b/parquet/src/encodings/encoding/dict_encoder.rs
@@ -20,7 +20,7 @@
use crate::basic::{Encoding, Type};
use crate::data_type::private::ParquetValueType;
-use crate::data_type::{AsBytes, DataType};
+use crate::data_type::DataType;
use crate::encodings::encoding::{Encoder, PlainEncoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
@@ -28,7 +28,6 @@ use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use crate::util::memory::ByteBufferPtr;
-use std::io::Write;
#[derive(Debug)]
struct KeyStorage<T: DataType> {
@@ -127,12 +126,11 @@ impl<T: DataType> DictEncoder<T> {
/// the result.
pub fn write_indices(&mut self) -> Result<ByteBufferPtr> {
let buffer_len = self.estimated_data_encoded_size();
- let mut buffer = vec![0; buffer_len];
- buffer[0] = self.bit_width() as u8;
+ let mut buffer = Vec::with_capacity(buffer_len);
+ buffer.push(self.bit_width() as u8);
// Write bit width in the first byte
- buffer.write_all((self.bit_width() as u8).as_bytes())?;
- let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer, 1);
+ let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
for index in &self.indices {
if !encoder.put(*index as u64)? {
return Err(general_err!("Encoder doesn't have enough space"));
diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs
index 5cb94b7c0..383211f12 100644
--- a/parquet/src/encodings/encoding/mod.rs
+++ b/parquet/src/encodings/encoding/mod.rs
@@ -186,10 +186,13 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
fn put(&mut self, values: &[T::T]) -> Result<()> {
ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
- if self.encoder.is_none() {
- self.encoder = Some(RleEncoder::new(1, DEFAULT_RLE_BUFFER_LEN));
- }
- let rle_encoder = self.encoder.as_mut().unwrap();
+ let rle_encoder = self.encoder.get_or_insert_with(|| {
+ let mut buffer = Vec::with_capacity(DEFAULT_RLE_BUFFER_LEN);
+ // Reserve space for length
+ buffer.extend_from_slice(&[0; 4]);
+ RleEncoder::new_from_buf(1, buffer)
+ });
+
for value in values {
let value = value.as_u64()?;
if !rle_encoder.put(value)? {
@@ -220,25 +223,18 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
let rle_encoder = self
.encoder
- .as_mut()
+ .take()
.expect("RLE value encoder is not initialized");
// Flush all encoder buffers and raw values
- let encoded_data = {
- let buf = rle_encoder.flush_buffer()?;
-
- // Note that buf does not have any offset, all data is encoded bytes
- let len = (buf.len() as i32).to_le();
- let len_bytes = len.as_bytes();
- let mut encoded_data = vec![];
- encoded_data.extend_from_slice(len_bytes);
- encoded_data.extend_from_slice(buf);
- encoded_data
- };
- // Reset rle encoder for the next batch
- rle_encoder.clear();
+ let mut buf = rle_encoder.consume()?;
+ assert!(buf.len() > 4, "should have had padding inserted");
+
+ // Note that buf does not have any offset, all data is encoded bytes
+ let len = (buf.len() - 4) as i32;
+ buf[..4].copy_from_slice(&len.to_le_bytes());
- Ok(ByteBufferPtr::new(encoded_data))
+ Ok(ByteBufferPtr::new(buf))
}
}
@@ -293,7 +289,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
let block_size = DEFAULT_BLOCK_SIZE;
let num_mini_blocks = DEFAULT_NUM_MINI_BLOCKS;
let mini_block_size = block_size / num_mini_blocks;
- assert!(mini_block_size % 8 == 0);
+ assert_eq!(mini_block_size % 8, 0);
Self::assert_supported_type();
DeltaBitPackEncoder {
@@ -346,7 +342,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
self.bit_writer.put_zigzag_vlq_int(min_delta);
// Slice to store bit width for each mini block
- let offset = self.bit_writer.skip(self.num_mini_blocks)?;
+ let offset = self.bit_writer.skip(self.num_mini_blocks);
for i in 0..self.num_mini_blocks {
// Find how many values we need to encode - either block size or whatever
@@ -364,7 +360,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
}
// Compute the max delta in current mini block
- let mut max_delta = i64::min_value();
+ let mut max_delta = i64::MIN;
for j in 0..n {
max_delta =
cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]);
@@ -873,7 +869,7 @@ mod tests {
let mut values = vec![];
values.extend_from_slice(&[true; 16]);
values.extend_from_slice(&[false; 16]);
- run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 2, 0);
+ run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 6, 0);
// DELTA_LENGTH_BYTE_ARRAY
run_test::<ByteArrayType>(
diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index 28fb63881..8bdfdd3e9 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -65,22 +65,21 @@ impl LevelEncoder {
/// Used to encode levels for Data Page v1.
///
/// Panics, if encoding is not supported.
- pub fn v1(encoding: Encoding, max_level: i16, byte_buffer: Vec<u8>) -> Self {
+ pub fn v1(encoding: Encoding, max_level: i16, capacity: usize) -> Self {
+ let capacity_bytes = max_buffer_size(encoding, max_level, capacity);
+ let mut buffer = Vec::with_capacity(capacity_bytes);
let bit_width = num_required_bits(max_level as u64);
match encoding {
- Encoding::RLE => LevelEncoder::Rle(RleEncoder::new_from_buf(
- bit_width,
- byte_buffer,
- mem::size_of::<i32>(),
- )),
+ Encoding::RLE => {
+ // Reserve space for length header
+ buffer.extend_from_slice(&[0; 4]);
+ LevelEncoder::Rle(RleEncoder::new_from_buf(bit_width, buffer))
+ }
Encoding::BIT_PACKED => {
// Here we set full byte buffer without adjusting for num_buffered_values,
// because byte buffer will already be allocated with size from
// `max_buffer_size()` method.
- LevelEncoder::BitPacked(
- bit_width,
- BitWriter::new_from_buf(byte_buffer, 0),
- )
+ LevelEncoder::BitPacked(bit_width, BitWriter::new_from_buf(buffer))
}
_ => panic!("Unsupported encoding type {}", encoding),
}
@@ -88,9 +87,11 @@ impl LevelEncoder {
/// Creates new level encoder based on RLE encoding. Used to encode Data Page v2
/// repetition and definition levels.
- pub fn v2(max_level: i16, byte_buffer: Vec<u8>) -> Self {
+ pub fn v2(max_level: i16, capacity: usize) -> Self {
+ let capacity_bytes = max_buffer_size(Encoding::RLE, max_level, capacity);
+ let buffer = Vec::with_capacity(capacity_bytes);
let bit_width = num_required_bits(max_level as u64);
- LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0))
+ LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, buffer))
}
/// Put/encode levels vector into this level encoder.
@@ -114,9 +115,7 @@ impl LevelEncoder {
}
LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
for value in buffer {
- if !encoder.put_value(*value as u64, bit_width as usize) {
- return Err(general_err!("Not enough bytes left"));
- }
+ encoder.put_value(*value as u64, bit_width as usize);
num_encoded += 1;
}
encoder.flush();
@@ -283,11 +282,10 @@ mod tests {
use crate::util::test_common::random_numbers_range;
fn test_internal_roundtrip(enc: Encoding, levels: &[i16], max_level: i16, v2: bool) {
- let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
- LevelEncoder::v2(max_level, vec![0; size])
+ LevelEncoder::v2(max_level, levels.len())
} else {
- LevelEncoder::v1(enc, max_level, vec![0; size])
+ LevelEncoder::v1(enc, max_level, levels.len())
};
encoder.put(levels).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
@@ -315,11 +313,10 @@ mod tests {
max_level: i16,
v2: bool,
) {
- let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
- LevelEncoder::v2(max_level, vec![0; size])
+ LevelEncoder::v2(max_level, levels.len())
} else {
- LevelEncoder::v1(enc, max_level, vec![0; size])
+ LevelEncoder::v1(enc, max_level, levels.len())
};
encoder.put(levels).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
@@ -363,11 +360,10 @@ mod tests {
max_level: i16,
v2: bool,
) {
- let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
- LevelEncoder::v2(max_level, vec![0; size])
+ LevelEncoder::v2(max_level, levels.len())
} else {
- LevelEncoder::v1(enc, max_level, vec![0; size])
+ LevelEncoder::v1(enc, max_level, levels.len())
};
// Encode only one value
let num_encoded = encoder.put(&levels[0..1]).expect("put() should be OK");
@@ -391,33 +387,6 @@ mod tests {
assert_eq!(buffer[0..num_decoded], levels[0..num_decoded]);
}
- // Tests when encoded values are larger than encoder's buffer
- fn test_internal_roundtrip_overflow(
- enc: Encoding,
- levels: &[i16],
- max_level: i16,
- v2: bool,
- ) {
- let size = max_buffer_size(enc, max_level, levels.len());
- let mut encoder = if v2 {
- LevelEncoder::v2(max_level, vec![0; size])
- } else {
- LevelEncoder::v1(enc, max_level, vec![0; size])
- };
- let mut found_err = false;
- // Insert a large number of values, so we run out of space
- for _ in 0..100 {
- if let Err(err) = encoder.put(levels) {
- assert!(format!("{}", err).contains("Not enough bytes left"));
- found_err = true;
- break;
- };
- }
- if !found_err {
- panic!("Failed test: no buffer overflow");
- }
- }
-
#[test]
fn test_roundtrip_one() {
let levels = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
@@ -470,6 +439,15 @@ mod tests {
test_internal_roundtrip(Encoding::RLE, &levels, max_level, true);
}
+ #[test]
+ fn test_rountrip_max() {
+ let levels = vec![0, i16::MAX, i16::MAX, i16::MAX, 0];
+ let max_level = i16::MAX;
+ test_internal_roundtrip(Encoding::RLE, &levels, max_level, false);
+ test_internal_roundtrip(Encoding::BIT_PACKED, &levels, max_level, false);
+ test_internal_roundtrip(Encoding::RLE, &levels, max_level, true);
+ }
+
#[test]
fn test_roundtrip_underflow() {
let levels = vec![1, 1, 2, 3, 2, 1, 1, 2, 3, 1];
@@ -484,15 +462,6 @@ mod tests {
test_internal_roundtrip_underflow(Encoding::RLE, &levels, max_level, true);
}
- #[test]
- fn test_roundtrip_overflow() {
- let levels = vec![1, 1, 2, 3, 2, 1, 1, 2, 3, 1];
- let max_level = 3;
- test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, false);
- test_internal_roundtrip_overflow(Encoding::BIT_PACKED, &levels, max_level, false);
- test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, true);
- }
-
#[test]
fn test_rle_decoder_set_data_range() {
// Buffer containing both repetition and definition levels
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index 8a19dd545..28ebd7d3a 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -83,21 +83,14 @@ pub struct RleEncoder {
impl RleEncoder {
pub fn new(bit_width: u8, buffer_len: usize) -> Self {
- let buffer = vec![0; buffer_len];
- RleEncoder::new_from_buf(bit_width, buffer, 0)
+ let buffer = Vec::with_capacity(buffer_len);
+ RleEncoder::new_from_buf(bit_width, buffer)
}
- /// Initialize the encoder from existing `buffer` and the starting offset `start`.
- pub fn new_from_buf(bit_width: u8, buffer: Vec<u8>, start: usize) -> Self {
- assert!(bit_width <= 64, "bit_width ({}) out of range.", bit_width);
+ /// Initialize the encoder from existing `buffer`
+ pub fn new_from_buf(bit_width: u8, buffer: Vec<u8>) -> Self {
let max_run_byte_size = RleEncoder::min_buffer_size(bit_width);
- assert!(
- buffer.len() >= max_run_byte_size,
- "buffer length {} must be greater than {}",
- buffer.len(),
- max_run_byte_size
- );
- let bit_writer = BitWriter::new_from_buf(buffer, start);
+ let bit_writer = BitWriter::new_from_buf(buffer);
RleEncoder {
bit_width,
bit_writer,
@@ -244,14 +237,11 @@ impl RleEncoder {
fn flush_rle_run(&mut self) -> Result<()> {
assert!(self.repeat_count > 0);
let indicator_value = self.repeat_count << 1;
- let mut result = self.bit_writer.put_vlq_int(indicator_value as u64);
- result &= self.bit_writer.put_aligned(
+ self.bit_writer.put_vlq_int(indicator_value as u64);
+ self.bit_writer.put_aligned(
self.current_value,
bit_util::ceil(self.bit_width as i64, 8) as usize,
);
- if !result {
- return Err(general_err!("Failed to write RLE run"));
- }
self.num_buffered_values = 0;
self.repeat_count = 0;
Ok(())
@@ -259,13 +249,12 @@ impl RleEncoder {
fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) -> Result<()> {
if self.indicator_byte_pos < 0 {
- self.indicator_byte_pos = self.bit_writer.skip(1)? as i64;
+ self.indicator_byte_pos = self.bit_writer.skip(1) as i64;
}
// Write all buffered values as bit-packed literals
for i in 0..self.num_buffered_values {
- let _ = self
- .bit_writer
+ self.bit_writer
.put_value(self.buffered_values[i], self.bit_width as usize);
}
self.num_buffered_values = 0;
@@ -273,13 +262,11 @@ impl RleEncoder {
// Write the indicator byte to the reserved position in `bit_writer`
let num_groups = self.bit_packed_count / 8;
let indicator_byte = ((num_groups << 1) | 1) as u8;
- if !self.bit_writer.put_aligned_offset(
+ self.bit_writer.put_aligned_offset(
indicator_byte,
1,
self.indicator_byte_pos as usize,
- ) {
- return Err(general_err!("Not enough space to write indicator byte"));
- }
+ );
self.indicator_byte_pos = -1;
self.bit_packed_count = 0;
}
@@ -443,7 +430,8 @@ impl RleDecoder {
let mut values_skipped = 0;
while values_skipped < num_values {
if self.rle_left > 0 {
- let num_values = cmp::min(num_values - values_skipped, self.rle_left as usize);
+ let num_values =
+ cmp::min(num_values - values_skipped, self.rle_left as usize);
self.rle_left -= num_values as u32;
values_skipped += num_values;
} else if self.bit_packed_left > 0 {
@@ -452,10 +440,7 @@ impl RleDecoder {
let bit_reader =
self.bit_reader.as_mut().expect("bit_reader should be set");
- num_values = bit_reader.skip(
- num_values,
- self.bit_width as usize,
- );
+ num_values = bit_reader.skip(num_values, self.bit_width as usize);
if num_values == 0 {
// Handle writers which truncate the final block
self.bit_packed_left = 0;
@@ -587,7 +572,9 @@ mod tests {
assert_eq!(skipped, 2);
let mut buffer = vec![0; 6];
- let remaining = decoder.get_batch::<i32>(&mut buffer).expect("getting remaining");
+ let remaining = decoder
+ .get_batch::<i32>(&mut buffer)
+ .expect("getting remaining");
assert_eq!(remaining, 6);
assert_eq!(buffer, expected);
}
@@ -671,7 +658,9 @@ mod tests {
let skipped = decoder.skip(50).expect("skipping first 50");
assert_eq!(skipped, 50);
- let remainder = decoder.get_batch::<bool>(&mut buffer).expect("getting remaining 50");
+ let remainder = decoder
+ .get_batch::<bool>(&mut buffer)
+ .expect("getting remaining 50");
assert_eq!(remainder, 50);
assert_eq!(buffer, expected);
@@ -687,7 +676,9 @@ mod tests {
}
let skipped = decoder.skip(50).expect("skipping first 50");
assert_eq!(skipped, 50);
- let remainder = decoder.get_batch::<bool>(&mut buffer).expect("getting remaining 50");
+ let remainder = decoder
+ .get_batch::<bool>(&mut buffer)
+ .expect("getting remaining 50");
assert_eq!(remainder, 50);
assert_eq!(buffer, expected);
}
@@ -739,7 +730,9 @@ mod tests {
let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
let skipped = decoder.skip(2).expect("skipping two values");
assert_eq!(skipped, 2);
- let remainder = decoder.get_batch_with_dict::<i32>(&dict, &mut buffer, 10).expect("getting remainder");
+ let remainder = decoder
+ .get_batch_with_dict::<i32>(&dict, &mut buffer, 10)
+ .expect("getting remainder");
assert_eq!(remainder, 10);
assert_eq!(buffer, expected);
@@ -751,17 +744,12 @@ mod tests {
let mut decoder: RleDecoder = RleDecoder::new(3);
decoder.set_data(data);
let mut buffer = vec![""; 8];
- let expected = vec![
- "eee", "fff", "ddd", "eee", "fff", "eee", "fff",
- "fff",
- ];
+ let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"];
let skipped = decoder.skip(4).expect("skipping four values");
assert_eq!(skipped, 4);
- let remainder = decoder.get_batch_with_dict::<&str>(
- dict.as_slice(),
- buffer.as_mut_slice(),
- 8,
- ).expect("getting remainder");
+ let remainder = decoder
+ .get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 8)
+ .expect("getting remainder");
assert_eq!(remainder, 8);
assert_eq!(buffer, expected);
}
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index 29269c4ad..84c4d10ed 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -18,7 +18,6 @@
use std::{cmp, mem::size_of};
use crate::data_type::AsBytes;
-use crate::errors::{ParquetError, Result};
use crate::util::{bit_packing::unpack32, memory::ByteBufferPtr};
#[inline]
@@ -138,7 +137,7 @@ where
/// This function should be removed after
/// [`int_roundings`](https://github.com/rust-lang/rust/issues/88581) is stable.
#[inline]
-pub fn ceil(value: i64, divisor: i64) -> i64 {
+pub fn ceil<T: num::Integer>(value: T, divisor: T) -> T {
num::Integer::div_ceil(&value, &divisor)
}
@@ -148,7 +147,7 @@ pub fn trailing_bits(v: u64, num_bits: usize) -> u64 {
if num_bits >= 64 {
v
} else {
- v & ((1<<num_bits) - 1)
+ v & ((1 << num_bits) - 1)
}
}
@@ -180,59 +179,32 @@ pub fn get_bit(data: &[u8], i: usize) -> bool {
/// bit packed or byte aligned fashion.
pub struct BitWriter {
buffer: Vec<u8>,
- max_bytes: usize,
buffered_values: u64,
- byte_offset: usize,
- bit_offset: usize,
- start: usize,
+ bit_offset: u8,
}
impl BitWriter {
pub fn new(max_bytes: usize) -> Self {
Self {
- buffer: vec![0; max_bytes],
- max_bytes,
+ buffer: Vec::with_capacity(max_bytes),
buffered_values: 0,
- byte_offset: 0,
bit_offset: 0,
- start: 0,
}
}
- /// Initializes the writer from the existing buffer `buffer` and starting
- /// offset `start`.
- pub fn new_from_buf(buffer: Vec<u8>, start: usize) -> Self {
- assert!(start < buffer.len());
- let len = buffer.len();
+ /// Initializes the writer appending to the existing buffer `buffer`
+ pub fn new_from_buf(buffer: Vec<u8>) -> Self {
Self {
buffer,
- max_bytes: len,
buffered_values: 0,
- byte_offset: start,
bit_offset: 0,
- start,
}
}
- /// Extend buffer size by `increment` bytes
- #[inline]
- pub fn extend(&mut self, increment: usize) {
- self.max_bytes += increment;
- let extra = vec![0; increment];
- self.buffer.extend(extra);
- }
-
- /// Report buffer size, in bytes
- #[inline]
- pub fn capacity(&mut self) -> usize {
- self.max_bytes
- }
-
/// Consumes and returns the current buffer.
#[inline]
pub fn consume(mut self) -> Vec<u8> {
self.flush();
- self.buffer.truncate(self.byte_offset);
self.buffer
}
@@ -241,53 +213,37 @@ impl BitWriter {
#[inline]
pub fn flush_buffer(&mut self) -> &[u8] {
self.flush();
- &self.buffer()[0..self.byte_offset]
+ self.buffer()
}
/// Clears the internal state so the buffer can be reused.
#[inline]
pub fn clear(&mut self) {
+ self.buffer.clear();
self.buffered_values = 0;
- self.byte_offset = self.start;
self.bit_offset = 0;
}
/// Flushes the internal buffered bits and aligns the buffer to the next byte.
#[inline]
pub fn flush(&mut self) {
- let num_bytes = ceil(self.bit_offset as i64, 8) as usize;
- assert!(self.byte_offset + num_bytes <= self.max_bytes);
- memcpy_value(
- &self.buffered_values,
- num_bytes,
- &mut self.buffer[self.byte_offset..],
- );
+ let num_bytes = ceil(self.bit_offset, 8);
+ let slice = &self.buffered_values.to_le_bytes()[..num_bytes as usize];
+ self.buffer.extend_from_slice(slice);
self.buffered_values = 0;
self.bit_offset = 0;
- self.byte_offset += num_bytes;
}
/// Advances the current offset by skipping `num_bytes`, flushing the internal bit
/// buffer first.
/// This is useful when you want to jump over `num_bytes` bytes and come back later
/// to fill these bytes.
- ///
- /// Returns error if `num_bytes` is beyond the boundary of the internal buffer.
- /// Otherwise, returns the old offset.
#[inline]
- pub fn skip(&mut self, num_bytes: usize) -> Result<usize> {
+ pub fn skip(&mut self, num_bytes: usize) -> usize {
self.flush();
- assert!(self.byte_offset <= self.max_bytes);
- if self.byte_offset + num_bytes > self.max_bytes {
- return Err(general_err!(
- "Not enough bytes left in BitWriter. Need {} but only have {}",
- self.byte_offset + num_bytes,
- self.max_bytes
- ));
- }
- let result = self.byte_offset;
- self.byte_offset += num_bytes;
- Ok(result)
+ let result = self.buffer.len();
+ self.buffer.extend(std::iter::repeat(0).take(num_bytes));
+ result
}
/// Returns a slice containing the next `num_bytes` bytes starting from the current
@@ -295,32 +251,24 @@ impl BitWriter {
/// This is useful when you want to jump over `num_bytes` bytes and come back later
/// to fill these bytes.
#[inline]
- pub fn get_next_byte_ptr(&mut self, num_bytes: usize) -> Result<&mut [u8]> {
- let offset = self.skip(num_bytes)?;
- Ok(&mut self.buffer[offset..offset + num_bytes])
+ pub fn get_next_byte_ptr(&mut self, num_bytes: usize) -> &mut [u8] {
+ let offset = self.skip(num_bytes);
+ &mut self.buffer[offset..offset + num_bytes]
}
#[inline]
pub fn bytes_written(&self) -> usize {
- self.byte_offset - self.start + ceil(self.bit_offset as i64, 8) as usize
+ self.buffer.len() + ceil(self.bit_offset, 8) as usize
}
#[inline]
pub fn buffer(&self) -> &[u8] {
- &self.buffer[self.start..]
+ &self.buffer
}
#[inline]
pub fn byte_offset(&self) -> usize {
- self.byte_offset
- }
-
- /// Returns the internal buffer length. This is the maximum number of bytes that this
- /// writer can write. User needs to call `consume` to consume the current buffer
- /// before more data can be written.
- #[inline]
- pub fn buffer_len(&self) -> usize {
- self.max_bytes
+ self.buffer.len()
}
/// Writes the entire byte `value` at the byte `offset`
@@ -330,53 +278,36 @@ impl BitWriter {
/// Writes the `num_bits` LSB of value `v` to the internal buffer of this writer.
/// The `num_bits` must not be greater than 64. This is bit packed.
- ///
- /// Returns false if there's not enough room left. True otherwise.
#[inline]
- pub fn put_value(&mut self, v: u64, num_bits: usize) -> bool {
+ pub fn put_value(&mut self, v: u64, num_bits: usize) {
assert!(num_bits <= 64);
+ let num_bits = num_bits as u8;
assert_eq!(v.checked_shr(num_bits as u32).unwrap_or(0), 0); // covers case v >> 64
- if self.byte_offset * 8 + self.bit_offset + num_bits > self.max_bytes as usize * 8
- {
- return false;
- }
-
+ // Add value to buffered_values
self.buffered_values |= v << self.bit_offset;
self.bit_offset += num_bits;
- if self.bit_offset >= 64 {
- memcpy_value(
- &self.buffered_values,
- 8,
- &mut self.buffer[self.byte_offset..],
- );
- self.byte_offset += 8;
- self.bit_offset -= 64;
- self.buffered_values = 0;
+ if let Some(remaining) = self.bit_offset.checked_sub(64) {
+ self.buffer
+ .extend_from_slice(&self.buffered_values.to_le_bytes());
+ self.bit_offset = remaining;
+
// Perform checked right shift: v >> offset, where offset < 64, otherwise we
// shift all bits
self.buffered_values = v
.checked_shr((num_bits - self.bit_offset) as u32)
.unwrap_or(0);
}
- assert!(self.bit_offset < 64);
- true
}
/// Writes `val` of `num_bytes` bytes to the next aligned byte. If size of `T` is
/// larger than `num_bytes`, extra higher ordered bytes will be ignored.
- ///
- /// Returns false if there's not enough room left. True otherwise.
#[inline]
- pub fn put_aligned<T: AsBytes>(&mut self, val: T, num_bytes: usize) -> bool {
- let result = self.get_next_byte_ptr(num_bytes);
- if result.is_err() {
- // TODO: should we return `Result` for this func?
- return false;
- }
- let ptr = result.unwrap();
- memcpy_value(&val, num_bytes, ptr);
- true
+ pub fn put_aligned<T: AsBytes>(&mut self, val: T, num_bytes: usize) {
+ self.flush();
+ let slice = val.as_bytes();
+ let len = num_bytes.min(slice.len());
+ self.buffer.extend_from_slice(&slice[..len]);
}
/// Writes `val` of `num_bytes` bytes at the designated `offset`. The `offset` is the
@@ -384,49 +315,34 @@ impl BitWriter {
/// maintains. Note that this will overwrite any existing data between `offset` and
/// `offset + num_bytes`. Also that if size of `T` is larger than `num_bytes`, extra
/// higher ordered bytes will be ignored.
- ///
- /// Returns false if there's not enough room left, or the `pos` is not valid.
- /// True otherwise.
#[inline]
pub fn put_aligned_offset<T: AsBytes>(
&mut self,
val: T,
num_bytes: usize,
offset: usize,
- ) -> bool {
- if num_bytes + offset > self.max_bytes {
- return false;
- }
- memcpy_value(
- &val,
- num_bytes,
- &mut self.buffer[offset..offset + num_bytes],
- );
- true
+ ) {
+ let slice = val.as_bytes();
+ let len = num_bytes.min(slice.len());
+ self.buffer[offset..offset + len].copy_from_slice(&slice[..len])
}
/// Writes a VLQ encoded integer `v` to this buffer. The value is byte aligned.
- ///
- /// Returns false if there's not enough room left. True otherwise.
#[inline]
- pub fn put_vlq_int(&mut self, mut v: u64) -> bool {
- let mut result = true;
+ pub fn put_vlq_int(&mut self, mut v: u64) {
while v & 0xFFFFFFFFFFFFFF80 != 0 {
- result &= self.put_aligned::<u8>(((v & 0x7F) | 0x80) as u8, 1);
+ self.put_aligned::<u8>(((v & 0x7F) | 0x80) as u8, 1);
v >>= 7;
}
- result &= self.put_aligned::<u8>((v & 0x7F) as u8, 1);
- result
+ self.put_aligned::<u8>((v & 0x7F) as u8, 1);
}
/// Writes a zigzag-VLQ encoded (in little endian order) int `v` to this buffer.
/// Zigzag-VLQ is a variant of VLQ encoding where negative and positive
/// numbers are encoded in a zigzag fashion.
/// See: https://developers.google.com/protocol-buffers/docs/encoding
- ///
- /// Returns false if there's not enough room left. True otherwise.
#[inline]
- pub fn put_zigzag_vlq_int(&mut self, v: i64) -> bool {
+ pub fn put_zigzag_vlq_int(&mut self, v: i64) {
let u: u64 = ((v << 1) ^ (v >> 63)) as u64;
self.put_vlq_int(u)
}
@@ -488,7 +404,7 @@ impl BitReader {
/// Gets the current byte offset
#[inline]
pub fn get_byte_offset(&self) -> usize {
- self.byte_offset + ceil(self.bit_offset as i64, 8) as usize
+ self.byte_offset + ceil(self.bit_offset, 8)
}
/// Reads a value of type `T` and of size `num_bits`.
@@ -568,7 +484,7 @@ impl BitReader {
.expect("expected to have more data");
i += 1;
}
- return values_to_read
+ return values_to_read;
}
// First align bit offset to byte offset
@@ -645,8 +561,7 @@ impl BitReader {
// First align bit offset to byte offset
if self.bit_offset != 0 {
while values_skipped < num_values && self.bit_offset != 0 {
- self
- .skip_value(num_bits);
+ self.skip_value(num_bits);
values_skipped += 1;
}
}
@@ -656,7 +571,6 @@ impl BitReader {
values_skipped += 32;
}
-
assert!(num_values - values_skipped < 32);
self.reload_buffer_values();
@@ -696,7 +610,7 @@ impl BitReader {
/// Returns `Some` if there's enough bytes left to form a value of `T`.
/// Otherwise `None`.
pub fn get_aligned<T: FromBytes>(&mut self, num_bytes: usize) -> Option<T> {
- let bytes_read = ceil(self.bit_offset as i64, 8) as usize;
+ let bytes_read = ceil(self.bit_offset, 8);
if self.byte_offset + bytes_read + num_bytes > self.total_bytes {
return None;
}
@@ -792,9 +706,9 @@ mod tests {
assert_eq!(ceil(8, 8), 1);
assert_eq!(ceil(9, 8), 2);
assert_eq!(ceil(9, 9), 1);
- assert_eq!(ceil(10000000000, 10), 1000000000);
- assert_eq!(ceil(10, 10000000000), 1);
- assert_eq!(ceil(10000000000, 1000000000), 10);
+ assert_eq!(ceil(10000000000_u64, 10), 1000000000);
+ assert_eq!(ceil(10_u64, 10000000000), 1);
+ assert_eq!(ceil(10000000000_u64, 1000000000), 10);
}
#[test]
@@ -846,16 +760,16 @@ mod tests {
fn test_bit_reader_skip() {
let buffer = vec![255, 0];
let mut bit_reader = BitReader::from(buffer);
- let skipped = bit_reader.skip(1,1);
+ let skipped = bit_reader.skip(1, 1);
assert_eq!(skipped, 1);
assert_eq!(bit_reader.get_value::<i32>(1), Some(1));
- let skipped = bit_reader.skip(2,2);
+ let skipped = bit_reader.skip(2, 2);
assert_eq!(skipped, 2);
assert_eq!(bit_reader.get_value::<i32>(2), Some(3));
- let skipped = bit_reader.skip(4,1);
+ let skipped = bit_reader.skip(4, 1);
assert_eq!(skipped, 4);
assert_eq!(bit_reader.get_value::<i32>(4), Some(0));
- let skipped = bit_reader.skip(1,1);
+ let skipped = bit_reader.skip(1, 1);
assert_eq!(skipped, 0);
}
@@ -973,7 +887,7 @@ mod tests {
#[test]
fn test_skip() {
let mut writer = BitWriter::new(5);
- let old_offset = writer.skip(1).expect("skip() should return OK");
+ let old_offset = writer.skip(1);
writer.put_aligned(42, 4);
writer.put_aligned_offset(0x10, 1, old_offset);
let result = writer.consume();
@@ -981,16 +895,15 @@ mod tests {
writer = BitWriter::new(4);
let result = writer.skip(5);
- assert!(result.is_err());
+ assert_eq!(result, 0);
+ assert_eq!(writer.buffer(), &[0; 5])
}
#[test]
fn test_get_next_byte_ptr() {
let mut writer = BitWriter::new(5);
{
- let first_byte = writer
- .get_next_byte_ptr(1)
- .expect("get_next_byte_ptr() should return OK");
+ let first_byte = writer.get_next_byte_ptr(1);
first_byte[0] = 0x10;
}
writer.put_aligned(42, 4);
@@ -1017,8 +930,7 @@ mod tests {
let mut writer = BitWriter::new(len);
for i in 0..8 {
- let result = writer.put_value(i % 2, 1);
- assert!(result);
+ writer.put_value(i % 2, 1);
}
writer.flush();
@@ -1029,11 +941,10 @@ mod tests {
// Write 00110011
for i in 0..8 {
- let result = match i {
+ match i {
0 | 1 | 4 | 5 => writer.put_value(false as u64, 1),
_ => writer.put_value(true as u64, 1),
- };
- assert!(result);
+ }
}
writer.flush();
{
@@ -1078,19 +989,13 @@ mod tests {
fn test_put_value_rand_numbers(total: usize, num_bits: usize) {
assert!(num_bits < 64);
- let num_bytes = ceil(num_bits as i64, 8);
+ let num_bytes = ceil(num_bits, 8);
let mut writer = BitWriter::new(num_bytes as usize * total);
let values: Vec<u64> = random_numbers::<u64>(total)
.iter()
.map(|v| v & ((1 << num_bits) - 1))
.collect();
- (0..total).for_each(|i| {
- assert!(
- writer.put_value(values[i] as u64, num_bits),
- "[{}]: put_value() failed",
- i
- );
- });
+ (0..total).for_each(|i| writer.put_value(values[i] as u64, num_bits));
let mut reader = BitReader::from(writer.consume());
(0..total).for_each(|i| {
@@ -1124,7 +1029,7 @@ mod tests {
T: FromBytes + Default + Clone + Debug + Eq,
{
assert!(num_bits <= 32);
- let num_bytes = ceil(num_bits as i64, 8);
+ let num_bytes = ceil(num_bits, 8);
let mut writer = BitWriter::new(num_bytes as usize * total);
let values: Vec<u32> = random_numbers::<u32>(total)
@@ -1136,9 +1041,7 @@ mod tests {
let expected_values: Vec<T> =
values.iter().map(|v| from_ne_slice(v.as_bytes())).collect();
- (0..total).for_each(|i| {
- assert!(writer.put_value(values[i] as u64, num_bits));
- });
+ (0..total).for_each(|i| writer.put_value(values[i] as u64, num_bits));
let buf = writer.consume();
let mut reader = BitReader::from(buf);
@@ -1175,7 +1078,7 @@ mod tests {
assert!(total % 2 == 0);
let aligned_value_byte_width = std::mem::size_of::<T>();
- let value_byte_width = ceil(num_bits as i64, 8) as usize;
+ let value_byte_width = ceil(num_bits, 8);
let mut writer =
BitWriter::new((total / 2) * (aligned_value_byte_width + value_byte_width));
let values: Vec<u32> = random_numbers::<u32>(total / 2)
@@ -1187,17 +1090,9 @@ mod tests {
for i in 0..total {
let j = i / 2;
if i % 2 == 0 {
- assert!(
- writer.put_value(values[j] as u64, num_bits),
- "[{}]: put_value() failed",
- i
- );
+ writer.put_value(values[j] as u64, num_bits);
} else {
- assert!(
- writer.put_aligned::<T>(aligned_values[j], aligned_value_byte_width),
- "[{}]: put_aligned() failed",
- i
- );
+ writer.put_aligned::<T>(aligned_values[j], aligned_value_byte_width)
}
}
@@ -1231,13 +1126,7 @@ mod tests {
let total = 64;
let mut writer = BitWriter::new(total * 32);
let values = random_numbers::<u32>(total);
- (0..total).for_each(|i| {
- assert!(
- writer.put_vlq_int(values[i] as u64),
- "[{}]; put_vlq_int() failed",
- i
- );
- });
+ (0..total).for_each(|i| writer.put_vlq_int(values[i] as u64));
let mut reader = BitReader::from(writer.consume());
(0..total).for_each(|i| {
@@ -1257,13 +1146,7 @@ mod tests {
let total = 64;
let mut writer = BitWriter::new(total * 32);
let values = random_numbers::<i32>(total);
- (0..total).for_each(|i| {
- assert!(
- writer.put_zigzag_vlq_int(values[i] as i64),
- "[{}]; put_zigzag_vlq_int() failed",
- i
- );
- });
+ (0..total).for_each(|i| writer.put_zigzag_vlq_int(values[i] as i64));
let mut reader = BitReader::from(writer.consume());
(0..total).for_each(|i| {
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index f56eaf85e..d7653d4e5 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -16,11 +16,10 @@
// under the License.
use crate::basic::Encoding;
-use crate::column::page::{PageMetadata, PageReader};
use crate::column::page::{Page, PageIterator};
+use crate::column::page::{PageMetadata, PageReader};
use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
-use crate::encodings::levels::max_buffer_size;
use crate::encodings::levels::LevelEncoder;
use crate::errors::Result;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
@@ -75,8 +74,7 @@ impl DataPageBuilderImpl {
if max_level <= 0 {
return 0;
}
- let size = max_buffer_size(Encoding::RLE, max_level, levels.len());
- let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, vec![0; size]);
+ let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, levels.len());
level_encoder.put(levels).expect("put() should be OK");
let encoded_levels = level_encoder.consume().expect("consume() should be OK");
// Actual encoded bytes (without length offset)