You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/01 16:29:39 UTC

[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r777120783



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
+}
+
+impl LevelsWriter for [i16] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+
+    fn get(&self, idx: usize) -> i16 {
+        self[idx]
+    }
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
+        self[range].iter().filter(|i| **i != max_level).count()
+    }
+}
+
+/// A type that can have value data written to it by a [`ColumnValueDecoder`]
+pub trait ValuesWriter {
+    fn capacity(&self) -> usize;
+}
+
+impl<T> ValuesWriter for [T] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+}
+
+/// Decodes level data to a [`LevelsWriter`]
+pub trait ColumnLevelDecoder {
+    type Writer: LevelsWriter + ?Sized;
+
+    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// Decodes value data to a [`ValuesWriter`]
+pub trait ColumnValueDecoder {
+    type Writer: ValuesWriter + ?Sized;
+
+    fn create(col: &ColumnDescPtr) -> Self;
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        is_sorted: bool,
+    ) -> Result<()>;
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()>;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
+pub struct ColumnValueDecoderImpl<T: DataType> {
+    descr: ColumnDescPtr,
+
+    current_encoding: Option<Encoding>,
+
+    // Cache of decoders for existing encodings
+    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
+}
+
+impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
+    type Writer = [T::T];
+
+    fn create(descr: &ColumnDescPtr) -> Self {
+        Self {
+            descr: descr.clone(),
+            current_encoding: None,
+            decoders: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        mut encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if self.decoders.contains_key(&encoding) {
+            return Err(general_err!("Column cannot have more than one dictionary"));
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
+            dictionary.set_data(buf, num_values as usize)?;
+
+            let mut decoder = DictDecoder::new();
+            decoder.set_dict(Box::new(dictionary))?;
+            self.decoders.insert(encoding, Box::new(decoder));
+            Ok(())
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn set_data(
+        &mut self,
+        mut encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY;
+        }
+
+        let decoder = if encoding == Encoding::RLE_DICTIONARY {
+            self.decoders
+                .get_mut(&encoding)
+                .expect("Decoder for dict should have been set")
+        } else {
+            // Search cache for data page decoder
+            #[allow(clippy::map_entry)]
+            if !self.decoders.contains_key(&encoding) {
+                // Initialize decoder for this page
+                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
+                self.decoders.insert(encoding, data_decoder);
+            }
+            self.decoders.get_mut(&encoding).unwrap()
+        };
+
+        decoder.set_data(data, num_values)?;
+        self.current_encoding = Some(encoding);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize> {
+        let encoding = self
+            .current_encoding
+            .expect("current_encoding should be set");
+
+        let current_decoder = self
+            .decoders
+            .get_mut(&encoding)
+            .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding));
+
+        current_decoder.get(&mut out[range])
+    }
+}
+
+/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
+pub struct ColumnLevelDecoderImpl {
+    inner: LevelDecoderInner,

Review comment:
       This would require introducing some type representation of the encoding type. This would be a fair bit of additional code/complexity that I don't think would lead to a meaningful performance uplift. Assuming `ColumnLevelDecoderImpl::read` is called with a reasonable batch size of ~1024, the overheads of a jump table are likely to be irrelevant.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org