You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/12 17:18:51 UTC

[GitHub] [arrow-rs] tustvold commented on a change in pull request #1054: Improve parquet reading performance for columns with nulls by preserving bitmask when possible (#1037)

tustvold commented on a change in pull request #1054:
URL: https://github.com/apache/arrow-rs/pull/1054#discussion_r783285205



##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -91,10 +127,287 @@ impl DefinitionLevelBuffer {
         &self,
         range: Range<usize>,
     ) -> impl Iterator<Item = usize> + '_ {
-        let max_def_level = self.max_level;
-        let slice = self.buffer.as_slice();
-        range.rev().filter(move |x| slice[*x] == max_def_level)
+        assert_eq!(range.start, self.len);
+        iter_set_bits_rev(self.nulls().as_slice())
+    }
+
+    fn nulls(&self) -> &BooleanBufferBuilder {
+        match &self.inner {
+            BufferInner::Full { nulls, .. } => nulls,
+            BufferInner::Mask { nulls } => nulls,
+        }
     }
 }
 
-pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl;
+impl LevelsBufferSlice for DefinitionLevelBuffer {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+
+    fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
+        let total_count = range.end - range.start;
+        let range = range.start + self.len..range.end + self.len;
+        total_count - count_set_bits(self.nulls().as_slice(), range)
+    }
+}
+
+pub struct DefinitionLevelDecoder {
+    max_level: i16,
+    encoding: Encoding,
+    data: Option<ByteBufferPtr>,
+    column_decoder: Option<ColumnLevelDecoderImpl>,
+    packed_decoder: Option<PackedDecoder>,
+}
+
+impl ColumnLevelDecoder for DefinitionLevelDecoder {
+    type Slice = DefinitionLevelBuffer;
+
+    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
+        Self {
+            max_level,
+            encoding,
+            data: Some(data),
+            column_decoder: None,
+            packed_decoder: None,
+        }
+    }
+
+    fn read(
+        &mut self,
+        writer: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> crate::errors::Result<usize> {
+        match &mut writer.inner {
+            BufferInner::Full {
+                levels,
+                nulls,
+                max_level,
+            } => {
+                assert_eq!(self.max_level, *max_level);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self.column_decoder.insert(
+                        ColumnLevelDecoderImpl::new(self.max_level, self.encoding, data),
+                    ),
+                    None => self
+                        .column_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                levels.resize(range.end + writer.len);
+
+                let slice = &mut levels.as_slice_mut()[writer.len..];
+                let levels_read = decoder.read(slice, range.clone())?;
+
+                nulls.reserve(levels_read);
+                for i in &slice[range.start..range.start + levels_read] {
+                    nulls.append(i == max_level)
+                }
+
+                Ok(levels_read)
+            }
+            BufferInner::Mask { nulls } => {
+                assert_eq!(self.max_level, 1);
+                assert_eq!(range.start + writer.len, nulls.len());
+
+                let decoder = match self.data.take() {
+                    Some(data) => self
+                        .packed_decoder
+                        .insert(PackedDecoder::new(self.encoding, data)),
+                    None => self
+                        .packed_decoder
+                        .as_mut()
+                        .expect("consistent null_mask_only"),
+                };
+
+                decoder.read(nulls, range.end - range.start)
+            }
+        }
+    }
+}
+
+struct PackedDecoder {

Review comment:
       The short answer is that not using that implementation is the major reason this PR represents a non-trivial speed bump: this version can decode more optimally, since it can decode directly using append_packed_range / append_n. Will add some comments clarifying this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org