You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/01 12:13:37 UTC

[arrow-rs] branch master updated: Handle trailing padding when skipping repetition levels (#3911) (#4319)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 0baf99a22 Handle trailing padding when skipping repetition levels (#3911) (#4319)
0baf99a22 is described below

commit 0baf99a2244d39ff910ec09a0bc3a30b1138a577
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jun 1 13:13:30 2023 +0100

    Handle trailing padding when skipping repetition levels (#3911) (#4319)
---
 parquet/src/column/reader.rs         | 32 +++++++++------
 parquet/src/column/reader/decoder.rs | 75 +++++++++++++++++++++++++++---------
 2 files changed, 76 insertions(+), 31 deletions(-)

diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 0bb6e0024..3434eba69 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -296,11 +296,11 @@ where
     ///
     /// Returns the number of records skipped
     pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        let mut remaining = num_records;
-        while remaining != 0 {
+        let mut remaining_records = num_records;
+        while remaining_records != 0 {
             if self.num_buffered_values == self.num_decoded_values {
                 let metadata = match self.page_reader.peek_next_page()? {
-                    None => return Ok(num_records - remaining),
+                    None => return Ok(num_records - remaining_records),
                     Some(metadata) => metadata,
                 };
 
@@ -312,29 +312,37 @@ where
 
                 // If page has less rows than the remaining records to
                 // be skipped, skip entire page
-                if metadata.num_rows <= remaining {
+                if metadata.num_rows <= remaining_records {
                     self.page_reader.skip_next_page()?;
-                    remaining -= metadata.num_rows;
+                    remaining_records -= metadata.num_rows;
                     continue;
                 };
                 // because self.num_buffered_values == self.num_decoded_values means
                 // we need reads a new page and set up the decoders for levels
                 if !self.read_new_page()? {
-                    return Ok(num_records - remaining);
+                    return Ok(num_records - remaining_records);
                 }
             }
 
             // start skip values in page level
-            let to_read = remaining
-                .min((self.num_buffered_values - self.num_decoded_values) as usize);
+
+            // The number of levels in the current data page
+            let buffered_levels =
+                (self.num_buffered_values - self.num_decoded_values) as usize;
 
             let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
-                Some(decoder) => decoder.skip_rep_levels(to_read)?,
-                None => (to_read, to_read),
+                Some(decoder) => {
+                    decoder.skip_rep_levels(remaining_records, buffered_levels)?
+                }
+                None => {
+                    // No repetition levels, so each level corresponds to a row
+                    let levels = buffered_levels.min(remaining_records);
+                    (levels, levels)
+                }
             };
 
             self.num_decoded_values += rep_levels_read as u32;
-            remaining -= records_read;
+            remaining_records -= records_read;
 
             if self.num_buffered_values == self.num_decoded_values {
                 // Exhausted buffered page - no need to advance other decoders
@@ -364,7 +372,7 @@ where
                 ));
             }
         }
-        Ok(num_records - remaining)
+        Ok(num_records - remaining_records)
     }
 
     /// Read the next page as a dictionary page. If the next page is not a dictionary page,
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index f57b3e16d..3a6795c8c 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -82,11 +82,15 @@ pub trait ColumnLevelDecoder {
 }
 
 pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
-    /// Skips over repetition level corresponding to `num_records` records, where a record
-    /// is delimited by a repetition level of 0
+    /// Skips over up to `num_levels` repetition levels corresponding to `num_records` records,
+    /// where a record is delimited by a repetition level of 0
     ///
     /// Returns the number of records skipped, and the number of levels skipped
-    fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)>;
+    fn skip_rep_levels(
+        &mut self,
+        num_records: usize,
+        num_levels: usize,
+    ) -> Result<(usize, usize)>;
 }
 
 pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
@@ -395,22 +399,30 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
 }
 
 impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
-    fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)> {
+    fn skip_rep_levels(
+        &mut self,
+        num_records: usize,
+        num_levels: usize,
+    ) -> Result<(usize, usize)> {
         let mut level_skip = 0;
         let mut record_skip = 0;
 
-        loop {
+        while level_skip < num_levels {
+            let remaining_levels = num_levels - level_skip;
+
             if self.buffer.is_empty() {
-                // Read SKIP_BUFFER_SIZE as we don't know how many to read
-                self.read_to_buffer(SKIP_BUFFER_SIZE)?;
+                // Only read number of needed values
+                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
                 if self.buffer.is_empty() {
                     // Reached end of page
                     break;
                 }
             }
 
+            let max_skip = self.buffer.len().min(remaining_levels);
+
             let mut to_skip = 0;
-            while to_skip < self.buffer.len() && record_skip != num_records {
+            while to_skip < max_skip && record_skip != num_records {
                 if self.buffer[to_skip] == 0 {
                     record_skip += 1;
                 }
@@ -418,12 +430,12 @@ impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
             }
 
             // Find end of record
-            while to_skip < self.buffer.len() && self.buffer[to_skip] != 0 {
+            while to_skip < max_skip && self.buffer[to_skip] != 0 {
                 to_skip += 1;
             }
 
             level_skip += to_skip;
-            if to_skip >= self.buffer.len() {
+            if to_skip == self.buffer.len() {
                 // Need to to read more values
                 self.buffer.clear();
                 continue;
@@ -473,17 +485,39 @@ mod tests {
     }
 
     #[test]
-    fn test_skip() {
-        let mut rng = thread_rng();
-        let total_len = 10000;
-        let encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..5)).collect();
-        let mut encoder = RleEncoder::new(3, 1024);
-        for v in &encoded {
-            encoder.put(*v as _)
-        }
+    fn test_skip_padding() {
+        let mut encoder = RleEncoder::new(1, 1024);
+        encoder.put(0);
+        (0..3).for_each(|_| encoder.put(1));
         let data = ByteBufferPtr::new(encoder.consume());
 
+        let mut decoder = ColumnLevelDecoderImpl::new(1);
+        decoder.set_data(Encoding::RLE, data.clone());
+        let (records, levels) = decoder.skip_rep_levels(100, 4).unwrap();
+        assert_eq!(records, 1);
+        assert_eq!(levels, 4);
+
+        // The length of the final bit packed run is ambiguous, so without the correct
+        // levels limit, it will decode zero padding
+        let mut decoder = ColumnLevelDecoderImpl::new(1);
+        decoder.set_data(Encoding::RLE, data);
+        let (records, levels) = decoder.skip_rep_levels(100, 6).unwrap();
+        assert_eq!(records, 3);
+        assert_eq!(levels, 6);
+    }
+
+    #[test]
+    fn test_skip() {
         for _ in 0..10 {
+            let mut rng = thread_rng();
+            let total_len = 10000_usize;
+            let encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..5)).collect();
+            let mut encoder = RleEncoder::new(3, 1024);
+            for v in &encoded {
+                encoder.put(*v as _)
+            }
+            let data = ByteBufferPtr::new(encoder.consume());
+
             test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
                 let (values_skipped, levels_skipped) =
                     decoder.skip_def_levels(to_read, 5).unwrap();
@@ -497,8 +531,11 @@ mod tests {
             });
 
             test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
+                let remaining_levels = total_len - *read;
                 let (records_skipped, levels_skipped) =
-                    decoder.skip_rep_levels(to_read).unwrap();
+                    decoder.skip_rep_levels(to_read, remaining_levels).unwrap();
+
+                assert!(levels_skipped <= remaining_levels);
 
                 // If not run out of values
                 if levels_skipped + *read != encoded.len() {