You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/07/24 15:46:07 UTC

[arrow-rs] branch master updated: Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoder (#2111)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new fce662600 Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoder (#2111)
fce662600 is described below

commit fce66260015642e691f246d29666d8f9c3197f8a
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Sun Jul 24 23:46:03 2022 +0800

    Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoder (#2111)
    
    * Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoder
    
    * Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoderImpl
    
    * fix ut (unit test)
    
    * fix and refine support skip definition_level
    
    * fix clippy
    
    * fix comment
---
 .../src/arrow/record_reader/definition_levels.rs   | 108 ++++++++++++++++++++-
 parquet/src/column/reader/decoder.rs               |  37 ++++++-
 2 files changed, 139 insertions(+), 6 deletions(-)

diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index a12772af0..53eeab9a5 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -20,6 +20,7 @@ use std::ops::Range;
 use arrow::array::BooleanBufferBuilder;
 use arrow::bitmap::Bitmap;
 use arrow::buffer::Buffer;
+use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
 
 use crate::arrow::buffer::bit_util::count_set_bits;
 use crate::arrow::record_reader::buffer::BufferQueue;
@@ -216,10 +217,15 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
 impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
     fn skip_def_levels(
         &mut self,
-        _num_levels: usize,
-        _max_def_level: i16,
+        num_levels: usize,
+        max_def_level: i16,
     ) -> Result<(usize, usize)> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+        match &mut self.decoder {
+            MaybePacked::Fallback(decoder) => {
+                decoder.skip_def_levels(num_levels, max_def_level)
+            }
+            MaybePacked::Packed(decoder) => decoder.skip(num_levels),
+        }
     }
 }
 
@@ -346,6 +352,41 @@ impl PackedDecoder {
         }
         Ok(read)
     }
+
+    /// Skips `level_num` definition levels
+    ///
+    /// Returns the number of values skipped and the number of levels skipped
+    fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> {
+        let mut skipped_value = 0;
+        let mut skipped_level = 0;
+        while skipped_level != level_num {
+            if self.rle_left != 0 {
+                let to_skip = self.rle_left.min(level_num - skipped_level);
+                self.rle_left -= to_skip;
+                skipped_level += to_skip;
+                if self.rle_value {
+                    skipped_value += to_skip;
+                }
+            } else if self.packed_count != self.packed_offset {
+                let to_skip = (self.packed_count - self.packed_offset)
+                    .min(level_num - skipped_level);
+                let offset = self.data_offset * 8 + self.packed_offset;
+                let bit_chunk =
+                    UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip);
+                skipped_value += bit_chunk.count_ones();
+                self.packed_offset += to_skip;
+                skipped_level += to_skip;
+                if self.packed_offset == self.packed_count {
+                    self.data_offset += self.packed_count / 8;
+                }
+            } else if self.data_offset == self.data.len() {
+                break;
+            } else {
+                self.next_rle_block()?
+            }
+        }
+        Ok((skipped_value, skipped_level))
+    }
 }
 
 #[cfg(test)]
@@ -392,6 +433,67 @@ mod tests {
         assert_eq!(decoded.as_slice(), expected.as_slice());
     }
 
+    #[test]
+    fn test_packed_decoder_skip() {
+        let mut rng = thread_rng();
+        let len: usize = rng.gen_range(512..1024);
+
+        let mut expected = BooleanBufferBuilder::new(len);
+        let mut encoder = RleEncoder::new(1, 1024);
+
+        let mut total_value = 0;
+        for _ in 0..len {
+            let bool = rng.gen_bool(0.8);
+            assert!(encoder.put(bool as u64).unwrap());
+            expected.append(bool);
+            if bool {
+                total_value += 1;
+            }
+        }
+        assert_eq!(expected.len(), len);
+
+        let encoded = encoder.consume().unwrap();
+        let mut decoder = PackedDecoder::new();
+        decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));
+
+        let mut skip_value = 0;
+        let mut read_value = 0;
+        let mut skip_level = 0;
+        let mut read_level = 0;
+
+        loop {
+            let offset = skip_level + read_level;
+            let remaining_levels = len - offset;
+            if remaining_levels == 0 {
+                break;
+            }
+            let to_read_or_skip_level = rng.gen_range(1..=remaining_levels);
+            if rng.gen_bool(0.5) {
+                let (skip_val_num, skip_level_num) =
+                    decoder.skip(to_read_or_skip_level).unwrap();
+                skip_value += skip_val_num;
+                skip_level += skip_level_num
+            } else {
+                let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level);
+                let read_level_num =
+                    decoder.read(&mut decoded, to_read_or_skip_level).unwrap();
+                read_level += read_level_num;
+                for i in 0..read_level_num {
+                    assert!(!decoded.is_empty());
+                    //check each read bit
+                    let read_bit = decoded.get_bit(i);
+                    if read_bit {
+                        read_value += 1;
+                    }
+                    let expect_bit = expected.get_bit(i + offset);
+                    assert_eq!(read_bit, expect_bit);
+                }
+            }
+        }
+        assert_eq!(read_level + skip_level, len);
+        assert_eq!(read_value + skip_value, total_value);
+    }
+
     #[test]
     fn test_split_off() {
         let t = Type::primitive_type_builder("col", PhysicalType::INT32)
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index 5879c6180..b95b24a21 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -318,10 +318,41 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
 impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
     fn skip_def_levels(
         &mut self,
-        _num_levels: usize,
-        _max_def_level: i16,
+        num_levels: usize,
+        max_def_level: i16,
     ) -> Result<(usize, usize)> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+        let mut level_skip = 0;
+        let mut value_skip = 0;
+        match self.decoder.as_mut().unwrap() {
+            LevelDecoderInner::Packed(reader, bit_width) => {
+                for _ in 0..num_levels {
+                    // Values are delimited by max_def_level
+                    if max_def_level
+                        == reader
+                            .get_value::<i16>(*bit_width as usize)
+                            .expect("Not enough values in Packed ColumnLevelDecoderImpl.")
+                    {
+                        value_skip += 1;
+                    }
+                    level_skip += 1;
+                }
+            }
+            LevelDecoderInner::Rle(reader) => {
+                for _ in 0..num_levels {
+                    if let Some(level) = reader
+                        .get::<i16>()
+                        .expect("Not enough values in Rle ColumnLevelDecoderImpl.")
+                    {
+                        // Values are delimited by max_def_level
+                        if level == max_def_level {
+                            value_skip += 1;
+                        }
+                    }
+                    level_skip += 1;
+                }
+            }
+        }
+        Ok((value_skip, level_skip))
     }
 }