You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/01 12:13:37 UTC
[arrow-rs] branch master updated: Handle trailing padding when skipping repetition levels (#3911) (#4319)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 0baf99a22 Handle trailing padding when skipping repetition levels (#3911) (#4319)
0baf99a22 is described below
commit 0baf99a2244d39ff910ec09a0bc3a30b1138a577
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jun 1 13:13:30 2023 +0100
Handle trailing padding when skipping repetition levels (#3911) (#4319)
---
parquet/src/column/reader.rs | 32 +++++++++------
parquet/src/column/reader/decoder.rs | 75 +++++++++++++++++++++++++++---------
2 files changed, 76 insertions(+), 31 deletions(-)
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 0bb6e0024..3434eba69 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -296,11 +296,11 @@ where
///
/// Returns the number of records skipped
pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
- let mut remaining = num_records;
- while remaining != 0 {
+ let mut remaining_records = num_records;
+ while remaining_records != 0 {
if self.num_buffered_values == self.num_decoded_values {
let metadata = match self.page_reader.peek_next_page()? {
- None => return Ok(num_records - remaining),
+ None => return Ok(num_records - remaining_records),
Some(metadata) => metadata,
};
@@ -312,29 +312,37 @@ where
// If page has less rows than the remaining records to
// be skipped, skip entire page
- if metadata.num_rows <= remaining {
+ if metadata.num_rows <= remaining_records {
self.page_reader.skip_next_page()?;
- remaining -= metadata.num_rows;
+ remaining_records -= metadata.num_rows;
continue;
};
// because self.num_buffered_values == self.num_decoded_values means
// we need reads a new page and set up the decoders for levels
if !self.read_new_page()? {
- return Ok(num_records - remaining);
+ return Ok(num_records - remaining_records);
}
}
// start skip values in page level
- let to_read = remaining
- .min((self.num_buffered_values - self.num_decoded_values) as usize);
+
+ // The number of levels in the current data page
+ let buffered_levels =
+ (self.num_buffered_values - self.num_decoded_values) as usize;
let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
- Some(decoder) => decoder.skip_rep_levels(to_read)?,
- None => (to_read, to_read),
+ Some(decoder) => {
+ decoder.skip_rep_levels(remaining_records, buffered_levels)?
+ }
+ None => {
+ // No repetition levels, so each level corresponds to a row
+ let levels = buffered_levels.min(remaining_records);
+ (levels, levels)
+ }
};
self.num_decoded_values += rep_levels_read as u32;
- remaining -= records_read;
+ remaining_records -= records_read;
if self.num_buffered_values == self.num_decoded_values {
// Exhausted buffered page - no need to advance other decoders
@@ -364,7 +372,7 @@ where
));
}
}
- Ok(num_records - remaining)
+ Ok(num_records - remaining_records)
}
/// Read the next page as a dictionary page. If the next page is not a dictionary page,
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index f57b3e16d..3a6795c8c 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -82,11 +82,15 @@ pub trait ColumnLevelDecoder {
}
pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
- /// Skips over repetition level corresponding to `num_records` records, where a record
- /// is delimited by a repetition level of 0
+ /// Skips over up to `num_levels` repetition levels corresponding to `num_records` records,
+ /// where a record is delimited by a repetition level of 0
///
/// Returns the number of records skipped, and the number of levels skipped
- fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)>;
+ fn skip_rep_levels(
+ &mut self,
+ num_records: usize,
+ num_levels: usize,
+ ) -> Result<(usize, usize)>;
}
pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
@@ -395,22 +399,30 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
}
impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
- fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)> {
+ fn skip_rep_levels(
+ &mut self,
+ num_records: usize,
+ num_levels: usize,
+ ) -> Result<(usize, usize)> {
let mut level_skip = 0;
let mut record_skip = 0;
- loop {
+ while level_skip < num_levels {
+ let remaining_levels = num_levels - level_skip;
+
if self.buffer.is_empty() {
- // Read SKIP_BUFFER_SIZE as we don't know how many to read
- self.read_to_buffer(SKIP_BUFFER_SIZE)?;
+ // Only read number of needed values
+ self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
if self.buffer.is_empty() {
// Reached end of page
break;
}
}
+ let max_skip = self.buffer.len().min(remaining_levels);
+
let mut to_skip = 0;
- while to_skip < self.buffer.len() && record_skip != num_records {
+ while to_skip < max_skip && record_skip != num_records {
if self.buffer[to_skip] == 0 {
record_skip += 1;
}
@@ -418,12 +430,12 @@ impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
}
// Find end of record
- while to_skip < self.buffer.len() && self.buffer[to_skip] != 0 {
+ while to_skip < max_skip && self.buffer[to_skip] != 0 {
to_skip += 1;
}
level_skip += to_skip;
- if to_skip >= self.buffer.len() {
+ if to_skip == self.buffer.len() {
// Need to to read more values
self.buffer.clear();
continue;
@@ -473,17 +485,39 @@ mod tests {
}
#[test]
- fn test_skip() {
- let mut rng = thread_rng();
- let total_len = 10000;
- let encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..5)).collect();
- let mut encoder = RleEncoder::new(3, 1024);
- for v in &encoded {
- encoder.put(*v as _)
- }
+ fn test_skip_padding() {
+ let mut encoder = RleEncoder::new(1, 1024);
+ encoder.put(0);
+ (0..3).for_each(|_| encoder.put(1));
let data = ByteBufferPtr::new(encoder.consume());
+ let mut decoder = ColumnLevelDecoderImpl::new(1);
+ decoder.set_data(Encoding::RLE, data.clone());
+ let (records, levels) = decoder.skip_rep_levels(100, 4).unwrap();
+ assert_eq!(records, 1);
+ assert_eq!(levels, 4);
+
+ // The length of the final bit packed run is ambiguous, so without the correct
+ // levels limit, it will decode zero padding
+ let mut decoder = ColumnLevelDecoderImpl::new(1);
+ decoder.set_data(Encoding::RLE, data);
+ let (records, levels) = decoder.skip_rep_levels(100, 6).unwrap();
+ assert_eq!(records, 3);
+ assert_eq!(levels, 6);
+ }
+
+ #[test]
+ fn test_skip() {
for _ in 0..10 {
+ let mut rng = thread_rng();
+ let total_len = 10000_usize;
+ let encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..5)).collect();
+ let mut encoder = RleEncoder::new(3, 1024);
+ for v in &encoded {
+ encoder.put(*v as _)
+ }
+ let data = ByteBufferPtr::new(encoder.consume());
+
test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
let (values_skipped, levels_skipped) =
decoder.skip_def_levels(to_read, 5).unwrap();
@@ -497,8 +531,11 @@ mod tests {
});
test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
+ let remaining_levels = total_len - *read;
let (records_skipped, levels_skipped) =
- decoder.skip_rep_levels(to_read).unwrap();
+ decoder.skip_rep_levels(to_read, remaining_levels).unwrap();
+
+ assert!(levels_skipped <= remaining_levels);
// If not run out of values
if levels_skipped + *read != encoded.len() {