You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/05 05:56:10 UTC

[GitHub] [arrow-rs] Ted-Jiang commented on a diff in pull request #2999: Support Predicate Pushdown for Parquet Lists (#2108)

Ted-Jiang commented on code in PR #2999:
URL: https://github.com/apache/arrow-rs/pull/2999#discussion_r1014578761


##########
parquet/src/column/reader/decoder.rs:
##########
@@ -323,41 +368,153 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
     ) -> Result<(usize, usize)> {
         let mut level_skip = 0;
         let mut value_skip = 0;
-        match self.decoder.as_mut().unwrap() {
-            LevelDecoderInner::Packed(reader, bit_width) => {
-                for _ in 0..num_levels {
-                    // Values are delimited by max_def_level
-                    if max_def_level
-                        == reader
-                            .get_value::<i16>(*bit_width as usize)
-                            .expect("Not enough values in Packed ColumnLevelDecoderImpl.")
-                    {
-                        value_skip += 1;
-                    }
-                    level_skip += 1;
-                }
-            }
-            LevelDecoderInner::Rle(reader) => {
-                for _ in 0..num_levels {
-                    if let Some(level) = reader
-                        .get::<i16>()
-                        .expect("Not enough values in Rle ColumnLevelDecoderImpl.")
-                    {
-                        // Values are delimited by max_def_level
-                        if level == max_def_level {
-                            value_skip += 1;
-                        }
-                    }
-                    level_skip += 1;
+        while level_skip < num_levels {
+            let remaining_levels = num_levels - level_skip;
+
+            if self.buffer.is_empty() {
+                // Only read number of needed values
+                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
+                if self.buffer.is_empty() {
+                    // Reached end of page
+                    break;
                 }
             }
+            let to_read = self.buffer.len().min(remaining_levels);
+
+            level_skip += to_read;
+            value_skip += self.buffer[..to_read]
+                .iter()
+                .filter(|x| **x == max_def_level)
+                .count();
+
+            self.split_off_buffer(to_read)
         }
+
         Ok((value_skip, level_skip))
     }
 }
 
 impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
-    fn skip_rep_levels(&mut self, _num_records: usize) -> Result<(usize, usize)> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)> {
+        let mut level_skip = 0;
+        let mut record_skip = 0;
+
+        loop {
+            if self.buffer.is_empty() {
+                // Read SKIP_BUFFER_SIZE as we don't know how many to read

Review Comment:
   👍



##########
parquet/src/column/reader/decoder.rs:
##########
@@ -323,41 +368,153 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
     ) -> Result<(usize, usize)> {
         let mut level_skip = 0;
         let mut value_skip = 0;
-        match self.decoder.as_mut().unwrap() {
-            LevelDecoderInner::Packed(reader, bit_width) => {
-                for _ in 0..num_levels {
-                    // Values are delimited by max_def_level
-                    if max_def_level
-                        == reader
-                            .get_value::<i16>(*bit_width as usize)
-                            .expect("Not enough values in Packed ColumnLevelDecoderImpl.")
-                    {
-                        value_skip += 1;
-                    }
-                    level_skip += 1;
-                }
-            }
-            LevelDecoderInner::Rle(reader) => {
-                for _ in 0..num_levels {
-                    if let Some(level) = reader
-                        .get::<i16>()
-                        .expect("Not enough values in Rle ColumnLevelDecoderImpl.")
-                    {
-                        // Values are delimited by max_def_level
-                        if level == max_def_level {
-                            value_skip += 1;
-                        }
-                    }
-                    level_skip += 1;
+        while level_skip < num_levels {
+            let remaining_levels = num_levels - level_skip;
+
+            if self.buffer.is_empty() {
+                // Only read number of needed values
+                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
+                if self.buffer.is_empty() {
+                    // Reached end of page
+                    break;
                 }
             }
+            let to_read = self.buffer.len().min(remaining_levels);
+
+            level_skip += to_read;
+            value_skip += self.buffer[..to_read]
+                .iter()
+                .filter(|x| **x == max_def_level)
+                .count();
+
+            self.split_off_buffer(to_read)
         }
+
         Ok((value_skip, level_skip))
     }
 }
 
 impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
-    fn skip_rep_levels(&mut self, _num_records: usize) -> Result<(usize, usize)> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)> {
+        let mut level_skip = 0;
+        let mut record_skip = 0;
+
+        loop {
+            if self.buffer.is_empty() {
+                // Read SKIP_BUFFER_SIZE as we don't know how many to read
+                self.read_to_buffer(SKIP_BUFFER_SIZE)?;
+                if self.buffer.is_empty() {
+                    // Reached end of page
+                    break;
+                }
+            }
+
+            let mut to_skip = 0;
+            while to_skip < self.buffer.len() && record_skip != num_records {
+                if self.buffer[to_skip] == 0 {
+                    record_skip += 1;
+                }
+                to_skip += 1;
+            }
+
+            // Find end of record
+            while to_skip < self.buffer.len() && self.buffer[to_skip] != 0 {
+                to_skip += 1;
+            }
+
+            level_skip += to_skip;
+            if to_skip >= self.buffer.len() {
+                // Need to to read more values
+                self.buffer.clear();
+                continue;
+            }
+
+            self.split_off_buffer(to_skip);
+            break;
+        }
+
+        Ok((record_skip, level_skip))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::encodings::rle::RleEncoder;
+    use rand::prelude::*;
+
+    fn test_skip_levels<F>(encoded: &[i16], data: ByteBufferPtr, skip: F)
+    where
+        F: Fn(&mut ColumnLevelDecoderImpl, &mut usize, usize),
+    {
+        let mut rng = thread_rng();
+        let mut decoder = ColumnLevelDecoderImpl::new(5);
+        decoder.set_data(Encoding::RLE, data);
+
+        let mut read = 0;
+        let mut decoded = vec![];
+        let mut expected = vec![];
+        while read < encoded.len() {

Review Comment:
   Nice fuzz tests! 👍



##########
parquet/src/column/reader/decoder.rs:
##########
@@ -305,12 +337,25 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
         }
     }
 
-    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+    fn read(&mut self, out: &mut Self::Slice, mut range: Range<usize>) -> Result<usize> {
+        let read_from_buffer = match self.buffer.is_empty() {

Review Comment:
   👍



##########
parquet/src/column/reader/decoder.rs:
##########
@@ -323,41 +368,153 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
     ) -> Result<(usize, usize)> {
         let mut level_skip = 0;
         let mut value_skip = 0;
-        match self.decoder.as_mut().unwrap() {
-            LevelDecoderInner::Packed(reader, bit_width) => {
-                for _ in 0..num_levels {
-                    // Values are delimited by max_def_level
-                    if max_def_level
-                        == reader
-                            .get_value::<i16>(*bit_width as usize)
-                            .expect("Not enough values in Packed ColumnLevelDecoderImpl.")
-                    {
-                        value_skip += 1;
-                    }
-                    level_skip += 1;
-                }
-            }
-            LevelDecoderInner::Rle(reader) => {
-                for _ in 0..num_levels {
-                    if let Some(level) = reader
-                        .get::<i16>()
-                        .expect("Not enough values in Rle ColumnLevelDecoderImpl.")
-                    {
-                        // Values are delimited by max_def_level
-                        if level == max_def_level {
-                            value_skip += 1;
-                        }
-                    }
-                    level_skip += 1;
+        while level_skip < num_levels {
+            let remaining_levels = num_levels - level_skip;
+
+            if self.buffer.is_empty() {
+                // Only read number of needed values
+                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
+                if self.buffer.is_empty() {
+                    // Reached end of page
+                    break;
                 }
             }
+            let to_read = self.buffer.len().min(remaining_levels);
+
+            level_skip += to_read;
+            value_skip += self.buffer[..to_read]
+                .iter()
+                .filter(|x| **x == max_def_level)
+                .count();
+
+            self.split_off_buffer(to_read)
         }
+
         Ok((value_skip, level_skip))
     }
 }
 
 impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
-    fn skip_rep_levels(&mut self, _num_records: usize) -> Result<(usize, usize)> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)> {
+        let mut level_skip = 0;
+        let mut record_skip = 0;
+
+        loop {
+            if self.buffer.is_empty() {
+                // Read SKIP_BUFFER_SIZE as we don't know how many to read
+                self.read_to_buffer(SKIP_BUFFER_SIZE)?;
+                if self.buffer.is_empty() {
+                    // Reached end of page
+                    break;
+                }
+            }
+
+            let mut to_skip = 0;
+            while to_skip < self.buffer.len() && record_skip != num_records {
+                if self.buffer[to_skip] == 0 {
+                    record_skip += 1;
+                }
+                to_skip += 1;
+            }
+
+            // Find end of record
+            while to_skip < self.buffer.len() && self.buffer[to_skip] != 0 {
+                to_skip += 1;

Review Comment:
   Nice check!



##########
parquet/src/column/reader/decoder.rs:
##########
@@ -323,41 +368,153 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
     ) -> Result<(usize, usize)> {
         let mut level_skip = 0;
         let mut value_skip = 0;
-        match self.decoder.as_mut().unwrap() {
-            LevelDecoderInner::Packed(reader, bit_width) => {
-                for _ in 0..num_levels {
-                    // Values are delimited by max_def_level
-                    if max_def_level
-                        == reader
-                            .get_value::<i16>(*bit_width as usize)
-                            .expect("Not enough values in Packed ColumnLevelDecoderImpl.")
-                    {
-                        value_skip += 1;
-                    }
-                    level_skip += 1;
-                }
-            }
-            LevelDecoderInner::Rle(reader) => {
-                for _ in 0..num_levels {
-                    if let Some(level) = reader
-                        .get::<i16>()
-                        .expect("Not enough values in Rle ColumnLevelDecoderImpl.")
-                    {
-                        // Values are delimited by max_def_level
-                        if level == max_def_level {
-                            value_skip += 1;
-                        }
-                    }
-                    level_skip += 1;
+        while level_skip < num_levels {
+            let remaining_levels = num_levels - level_skip;
+
+            if self.buffer.is_empty() {
+                // Only read number of needed values
+                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
+                if self.buffer.is_empty() {
+                    // Reached end of page
+                    break;
                 }
             }
+            let to_read = self.buffer.len().min(remaining_levels);
+
+            level_skip += to_read;
+            value_skip += self.buffer[..to_read]
+                .iter()
+                .filter(|x| **x == max_def_level)
+                .count();
+
+            self.split_off_buffer(to_read)
         }
+
         Ok((value_skip, level_skip))
     }
 }
 
 impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
-    fn skip_rep_levels(&mut self, _num_records: usize) -> Result<(usize, usize)> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)> {
+        let mut level_skip = 0;
+        let mut record_skip = 0;
+
+        loop {
+            if self.buffer.is_empty() {
+                // Read SKIP_BUFFER_SIZE as we don't know how many to read
+                self.read_to_buffer(SKIP_BUFFER_SIZE)?;
+                if self.buffer.is_empty() {
+                    // Reached end of page
+                    break;
+                }
+            }
+
+            let mut to_skip = 0;
+            while to_skip < self.buffer.len() && record_skip != num_records {
+                if self.buffer[to_skip] == 0 {
+                    record_skip += 1;
+                }
+                to_skip += 1;
+            }
+
+            // Find end of record
+            while to_skip < self.buffer.len() && self.buffer[to_skip] != 0 {
+                to_skip += 1;
+            }
+
+            level_skip += to_skip;
+            if to_skip >= self.buffer.len() {

Review Comment:
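   I think `==` is sufficient here: `to_skip` is only incremented while it is less than `self.buffer.len()`, so it can never exceed the buffer length.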



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org