You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/07/24 15:46:07 UTC
[arrow-rs] branch master updated: Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoder (#2111)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new fce662600 Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoder (#2111)
fce662600 is described below
commit fce66260015642e691f246d29666d8f9c3197f8a
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Sun Jul 24 23:46:03 2022 +0800
Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoder (#2111)
* Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoder
* Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoderImpl
* fix ut
* fix and refine support skip definition_level
* fix clippy
* fix comment
---
.../src/arrow/record_reader/definition_levels.rs | 108 ++++++++++++++++++++-
parquet/src/column/reader/decoder.rs | 37 ++++++-
2 files changed, 139 insertions(+), 6 deletions(-)
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index a12772af0..53eeab9a5 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -20,6 +20,7 @@ use std::ops::Range;
use arrow::array::BooleanBufferBuilder;
use arrow::bitmap::Bitmap;
use arrow::buffer::Buffer;
+use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
use crate::arrow::buffer::bit_util::count_set_bits;
use crate::arrow::record_reader::buffer::BufferQueue;
@@ -216,10 +217,15 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
fn skip_def_levels(
&mut self,
- _num_levels: usize,
- _max_def_level: i16,
+ num_levels: usize,
+ max_def_level: i16,
) -> Result<(usize, usize)> {
- Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ match &mut self.decoder {
+ MaybePacked::Fallback(decoder) => {
+ decoder.skip_def_levels(num_levels, max_def_level)
+ }
+ MaybePacked::Packed(decoder) => decoder.skip(num_levels),
+ }
}
}
@@ -346,6 +352,41 @@ impl PackedDecoder {
}
Ok(read)
}
+
+ /// Skips up to `level_num` definition levels, stopping early once the data is exhausted
+ ///
+ /// Returns the number of values skipped and the number of levels skipped
+ fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> {
+ let mut skipped_value = 0;
+ let mut skipped_level = 0;
+ while skipped_level != level_num {
+ if self.rle_left != 0 {
+ let to_skip = self.rle_left.min(level_num - skipped_level);
+ self.rle_left -= to_skip;
+ skipped_level += to_skip;
+ if self.rle_value {
+ skipped_value += to_skip;
+ }
+ } else if self.packed_count != self.packed_offset {
+ let to_skip = (self.packed_count - self.packed_offset)
+ .min(level_num - skipped_level);
+ let offset = self.data_offset * 8 + self.packed_offset;
+ let bit_chunk =
+ UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip);
+ skipped_value += bit_chunk.count_ones();
+ self.packed_offset += to_skip;
+ skipped_level += to_skip;
+ if self.packed_offset == self.packed_count {
+ self.data_offset += self.packed_count / 8;
+ }
+ } else if self.data_offset == self.data.len() {
+ break;
+ } else {
+ self.next_rle_block()?
+ }
+ }
+ Ok((skipped_value, skipped_level))
+ }
}
#[cfg(test)]
@@ -392,6 +433,67 @@ mod tests {
assert_eq!(decoded.as_slice(), expected.as_slice());
}
+ #[test]
+ fn test_packed_decoder_skip() {
+ let mut rng = thread_rng();
+ let len: usize = rng.gen_range(512..1024);
+
+ let mut expected = BooleanBufferBuilder::new(len);
+ let mut encoder = RleEncoder::new(1, 1024);
+
+ let mut total_value = 0;
+ for _ in 0..len {
+ let bool = rng.gen_bool(0.8);
+ assert!(encoder.put(bool as u64).unwrap());
+ expected.append(bool);
+ if bool {
+ total_value += 1;
+ }
+ }
+ assert_eq!(expected.len(), len);
+
+ let encoded = encoder.consume().unwrap();
+ let mut decoder = PackedDecoder::new();
+ decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));
+
+ let mut skip_value = 0;
+ let mut read_value = 0;
+ let mut skip_level = 0;
+ let mut read_level = 0;
+
+ loop {
+ let offset = skip_level + read_level;
+ let remaining_levels = len - offset;
+ if remaining_levels == 0 {
+ break;
+ }
+ let to_read_or_skip_level = rng.gen_range(1..=remaining_levels);
+ if rng.gen_bool(0.5) {
+ let (skip_val_num, skip_level_num) =
+ decoder.skip(to_read_or_skip_level).unwrap();
+ skip_value += skip_val_num;
+ skip_level += skip_level_num
+ } else {
+ let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level);
+ let read_level_num =
+ decoder.read(&mut decoded, to_read_or_skip_level).unwrap();
+ read_level += read_level_num;
+ for i in 0..read_level_num {
+ assert!(!decoded.is_empty());
+ // Check each read bit against the expected bit
+ let read_bit = decoded.get_bit(i);
+ if read_bit {
+ read_value += 1;
+ }
+ let expect_bit = expected.get_bit(i + offset);
+ assert_eq!(read_bit, expect_bit);
+ }
+ }
+ }
+ assert_eq!(read_level + skip_level, len);
+ assert_eq!(read_value + skip_value, total_value);
+ }
+
#[test]
fn test_split_off() {
let t = Type::primitive_type_builder("col", PhysicalType::INT32)
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index 5879c6180..b95b24a21 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -318,10 +318,41 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
fn skip_def_levels(
&mut self,
- _num_levels: usize,
- _max_def_level: i16,
+ num_levels: usize,
+ max_def_level: i16,
) -> Result<(usize, usize)> {
- Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ let mut level_skip = 0;
+ let mut value_skip = 0;
+ match self.decoder.as_mut().unwrap() {
+ LevelDecoderInner::Packed(reader, bit_width) => {
+ for _ in 0..num_levels {
+ // A level equal to max_def_level is counted as a skipped value
+ if max_def_level
+ == reader
+ .get_value::<i16>(*bit_width as usize)
+ .expect("Not enough values in Packed ColumnLevelDecoderImpl.")
+ {
+ value_skip += 1;
+ }
+ level_skip += 1;
+ }
+ }
+ LevelDecoderInner::Rle(reader) => {
+ for _ in 0..num_levels {
+ if let Some(level) = reader
+ .get::<i16>()
+ .expect("Not enough values in Rle ColumnLevelDecoderImpl.")
+ {
+ // A level equal to max_def_level is counted as a skipped value
+ if level == max_def_level {
+ value_skip += 1;
+ }
+ }
+ level_skip += 1;
+ }
+ }
+ }
+ Ok((value_skip, level_skip))
}
}