You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/12/13 12:05:17 UTC

[GitHub] [arrow-rs] tustvold opened a new pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

tustvold opened a new pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041


   **Highly experimental**
   
   This is highly experimental, I want to get further fleshing out #171 and #1037 before settling on this. In particular I want to get some numbers about performance. However, I wanted to give some visibility into what I'm doing 
   
   Builds on top of #1021 
   
   # Which issue does this PR close?
   
   Closes #1040.
   
   # Rationale for this change
    
   See ticket
   
   # What changes are included in this PR?
   
   See ticket
   
   # Are there any user-facing changes?
   
   No :grin: 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r775284993



##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -0,0 +1,82 @@
+use arrow::array::BooleanBufferBuilder;
+use arrow::bitmap::Bitmap;
+use arrow::buffer::Buffer;
+use std::ops::Range;
+
+use crate::column::reader::decoder::ColumnLevelDecoderImpl;
+use crate::schema::types::ColumnDescPtr;
+
+use super::{
+    buffer::{RecordBuffer, TypedBuffer},
+    MIN_BATCH_SIZE,
+};
+
+pub struct DefinitionLevelBuffer {
+    buffer: TypedBuffer<i16>,
+    builder: BooleanBufferBuilder,
+    max_level: i16,
+}
+
+impl RecordBuffer for DefinitionLevelBuffer {
+    type Output = Buffer;
+    type Writer = [i16];
+
+    fn split(&mut self, len: usize) -> Self::Output {
+        self.buffer.split(len)
+    }
+
+    fn writer(&mut self, batch_size: usize) -> &mut Self::Writer {
+        assert_eq!(self.buffer.len(), self.builder.len());
+        self.buffer.writer(batch_size)
+    }
+
+    fn commit(&mut self, len: usize) {
+        self.buffer.commit(len);
+        let buf = self.buffer.as_slice();
+
+        let range = self.builder.len()..len;
+        self.builder.reserve(range.end - range.start);
+        for i in &buf[range] {
+            self.builder.append(*i == self.max_level)
+        }
+    }
+}
+
+impl DefinitionLevelBuffer {
+    pub fn new(desc: &ColumnDescPtr) -> Self {
+        Self {
+            buffer: TypedBuffer::new(),
+            builder: BooleanBufferBuilder::new(0),
+            max_level: desc.max_def_level(),
+        }
+    }
+
+    /// Split `len` levels out of `self`
+    pub fn split_bitmask(&mut self, len: usize) -> Bitmap {
+        let old_len = self.builder.len();
+        let num_left_values = old_len - len;
+        let new_bitmap_builder =
+            BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values));
+
+        let old_bitmap =
+            std::mem::replace(&mut self.builder, new_bitmap_builder).finish();
+        let old_bitmap = Bitmap::from(old_bitmap);
+
+        for i in len..old_len {
+            self.builder.append(old_bitmap.is_set(i));
+        }
+
+        old_bitmap
+    }
+
+    pub fn valid_position_iter(
+        &self,
+        range: Range<usize>,
+    ) -> impl Iterator<Item = usize> + '_ {
+        let max_def_level = self.max_level;
+        let slice = self.buffer.as_slice();
+        range.rev().filter(move |x| slice[*x] == max_def_level)

Review comment:
       it might be more efficient to calculate a boolean array for the null bitmap using `arrow::compute::eq_scalar` as used in `ArrowArrayReader` here https://github.com/apache/arrow-rs/blob/master/parquet/src/arrow/arrow_array_reader.rs#L570 , because it can use SIMD (if enabled)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778404924



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> BufferQueue for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Slice = [T];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);

Review comment:
       Sadly this is not possible with `MutableBuffer` the second parameter is u8. IMO `MutableBuffer` is pretty unfortunate and should really be typed based on what it contains, but changing this would be a major breaking change to a lot of arrow...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r774811305



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;

Review comment:
       I would expect a level writer to have a `write` method rather than a `get` method (it's a level writer, not a level getter)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r774814520



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
+}
+
+impl LevelsWriter for [i16] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+
+    fn get(&self, idx: usize) -> i16 {
+        self[idx]
+    }
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
+        self[range].iter().filter(|i| **i != max_level).count()
+    }
+}
+
+/// A type that can have value data written to it by a [`ColumnValueDecoder`]
+pub trait ValuesWriter {
+    fn capacity(&self) -> usize;
+}
+
+impl<T> ValuesWriter for [T] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+}
+
+/// Decodes level data to a [`LevelsWriter`]
+pub trait ColumnLevelDecoder {
+    type Writer: LevelsWriter + ?Sized;
+
+    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// Decodes value data to a [`ValuesWriter`]
+pub trait ColumnValueDecoder {
+    type Writer: ValuesWriter + ?Sized;
+
+    fn create(col: &ColumnDescPtr) -> Self;
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        is_sorted: bool,
+    ) -> Result<()>;
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()>;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
+pub struct ColumnValueDecoderImpl<T: DataType> {
+    descr: ColumnDescPtr,
+
+    current_encoding: Option<Encoding>,
+
+    // Cache of decoders for existing encodings
+    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
+}
+
+impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
+    type Writer = [T::T];
+
+    fn create(descr: &ColumnDescPtr) -> Self {
+        Self {
+            descr: descr.clone(),
+            current_encoding: None,
+            decoders: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        mut encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if self.decoders.contains_key(&encoding) {
+            return Err(general_err!("Column cannot have more than one dictionary"));
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
+            dictionary.set_data(buf, num_values as usize)?;
+
+            let mut decoder = DictDecoder::new();
+            decoder.set_dict(Box::new(dictionary))?;
+            self.decoders.insert(encoding, Box::new(decoder));
+            Ok(())
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn set_data(
+        &mut self,
+        mut encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY;
+        }
+
+        let decoder = if encoding == Encoding::RLE_DICTIONARY {
+            self.decoders
+                .get_mut(&encoding)
+                .expect("Decoder for dict should have been set")
+        } else {
+            // Search cache for data page decoder
+            #[allow(clippy::map_entry)]
+            if !self.decoders.contains_key(&encoding) {
+                // Initialize decoder for this page
+                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
+                self.decoders.insert(encoding, data_decoder);
+            }
+            self.decoders.get_mut(&encoding).unwrap()
+        };
+
+        decoder.set_data(data, num_values)?;
+        self.current_encoding = Some(encoding);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize> {
+        let encoding = self
+            .current_encoding
+            .expect("current_encoding should be set");
+
+        let current_decoder = self

Review comment:
       why not set a `current_decoder` field in the `set_data` method (where the decoder has to be selected anyway to call `set_data` on it), so that it doesn't have to be looked up on every call of `read` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r767696543



##########
File path: parquet/src/arrow/record_reader.rs
##########
@@ -16,75 +16,248 @@
 // under the License.
 
 use std::cmp::{max, min};
-use std::mem::{replace, size_of};
-
-use crate::column::{page::PageReader, reader::ColumnReaderImpl};
+use std::marker::PhantomData;
+use std::mem::replace;
+use std::ops::Range;
+
+use crate::arrow::record_reader::private::{
+    DefinitionLevels, RecordBuffer, RepetitionLevels,
+};
+use crate::column::{
+    page::PageReader,
+    reader::{
+        private::{
+            ColumnLevelDecoder, ColumnLevelDecoderImpl, ColumnValueDecoder,
+            ColumnValueDecoderImpl,
+        },
+        GenericColumnReader,
+    },
+};
 use crate::data_type::DataType;
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
 use crate::schema::types::ColumnDescPtr;
 use arrow::array::BooleanBufferBuilder;
 use arrow::bitmap::Bitmap;
 use arrow::buffer::{Buffer, MutableBuffer};
 
+pub(crate) mod private {

Review comment:
       This is effectively my workaround for #1032 - these traits and accompanying types should not be part of the public API to allow us to iterate on them




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778775604



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A slice of levels buffer data that is written to by a [`ColumnLevelDecoder`]
+pub trait LevelsBufferSlice {

Review comment:
       Ah -- got it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r782230579



##########
File path: parquet/src/data_type.rs
##########
@@ -1033,21 +1032,6 @@ pub(crate) mod private {
             self
         }
     }
-
-    /// A marker trait for [`DataType`] with a [scalar] physical type

Review comment:
       This was added in #1155 but unfortunately didn't work as anticipated because of the lack of `Int16Type` which is needed for decoding levels data




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] codecov-commenter edited a comment on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-992420848


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1041](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6c50632) into [master](https://codecov.io/gh/apache/arrow-rs/commit/07660c61680220ac54b7bf4c42a64c840872cc43?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (07660c6) will **decrease** coverage by `0.02%`.
   > The diff coverage is `81.16%`.
   
   > :exclamation: Current head 6c50632 differs from pull request most recent head dec899c. Consider uploading reports for the commit dec899c to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/1041/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1041      +/-   ##
   ==========================================
   - Coverage   82.30%   82.27%   -0.03%     
   ==========================================
     Files         168      171       +3     
     Lines       49026    49051      +25     
   ==========================================
   + Hits        40351    40358       +7     
   - Misses       8675     8693      +18     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/column/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci5ycw==) | `69.81% <75.00%> (-2.53%)` | :arrow_down: |
   | [parquet/src/column/reader/decoder.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci9kZWNvZGVyLnJz) | `76.31% <76.31%> (ø)` | |
   | [parquet/src/arrow/record\_reader/buffer.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci9idWZmZXIucnM=) | `86.00% <86.00%> (ø)` | |
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `95.18% <87.87%> (+2.40%)` | :arrow_up: |
   | [...rquet/src/arrow/record\_reader/definition\_levels.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci9kZWZpbml0aW9uX2xldmVscy5ycw==) | `88.88% <88.88%> (ø)` | |
   | [parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyYXlfcmVhZGVyLnJz) | `76.72% <93.75%> (-0.03%)` | :arrow_down: |
   | [parquet/src/util/memory.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9tZW1vcnkucnM=) | `91.12% <100.00%> (+0.08%)` | :arrow_up: |
   | [arrow/src/datatypes/datatype.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2RhdGF0eXBlcy9kYXRhdHlwZS5ycw==) | `65.95% <0.00%> (-0.43%)` | :arrow_down: |
   | [parquet/src/data\_type.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZGF0YV90eXBlLnJz) | `76.61% <0.00%> (-0.24%)` | :arrow_down: |
   | [parquet\_derive/src/parquet\_field.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldF9kZXJpdmUvc3JjL3BhcnF1ZXRfZmllbGQucnM=) | `66.21% <0.00%> (-0.23%)` | :arrow_down: |
   | ... and [2 more](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [07660c6...dec899c](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-993816497


   Running benchmarks on my local machine I get somewhat erratic results, from which I conclude this has no major impact on performance
   
   ```
   arrow_array_reader/read Int32Array, plain encoded, mandatory, no NULLs - old                                                                             
                           time:   [3.7939 us 3.8031 us 3.8114 us]
                           change: [-3.6579% -3.4154% -3.1951%] (p = 0.00 < 0.05)
                           Performance has improved.
   arrow_array_reader/read Int32Array, plain encoded, mandatory, no NULLs - new                                                                             
                           time:   [2.3030 us 2.3048 us 2.3073 us]
                           change: [+2.5908% +2.7441% +2.9142%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read Int32Array, plain encoded, optional, no NULLs - old                                                                            
                           time:   [59.193 us 59.275 us 59.363 us]
                           change: [-4.2623% -4.1285% -4.0009%] (p = 0.00 < 0.05)
                           Performance has improved.
   arrow_array_reader/read Int32Array, plain encoded, optional, no NULLs - new                                                                             
                           time:   [23.209 us 23.221 us 23.236 us]
                           change: [+32.531% +32.663% +32.835%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read Int32Array, plain encoded, optional, half NULLs - old                                                                            
                           time:   [142.37 us 142.41 us 142.44 us]
                           change: [+5.5942% +6.6789% +7.7376%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read Int32Array, plain encoded, optional, half NULLs - new                                                                            
                           time:   [139.07 us 139.89 us 140.59 us]
                           change: [+0.4422% +0.9960% +1.6028%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   arrow_array_reader/read Int32Array, dictionary encoded, mandatory, no NULLs - old                                                                             
                           time:   [21.919 us 21.923 us 21.927 us]
                           change: [+1.3392% +1.7681% +2.0113%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read Int32Array, dictionary encoded, mandatory, no NULLs - new                                                                            
                           time:   [99.347 us 101.00 us 102.37 us]
                           change: [+5.5715% +6.7636% +8.2107%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read Int32Array, dictionary encoded, optional, no NULLs - old                                                                            
                           time:   [75.648 us 75.663 us 75.681 us]
                           change: [-1.5816% -1.5384% -1.4963%] (p = 0.00 < 0.05)
                           Performance has improved.
   arrow_array_reader/read Int32Array, dictionary encoded, optional, no NULLs - new                                                                            
                           time:   [112.52 us 113.33 us 114.36 us]
                           change: [+5.2751% +7.2166% +9.0108%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read Int32Array, dictionary encoded, optional, half NULLs - old                                                                            
                           time:   [144.77 us 144.80 us 144.83 us]
                           change: [-11.013% -10.318% -9.6258%] (p = 0.00 < 0.05)
                           Performance has improved.
   arrow_array_reader/read Int32Array, dictionary encoded, optional, half NULLs - new                                                                            
                           time:   [191.06 us 191.12 us 191.18 us]
                           change: [+3.4773% +3.5370% +3.5957%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read StringArray, plain encoded, mandatory, no NULLs - old                                                                            
                           time:   [800.06 us 800.19 us 800.32 us]
                           change: [-1.6826% -1.6388% -1.5967%] (p = 0.00 < 0.05)
                           Performance has improved.
   arrow_array_reader/read StringArray, plain encoded, mandatory, no NULLs - new                                                                            
                           time:   [124.84 us 124.86 us 124.88 us]
                           change: [+4.1077% +4.1575% +4.2088%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read StringArray, plain encoded, optional, no NULLs - old                                                                            
                           time:   [846.35 us 846.59 us 846.87 us]
                           change: [+0.8637% +0.9228% +0.9834%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   arrow_array_reader/read StringArray, plain encoded, optional, no NULLs - new                                                                            
                           time:   [143.25 us 143.30 us 143.35 us]
                           change: [+2.6977% +2.7794% +2.8847%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read StringArray, plain encoded, optional, half NULLs - old                                                                            
                           time:   [773.74 us 776.61 us 779.87 us]
                           change: [+3.2218% +3.4681% +3.7063%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read StringArray, plain encoded, optional, half NULLs - new                                                                            
                           time:   [264.22 us 264.80 us 265.57 us]
                           change: [-1.3401% -1.1712% -0.9903%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   arrow_array_reader/read StringArray, dictionary encoded, mandatory, no NULLs - old                                                                            
                           time:   [726.17 us 726.74 us 727.44 us]
                           change: [+1.2812% +1.3725% +1.4618%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read StringArray, dictionary encoded, mandatory, no NULLs - new                                                                            
                           time:   [116.83 us 116.91 us 116.99 us]
                           change: [-3.2217% -3.0893% -2.9282%] (p = 0.00 < 0.05)
                           Performance has improved.
   arrow_array_reader/read StringArray, dictionary encoded, optional, no NULLs - old                                                                            
                           time:   [802.16 us 803.89 us 805.57 us]
                           change: [-0.4055% -0.2549% -0.1073%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   arrow_array_reader/read StringArray, dictionary encoded, optional, no NULLs - new                                                                            
                           time:   [134.39 us 134.43 us 134.48 us]
                           change: [+0.0304% +0.2086% +0.3678%] (p = 0.02 < 0.05)
                           Change within noise threshold.
   arrow_array_reader/read StringArray, dictionary encoded, optional, half NULLs - old                                                                            
                           time:   [742.00 us 742.57 us 743.00 us]
                           change: [+3.4464% +3.6453% +3.8440%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read StringArray, dictionary encoded, optional, half NULLs - new                                                                            
                           time:   [236.67 us 237.14 us 238.07 us]
                           change: [+1.7094% +1.9629% +2.5264%] (p = 0.00 < 0.05)
                           Performance has regressed.
   ```
   
   What is strange to me is that this seems to have a consistent ~5% impact on the new `StringArrayReader` despite this change touching none of the code used by it. I suspect we're in the weeds of the wims of LLVM, which I'm not really sure it makes sense to optimise for at this stage - there's a lot of lower hanging fruit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r775164526



##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -0,0 +1,82 @@
+use arrow::array::BooleanBufferBuilder;
+use arrow::bitmap::Bitmap;
+use arrow::buffer::Buffer;
+use std::ops::Range;
+
+use crate::column::reader::decoder::ColumnLevelDecoderImpl;
+use crate::schema::types::ColumnDescPtr;
+
+use super::{
+    buffer::{RecordBuffer, TypedBuffer},
+    MIN_BATCH_SIZE,
+};
+
+pub struct DefinitionLevelBuffer {
+    buffer: TypedBuffer<i16>,
+    builder: BooleanBufferBuilder,
+    max_level: i16,
+}
+
+impl RecordBuffer for DefinitionLevelBuffer {
+    type Output = Buffer;
+    type Writer = [i16];
+
+    fn split(&mut self, len: usize) -> Self::Output {
+        self.buffer.split(len)
+    }
+
+    fn writer(&mut self, batch_size: usize) -> &mut Self::Writer {

Review comment:
       I wonder if `BufferSlice` would be a more fitting name to what this method does, e.g.:
   ```
   fn writable_buffer_slice(&mut self, batch_size: usize) -> &mut Self::BufferSlice {
   // return writable buffer slice
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r777130081



##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -0,0 +1,82 @@
+use arrow::array::BooleanBufferBuilder;
+use arrow::bitmap::Bitmap;
+use arrow::buffer::Buffer;
+use std::ops::Range;
+
+use crate::column::reader::decoder::ColumnLevelDecoderImpl;
+use crate::schema::types::ColumnDescPtr;
+
+use super::{
+    buffer::{RecordBuffer, TypedBuffer},
+    MIN_BATCH_SIZE,
+};
+
+pub struct DefinitionLevelBuffer {
+    buffer: TypedBuffer<i16>,
+    builder: BooleanBufferBuilder,
+    max_level: i16,
+}
+
+impl RecordBuffer for DefinitionLevelBuffer {
+    type Output = Buffer;
+    type Writer = [i16];
+
+    fn split(&mut self, len: usize) -> Self::Output {
+        self.buffer.split(len)
+    }
+
+    fn writer(&mut self, batch_size: usize) -> &mut Self::Writer {
+        assert_eq!(self.buffer.len(), self.builder.len());
+        self.buffer.writer(batch_size)
+    }
+
+    fn commit(&mut self, len: usize) {
+        self.buffer.commit(len);
+        let buf = self.buffer.as_slice();
+
+        let range = self.builder.len()..len;
+        self.builder.reserve(range.end - range.start);
+        for i in &buf[range] {
+            self.builder.append(*i == self.max_level)
+        }
+    }
+}
+
+impl DefinitionLevelBuffer {
+    pub fn new(desc: &ColumnDescPtr) -> Self {
+        Self {
+            buffer: TypedBuffer::new(),
+            builder: BooleanBufferBuilder::new(0),
+            max_level: desc.max_def_level(),
+        }
+    }
+
+    /// Split `len` levels out of `self`
+    pub fn split_bitmask(&mut self, len: usize) -> Bitmap {
+        let old_len = self.builder.len();
+        let num_left_values = old_len - len;
+        let new_bitmap_builder =
+            BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values));
+
+        let old_bitmap =
+            std::mem::replace(&mut self.builder, new_bitmap_builder).finish();
+        let old_bitmap = Bitmap::from(old_bitmap);
+
+        for i in len..old_len {
+            self.builder.append(old_bitmap.is_set(i));
+        }
+
+        old_bitmap
+    }
+
+    pub fn valid_position_iter(
+        &self,
+        range: Range<usize>,
+    ) -> impl Iterator<Item = usize> + '_ {
+        let max_def_level = self.max_level;
+        let slice = self.buffer.as_slice();
+        range.rev().filter(move |x| slice[*x] == max_def_level)

Review comment:
       Currently BooleanBufferBuilder doesn't have a story for appending other BooleanBuffers - #1039 adds this but I'd rather not make this PR depend on it.
   
   Additionally the cost of the memory allocation and copy may outweigh the gains from SIMD.
   
   Given this I'm going to leave this as is, especially as #1054 will remove this code from the decode path for files without nested nullability.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r777130938



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;

Review comment:
       This method is no longer needed - removed :+1:

##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
+}
+
+impl LevelsWriter for [i16] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+
+    fn get(&self, idx: usize) -> i16 {
+        self[idx]
+    }
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
+        self[range].iter().filter(|i| **i != max_level).count()
+    }
+}
+
+/// A type that can have value data written to it by a [`ColumnValueDecoder`]
+pub trait ValuesWriter {
+    fn capacity(&self) -> usize;
+}
+
+impl<T> ValuesWriter for [T] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+}
+
+/// Decodes level data to a [`LevelsWriter`]
+pub trait ColumnLevelDecoder {
+    type Writer: LevelsWriter + ?Sized;
+
+    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold edited a comment on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-1003609340






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778776658



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };

Review comment:
       Maybe we could at least document it (or mark it as `unsafe` to force the callsites to acknowledge they aren't using `bool`)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] codecov-commenter edited a comment on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-992420848


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1041](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (28228b2) into [master](https://codecov.io/gh/apache/arrow-rs/commit/07660c61680220ac54b7bf4c42a64c840872cc43?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (07660c6) will **increase** coverage by `0.01%`.
   > The diff coverage is `81.32%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/1041/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1041      +/-   ##
   ==========================================
   + Coverage   82.30%   82.31%   +0.01%     
   ==========================================
     Files         168      172       +4     
     Lines       49026    50082    +1056     
   ==========================================
   + Hits        40351    41227     +876     
   - Misses       8675     8855     +180     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyYXlfcmVhZGVyLnJz) | `76.72% <ø> (-0.03%)` | :arrow_down: |
   | [parquet/src/column/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci5ycw==) | `69.88% <75.94%> (-2.45%)` | :arrow_down: |
   | [parquet/src/column/reader/decoder.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci9kZWNvZGVyLnJz) | `76.27% <76.27%> (ø)` | |
   | [parquet/src/arrow/record\_reader/buffer.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci9idWZmZXIucnM=) | `85.10% <85.10%> (ø)` | |
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `94.00% <87.17%> (+1.23%)` | :arrow_up: |
   | [...rquet/src/arrow/record\_reader/definition\_levels.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci9kZWZpbml0aW9uX2xldmVscy5ycw==) | `90.32% <90.32%> (ø)` | |
   | [parquet/src/util/memory.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9tZW1vcnkucnM=) | `91.12% <100.00%> (+0.08%)` | :arrow_up: |
   | [arrow/src/datatypes/native.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2RhdGF0eXBlcy9uYXRpdmUucnM=) | `66.66% <0.00%> (-6.25%)` | :arrow_down: |
   | [arrow/src/compute/kernels/comparison.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2NvbXB1dGUva2VybmVscy9jb21wYXJpc29uLnJz) | `89.75% <0.00%> (-3.48%)` | :arrow_down: |
   | [arrow/src/csv/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2Nzdi9yZWFkZXIucnM=) | `88.10% <0.00%> (-2.48%)` | :arrow_down: |
   | ... and [37 more](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [07660c6...28228b2](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r777120783



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
+}
+
+impl LevelsWriter for [i16] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+
+    fn get(&self, idx: usize) -> i16 {
+        self[idx]
+    }
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
+        self[range].iter().filter(|i| **i != max_level).count()
+    }
+}
+
+/// A type that can have value data written to it by a [`ColumnValueDecoder`]
+pub trait ValuesWriter {
+    fn capacity(&self) -> usize;
+}
+
+impl<T> ValuesWriter for [T] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+}
+
+/// Decodes level data to a [`LevelsWriter`]
+pub trait ColumnLevelDecoder {
+    type Writer: LevelsWriter + ?Sized;
+
+    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// Decodes value data to a [`ValuesWriter`]
+pub trait ColumnValueDecoder {
+    type Writer: ValuesWriter + ?Sized;
+
+    fn create(col: &ColumnDescPtr) -> Self;
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        is_sorted: bool,
+    ) -> Result<()>;
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()>;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
+pub struct ColumnValueDecoderImpl<T: DataType> {
+    descr: ColumnDescPtr,
+
+    current_encoding: Option<Encoding>,
+
+    // Cache of decoders for existing encodings
+    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
+}
+
+impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
+    type Writer = [T::T];
+
+    fn create(descr: &ColumnDescPtr) -> Self {
+        Self {
+            descr: descr.clone(),
+            current_encoding: None,
+            decoders: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        mut encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if self.decoders.contains_key(&encoding) {
+            return Err(general_err!("Column cannot have more than one dictionary"));
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
+            dictionary.set_data(buf, num_values as usize)?;
+
+            let mut decoder = DictDecoder::new();
+            decoder.set_dict(Box::new(dictionary))?;
+            self.decoders.insert(encoding, Box::new(decoder));
+            Ok(())
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn set_data(
+        &mut self,
+        mut encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY;
+        }
+
+        let decoder = if encoding == Encoding::RLE_DICTIONARY {
+            self.decoders
+                .get_mut(&encoding)
+                .expect("Decoder for dict should have been set")
+        } else {
+            // Search cache for data page decoder
+            #[allow(clippy::map_entry)]
+            if !self.decoders.contains_key(&encoding) {
+                // Initialize decoder for this page
+                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
+                self.decoders.insert(encoding, data_decoder);
+            }
+            self.decoders.get_mut(&encoding).unwrap()
+        };
+
+        decoder.set_data(data, num_values)?;
+        self.current_encoding = Some(encoding);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize> {
+        let encoding = self
+            .current_encoding
+            .expect("current_encoding should be set");
+
+        let current_decoder = self
+            .decoders
+            .get_mut(&encoding)
+            .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding));
+
+        current_decoder.get(&mut out[range])
+    }
+}
+
+/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
+pub struct ColumnLevelDecoderImpl {
+    inner: LevelDecoderInner,

Review comment:
       This would require introducing some type representation of the encoding type. This would be a fair bit of additional code/complexity that I don't think would not lead to a meaningful performance uplift. Assuming `ColumnLevelDecoderImpl::read` is called with a reasonable batch size of ~1024, the overheads of a jump table are likely to be irrelevant.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold edited a comment on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-1003609340


   I've renamed a number of the methods and traits based on the great feedback, and also added a load of doc comments. In particular I took inspiration from std::Vec, in particular [Vec::spare_capacity_mut](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.spare_capacity_mut) and [Vec::set_len](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.set_len) which is effectively an unsafe version of what is going on here.
   
   I'm happy that this interface is sufficiently flexible for the optimisations I have in mind, many of which I've already got draft PR with initial cuts of, and so I'm marking this ready for review.
   
   I am aware this is a relatively complex change, to an already complex part of the codebase so if anything isn't clear please let me know. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r777134116



##########
File path: parquet/src/column/reader.rs
##########
@@ -392,38 +419,6 @@ impl<T: DataType> ColumnReaderImpl<T> {
         Ok(true)
     }
 
-    /// Resolves and updates encoding and set decoder for the current page

Review comment:
       This logic is also moved into ColumnValueDecoder




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r776332615



##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -0,0 +1,82 @@
+use arrow::array::BooleanBufferBuilder;
+use arrow::bitmap::Bitmap;
+use arrow::buffer::Buffer;
+use std::ops::Range;
+
+use crate::column::reader::decoder::ColumnLevelDecoderImpl;
+use crate::schema::types::ColumnDescPtr;
+
+use super::{
+    buffer::{RecordBuffer, TypedBuffer},
+    MIN_BATCH_SIZE,
+};
+
+pub struct DefinitionLevelBuffer {
+    buffer: TypedBuffer<i16>,
+    builder: BooleanBufferBuilder,
+    max_level: i16,
+}
+
+impl RecordBuffer for DefinitionLevelBuffer {
+    type Output = Buffer;
+    type Writer = [i16];
+
+    fn split(&mut self, len: usize) -> Self::Output {
+        self.buffer.split(len)
+    }
+
+    fn writer(&mut self, batch_size: usize) -> &mut Self::Writer {

Review comment:
       Yeah, I'm not a fan of this method's name. It gets confusing because the optimized implementations in #1054 and #1082 don't use this method to yield a slice at all, I need to think more on this




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-1005180113


   cc @nevi-me  @sunchao and @jorgecarleitao 
   
   Please let us know if anyone else is interested in reviewing this PR. If not I'll plan to merge it in soon


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r782244587



##########
File path: parquet/src/data_type.rs
##########
@@ -1033,21 +1032,6 @@ pub(crate) mod private {
             self
         }
     }
-
-    /// A marker trait for [`DataType`] with a [scalar] physical type

Review comment:
       Can we `impl ScalarDataType for i16`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb merged pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r775285402



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,137 @@
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+pub trait RecordBuffer: Sized {
+    type Output: Sized;
+
+    type Writer: ?Sized;
+
+    /// Split out `len` items
+    fn split(&mut self, len: usize) -> Self::Output;
+
+    /// Get a writer with `batch_size` capacity
+    fn writer(&mut self, batch_size: usize) -> &mut Self::Writer;
+
+    /// Record a write of `len` items
+    fn commit(&mut self, len: usize);
+}
+
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> RecordBuffer for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Writer = [T];
+
+    fn split(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn writer(&mut self, batch_size: usize) -> &mut Self::Writer {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);
+
+        let range = self.len..self.len + batch_size;
+        &mut self.as_slice_mut()[range]
+    }
+
+    fn commit(&mut self, len: usize) {
+        self.len = len;
+
+        let new_bytes = self.len * std::mem::size_of::<T>();
+        assert!(new_bytes <= self.buffer.len());
+        self.buffer.resize(new_bytes, 0);
+    }
+}
+
+pub trait ValueBuffer {
+    fn pad_nulls(
+        &mut self,
+        range: Range<usize>,
+        rev_position_iter: impl Iterator<Item = usize>,
+    );
+}
+
+impl<T> ValueBuffer for TypedBuffer<T> {
+    fn pad_nulls(
+        &mut self,
+        range: Range<usize>,
+        rev_position_iter: impl Iterator<Item = usize>,
+    ) {
+        let slice = self.as_slice_mut();
+
+        for (value_pos, level_pos) in range.rev().zip(rev_position_iter) {

Review comment:
       it might be more efficient to insert null values using `arrow::compute::SlicesIterator` as used in `ArrowArrayReader` here https://github.com/apache/arrow-rs/blob/master/parquet/src/arrow/arrow_array_reader.rs#L606 , since it works with sequences rather than single values




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778412799



##########
File path: parquet/src/column/reader.rs
##########
@@ -440,63 +435,29 @@ impl<T: DataType> ColumnReaderImpl<T> {
             Ok(true)
         }
     }
+}
 
-    #[inline]
-    fn read_rep_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
-        let level_decoder = self
-            .rep_level_decoder
-            .as_mut()
-            .expect("rep_level_decoder be set");
-        level_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn read_def_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
-        let level_decoder = self
-            .def_level_decoder
-            .as_mut()
-            .expect("def_level_decoder be set");
-        level_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn read_values(&mut self, buffer: &mut [T::T]) -> Result<usize> {
-        let encoding = self
-            .current_encoding
-            .expect("current_encoding should be set");
-        let current_decoder = self
-            .decoders
-            .get_mut(&encoding)
-            .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding));
-        current_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn configure_dictionary(&mut self, page: Page) -> Result<bool> {
-        let mut encoding = page.encoding();
-        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
-            encoding = Encoding::RLE_DICTIONARY
-        }
-
-        if self.decoders.contains_key(&encoding) {
-            return Err(general_err!("Column cannot have more than one dictionary"));
+fn parse_v1_level(
+    max_level: i16,
+    num_buffered_values: u32,
+    encoding: Encoding,
+    buf: ByteBufferPtr,
+) -> Result<ByteBufferPtr> {

Review comment:
       The short answer is because I found the interface for LevelDecoder incredibly confusing, and this isn't actually interested in the decoder, just working out how many bytes of level data there are...
   
   I can change if you feel strongly




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] jorgecarleitao commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778438011



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };

Review comment:
       Thanks @alamb for the ping. I haven't look into this PR semantics in detail because I am not familiar with this code base.
   
   I think that this line is sound iff `T: plain old data` (in the sense that they fulfill the invariants of [Pod](https://docs.rs/bytemuck/latest/bytemuck/trait.Pod.html)).
   
   However, `bool`, which is not Pod, [implements `ParquetValueType`](https://github.com/apache/arrow-rs/blob/master/parquet/src/data_type.rs#L652), and we pass `T: DataType::T` to TypedBuffer [here](https://github.com/apache/arrow-rs/pull/1041/files#diff-4eaa09af06edfa304fdfc1042342e7b203414904a98b48b0a11461c21643ffbeR45).
   
   Note that like `bool`, `Int96` [contains `Option<[u32; 3]>`](https://github.com/apache/arrow-rs/blob/master/parquet/src/data_type.rs#L41) which is also not plain old data, and also [implements `ParquetValueType`](https://github.com/apache/arrow-rs/blob/master/parquet/src/data_type.rs#L796).
   
   Maybe restrict `T` to `TypedBuffer<T: PrimitiveType>` or something, so that we do not allow non-plain old data types to be passed here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r777134097



##########
File path: parquet/src/column/reader.rs
##########
@@ -440,63 +435,29 @@ impl<T: DataType> ColumnReaderImpl<T> {
             Ok(true)
         }
     }
+}
 
-    #[inline]
-    fn read_rep_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
-        let level_decoder = self
-            .rep_level_decoder
-            .as_mut()
-            .expect("rep_level_decoder be set");
-        level_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn read_def_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
-        let level_decoder = self
-            .def_level_decoder
-            .as_mut()
-            .expect("def_level_decoder be set");
-        level_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn read_values(&mut self, buffer: &mut [T::T]) -> Result<usize> {
-        let encoding = self
-            .current_encoding
-            .expect("current_encoding should be set");
-        let current_decoder = self
-            .decoders
-            .get_mut(&encoding)
-            .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding));
-        current_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn configure_dictionary(&mut self, page: Page) -> Result<bool> {

Review comment:
       This logic is moved into ColumnValueDecoder




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r777131154



##########
File path: parquet/src/arrow/array_reader.rs
##########
@@ -200,7 +200,6 @@ pub struct PrimitiveArrayReader<T: DataType> {
     rep_levels_buffer: Option<Buffer>,
     column_desc: ColumnDescPtr,
     record_reader: RecordReader<T>,
-    _type_marker: PhantomData<T>,

Review comment:
       This seemed to be an orphan so I just removed it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778445152



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };

Review comment:
       Yeah the typing here is a bit unfortunate, there is a cludge in PrimitiveArrayReader to handle bools, and prevent Int96 but I'm not going to argue it isn't pretty gross :sweat_smile: 
   
   It's no worse than before, but it certainly isn't ideal... I'll have a think about how to improve this without breaking the APIs :thinking: 

##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };

Review comment:
       Yeah the typing here is a bit unfortunate, there is a cludge in `PrimitiveArrayReader` to handle bools, and prevent Int96 but I'm not going to argue it isn't pretty gross :sweat_smile: 
   
   It's no worse than before, but it certainly isn't ideal... I'll have a think about how to improve this without breaking the APIs :thinking: 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r782251158



##########
File path: parquet/src/data_type.rs
##########
@@ -1033,21 +1032,6 @@ pub(crate) mod private {
             self
         }
     }
-
-    /// A marker trait for [`DataType`] with a [scalar] physical type

Review comment:
       > impl ScalarDataType for i16
   
   In short, no... `DataType` is tightly coupled with what it means to be a physical parquet type, which i16 is not
   
   > If you you need to remove this code, then we should probably reopen the original ticket
   
   It is an alternative way of fixing that ticket. Rather than constraining `T: DataType` we constrain `T::T`. The two approaches are equivalent, but the latter allows implementing the marker trait for types that don't have a corresponding `DataType`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r781621658



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };

Review comment:
       Going to mark this as a draft whilst I fix https://github.com/apache/arrow-rs/issues/1132 which should in turn fix this




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r775285402



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,137 @@
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+pub trait RecordBuffer: Sized {
+    type Output: Sized;
+
+    type Writer: ?Sized;
+
+    /// Split out `len` items
+    fn split(&mut self, len: usize) -> Self::Output;
+
+    /// Get a writer with `batch_size` capacity
+    fn writer(&mut self, batch_size: usize) -> &mut Self::Writer;
+
+    /// Record a write of `len` items
+    fn commit(&mut self, len: usize);
+}
+
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> RecordBuffer for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Writer = [T];
+
+    fn split(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn writer(&mut self, batch_size: usize) -> &mut Self::Writer {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);
+
+        let range = self.len..self.len + batch_size;
+        &mut self.as_slice_mut()[range]
+    }
+
+    fn commit(&mut self, len: usize) {
+        self.len = len;
+
+        let new_bytes = self.len * std::mem::size_of::<T>();
+        assert!(new_bytes <= self.buffer.len());
+        self.buffer.resize(new_bytes, 0);
+    }
+}
+
+pub trait ValueBuffer {
+    fn pad_nulls(
+        &mut self,
+        range: Range<usize>,
+        rev_position_iter: impl Iterator<Item = usize>,
+    );
+}
+
+impl<T> ValueBuffer for TypedBuffer<T> {
+    fn pad_nulls(
+        &mut self,
+        range: Range<usize>,
+        rev_position_iter: impl Iterator<Item = usize>,
+    ) {
+        let slice = self.as_slice_mut();
+
+        for (value_pos, level_pos) in range.rev().zip(rev_position_iter) {

Review comment:
       it might be more efficient to insert null values using `arrow::compute::SlicesIterator` as used in `ArrowArrayReader` here https://github.com/apache/arrow-rs/blob/master/parquet/src/arrow/arrow_array_reader.rs#L570 , since it works with sequences rather than single values




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] codecov-commenter edited a comment on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-992420848


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1041](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2a6b576) into [master](https://codecov.io/gh/apache/arrow-rs/commit/07660c61680220ac54b7bf4c42a64c840872cc43?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (07660c6) will **decrease** coverage by `0.02%`.
   > The diff coverage is `80.89%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/1041/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1041      +/-   ##
   ==========================================
   - Coverage   82.30%   82.28%   -0.03%     
   ==========================================
     Files         168      168              
     Lines       49026    49048      +22     
   ==========================================
   + Hits        40351    40359       +8     
   - Misses       8675     8689      +14     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyYXlfcmVhZGVyLnJz) | `76.82% <ø> (+0.06%)` | :arrow_up: |
   | [parquet/src/column/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci5ycw==) | `71.23% <76.54%> (-1.10%)` | :arrow_down: |
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `93.98% <89.02%> (+1.21%)` | :arrow_up: |
   | [parquet/src/util/memory.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9tZW1vcnkucnM=) | `91.12% <100.00%> (+0.08%)` | :arrow_up: |
   | [arrow/src/datatypes/datatype.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2RhdGF0eXBlcy9kYXRhdHlwZS5ycw==) | `65.95% <0.00%> (-0.43%)` | :arrow_down: |
   | [parquet/src/data\_type.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZGF0YV90eXBlLnJz) | `76.61% <0.00%> (-0.24%)` | :arrow_down: |
   | [parquet\_derive/src/parquet\_field.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldF9kZXJpdmUvc3JjL3BhcnF1ZXRfZmllbGQucnM=) | `66.21% <0.00%> (-0.23%)` | :arrow_down: |
   | [arrow/src/array/transform/mod.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2FycmF5L3RyYW5zZm9ybS9tb2QucnM=) | `85.10% <0.00%> (+0.13%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [07660c6...2a6b576](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r777130877



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
+}
+
+impl LevelsWriter for [i16] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+
+    fn get(&self, idx: usize) -> i16 {
+        self[idx]
+    }
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
+        self[range].iter().filter(|i| **i != max_level).count()
+    }
+}
+
+/// A type that can have value data written to it by a [`ColumnValueDecoder`]
+pub trait ValuesWriter {
+    fn capacity(&self) -> usize;
+}
+
+impl<T> ValuesWriter for [T] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+}
+
+/// Decodes level data to a [`LevelsWriter`]
+pub trait ColumnLevelDecoder {
+    type Writer: LevelsWriter + ?Sized;
+
+    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// Decodes value data to a [`ValuesWriter`]
+pub trait ColumnValueDecoder {
+    type Writer: ValuesWriter + ?Sized;
+
+    fn create(col: &ColumnDescPtr) -> Self;
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        is_sorted: bool,
+    ) -> Result<()>;
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()>;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
+pub struct ColumnValueDecoderImpl<T: DataType> {
+    descr: ColumnDescPtr,
+
+    current_encoding: Option<Encoding>,
+
+    // Cache of decoders for existing encodings
+    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
+}
+
+impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
+    type Writer = [T::T];
+
+    fn create(descr: &ColumnDescPtr) -> Self {
+        Self {
+            descr: descr.clone(),
+            current_encoding: None,
+            decoders: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        mut encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if self.decoders.contains_key(&encoding) {
+            return Err(general_err!("Column cannot have more than one dictionary"));
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
+            dictionary.set_data(buf, num_values as usize)?;
+
+            let mut decoder = DictDecoder::new();
+            decoder.set_dict(Box::new(dictionary))?;
+            self.decoders.insert(encoding, Box::new(decoder));
+            Ok(())
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn set_data(
+        &mut self,
+        mut encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY;
+        }
+
+        let decoder = if encoding == Encoding::RLE_DICTIONARY {
+            self.decoders
+                .get_mut(&encoding)
+                .expect("Decoder for dict should have been set")
+        } else {
+            // Search cache for data page decoder
+            #[allow(clippy::map_entry)]
+            if !self.decoders.contains_key(&encoding) {
+                // Initialize decoder for this page
+                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
+                self.decoders.insert(encoding, data_decoder);
+            }
+            self.decoders.get_mut(&encoding).unwrap()
+        };
+
+        decoder.set_data(data, num_values)?;
+        self.current_encoding = Some(encoding);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize> {
+        let encoding = self
+            .current_encoding
+            .expect("current_encoding should be set");
+
+        let current_decoder = self

Review comment:
       I didn't write this logic, just moved it, but my guess is this is a way to placate the borrow checker. `Decoder::get` requires a mutable reference, and we wish for decoders, in particular the dictionary decoder, to be usable across multiple `set_data` calls. 
   
   In order to have a current_decoder construct we would either need to perform a convoluted move dance moving data in and out of the decoder map, or use `Rc<RefCell>`. This is simpler, if possibly a little less performant. FWIW I'd wager that the overheads of a hashmap keyed on a low cardinality enumeration are pretty low.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778402824



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length

Review comment:
       I was trying to distinguish this from `Vec::set_len` which is unsafe because it doesn't know how much is initialized. In the case of RecordBuffer<T> the entire capacity is initialized, just possibly not set to anything useful. The result may not be desirable, but isn't UB and therefore unsafe




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778775711



##########
File path: parquet/src/column/reader.rs
##########
@@ -440,63 +435,29 @@ impl<T: DataType> ColumnReaderImpl<T> {
             Ok(true)
         }
     }
+}
 
-    #[inline]
-    fn read_rep_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
-        let level_decoder = self
-            .rep_level_decoder
-            .as_mut()
-            .expect("rep_level_decoder be set");
-        level_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn read_def_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
-        let level_decoder = self
-            .def_level_decoder
-            .as_mut()
-            .expect("def_level_decoder be set");
-        level_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn read_values(&mut self, buffer: &mut [T::T]) -> Result<usize> {
-        let encoding = self
-            .current_encoding
-            .expect("current_encoding should be set");
-        let current_decoder = self
-            .decoders
-            .get_mut(&encoding)
-            .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding));
-        current_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn configure_dictionary(&mut self, page: Page) -> Result<bool> {
-        let mut encoding = page.encoding();
-        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
-            encoding = Encoding::RLE_DICTIONARY
-        }
-
-        if self.decoders.contains_key(&encoding) {
-            return Err(general_err!("Column cannot have more than one dictionary"));
+fn parse_v1_level(
+    max_level: i16,
+    num_buffered_values: u32,
+    encoding: Encoding,
+    buf: ByteBufferPtr,
+) -> Result<ByteBufferPtr> {

Review comment:
       No, I was just curios




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778411604



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> BufferQueue for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Slice = [T];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);

Review comment:
       Indeed `arrow2` could definitely serve as inspiration for such a change. I have some ideas on how to make such a change without major churn, but nothing fully formed just yet :grin: 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778405419



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A slice of levels buffer data that is written to by a [`ColumnLevelDecoder`]
+pub trait LevelsBufferSlice {

Review comment:
       Yes - #1054 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778442831



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> BufferQueue for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Slice = [T];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);

Review comment:
       > it recently migrated to std::Vec<T: NativeType>
   
   Is there some way to force `Vec` to use stricter alignment than needed by `T`? i.e. for SIMD stuffs?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r774815933



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
+}
+
+impl LevelsWriter for [i16] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+
+    fn get(&self, idx: usize) -> i16 {
+        self[idx]
+    }
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
+        self[range].iter().filter(|i| **i != max_level).count()
+    }
+}
+
+/// A type that can have value data written to it by a [`ColumnValueDecoder`]
+pub trait ValuesWriter {
+    fn capacity(&self) -> usize;
+}
+
+impl<T> ValuesWriter for [T] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+}
+
+/// Decodes level data to a [`LevelsWriter`]
+pub trait ColumnLevelDecoder {
+    type Writer: LevelsWriter + ?Sized;
+
+    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// Decodes value data to a [`ValuesWriter`]
+pub trait ColumnValueDecoder {
+    type Writer: ValuesWriter + ?Sized;
+
+    fn create(col: &ColumnDescPtr) -> Self;
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        is_sorted: bool,
+    ) -> Result<()>;
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()>;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
+pub struct ColumnValueDecoderImpl<T: DataType> {
+    descr: ColumnDescPtr,
+
+    current_encoding: Option<Encoding>,
+
+    // Cache of decoders for existing encodings
+    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
+}
+
+impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
+    type Writer = [T::T];
+
+    fn create(descr: &ColumnDescPtr) -> Self {
+        Self {
+            descr: descr.clone(),
+            current_encoding: None,
+            decoders: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        mut encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if self.decoders.contains_key(&encoding) {
+            return Err(general_err!("Column cannot have more than one dictionary"));
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
+            dictionary.set_data(buf, num_values as usize)?;
+
+            let mut decoder = DictDecoder::new();
+            decoder.set_dict(Box::new(dictionary))?;
+            self.decoders.insert(encoding, Box::new(decoder));
+            Ok(())
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn set_data(
+        &mut self,
+        mut encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY;
+        }
+
+        let decoder = if encoding == Encoding::RLE_DICTIONARY {
+            self.decoders
+                .get_mut(&encoding)
+                .expect("Decoder for dict should have been set")
+        } else {
+            // Search cache for data page decoder
+            #[allow(clippy::map_entry)]
+            if !self.decoders.contains_key(&encoding) {
+                // Initialize decoder for this page
+                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
+                self.decoders.insert(encoding, data_decoder);
+            }
+            self.decoders.get_mut(&encoding).unwrap()
+        };
+
+        decoder.set_data(data, num_values)?;
+        self.current_encoding = Some(encoding);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize> {
+        let encoding = self
+            .current_encoding
+            .expect("current_encoding should be set");
+
+        let current_decoder = self
+            .decoders
+            .get_mut(&encoding)
+            .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding));
+
+        current_decoder.get(&mut out[range])
+    }
+}
+
+/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
+pub struct ColumnLevelDecoderImpl {
+    inner: LevelDecoderInner,

Review comment:
       I wonder if the inner level decoder can be a generic parameter instead - wouldn't that remove the need to `match &mut self.inner` in the `read` method?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778374418



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length

Review comment:
       I don't understand the `must panic` bit here -- how would implementations know what the initialized length (data written to the location returned by `space_capacity_mut`) is? Or is this referring to the capacity ?

##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> BufferQueue for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Slice = [T];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()

Review comment:
       TIL: `std::mem::replace`

##########
File path: parquet/src/column/reader.rs
##########
@@ -440,63 +435,29 @@ impl<T: DataType> ColumnReaderImpl<T> {
             Ok(true)
         }
     }
+}
 
-    #[inline]
-    fn read_rep_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
-        let level_decoder = self
-            .rep_level_decoder
-            .as_mut()
-            .expect("rep_level_decoder be set");
-        level_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn read_def_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
-        let level_decoder = self
-            .def_level_decoder
-            .as_mut()
-            .expect("def_level_decoder be set");
-        level_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn read_values(&mut self, buffer: &mut [T::T]) -> Result<usize> {
-        let encoding = self
-            .current_encoding
-            .expect("current_encoding should be set");
-        let current_decoder = self
-            .decoders
-            .get_mut(&encoding)
-            .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding));
-        current_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn configure_dictionary(&mut self, page: Page) -> Result<bool> {
-        let mut encoding = page.encoding();
-        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
-            encoding = Encoding::RLE_DICTIONARY
-        }
-
-        if self.decoders.contains_key(&encoding) {
-            return Err(general_err!("Column cannot have more than one dictionary"));
+fn parse_v1_level(
+    max_level: i16,
+    num_buffered_values: u32,
+    encoding: Encoding,
+    buf: ByteBufferPtr,
+) -> Result<ByteBufferPtr> {

Review comment:
       Is there a reason to replicate the logic in `LevelDecoder::v1(enc, max_level);` here ? Could that level decoder simply be reused? Especially since it already has tests, etc

##########
File path: parquet/src/arrow/record_reader.rs
##########
@@ -16,75 +16,248 @@
 // under the License.
 
 use std::cmp::{max, min};
-use std::mem::{replace, size_of};
-
-use crate::column::{page::PageReader, reader::ColumnReaderImpl};
+use std::marker::PhantomData;
+use std::mem::replace;
+use std::ops::Range;
+
+use crate::arrow::record_reader::private::{
+    DefinitionLevels, RecordBuffer, RepetitionLevels,
+};
+use crate::column::{
+    page::PageReader,
+    reader::{
+        private::{
+            ColumnLevelDecoder, ColumnLevelDecoderImpl, ColumnValueDecoder,
+            ColumnValueDecoderImpl,
+        },
+        GenericColumnReader,
+    },
+};
 use crate::data_type::DataType;
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
 use crate::schema::types::ColumnDescPtr;
 use arrow::array::BooleanBufferBuilder;
 use arrow::bitmap::Bitmap;
 use arrow::buffer::{Buffer, MutableBuffer};
 
+pub(crate) mod private {
+    use super::*;
+
+    pub trait RecordBuffer: Sized + Default {
+        type Output: Sized;
+
+        type Writer: ?Sized;
+
+        /// Split out `len` items
+        fn split(&mut self, len: usize) -> Self::Output;
+
+        /// Get a writer with `batch_size` capacity
+        fn writer(&mut self, batch_size: usize) -> &mut Self::Writer;
+
+        /// Record a write of `len` items
+        fn commit(&mut self, len: usize);
+    }
+
+    pub trait RepetitionLevels: RecordBuffer {
+        /// Inspects the buffered repetition levels in `range` and returns the number of
+        /// "complete" records along with the corresponding number of values
+        ///
+        /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
+        fn count_records(
+            &self,
+            range: Range<usize>,
+            max_records: usize,
+        ) -> (usize, usize);
+    }
+
+    pub trait DefinitionLevels: RecordBuffer {
+        /// Update the provided validity mask based on contained levels
+        fn update_valid_mask(
+            &self,
+            valid: &mut BooleanBufferBuilder,
+            range: Range<usize>,
+            max_level: i16,
+        );
+    }
+
+    pub struct TypedBuffer<T> {
+        buffer: MutableBuffer,
+
+        /// Length in elements of size T
+        len: usize,
+
+        /// Placeholder to allow `T` as an invariant generic parameter
+        _phantom: PhantomData<*mut T>,
+    }
+
+    impl<T> Default for TypedBuffer<T> {
+        fn default() -> Self {
+            Self {
+                buffer: MutableBuffer::new(0),
+                len: 0,
+                _phantom: Default::default(),
+            }
+        }
+    }
+
+    impl<T> RecordBuffer for TypedBuffer<T> {
+        type Output = Buffer;
+
+        type Writer = [T];
+
+        fn split(&mut self, len: usize) -> Self::Output {
+            let num_bytes = len * std::mem::size_of::<T>();
+            let remaining_bytes = self.buffer.len() - num_bytes;
+            // TODO: Optimize to reduce the copy
+            // create an empty buffer, as it will be resized below
+            let mut remaining = MutableBuffer::new(0);
+            remaining.resize(remaining_bytes, 0);
+
+            let new_records = remaining.as_slice_mut();
+
+            new_records[0..remaining_bytes]
+                .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+            self.buffer.resize(num_bytes, 0);
+
+            replace(&mut self.buffer, remaining).into()
+        }
+
+        fn writer(&mut self, batch_size: usize) -> &mut Self::Writer {
+            self.buffer
+                .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);
+
+            let (prefix, values, suffix) =
+                unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+            assert!(prefix.is_empty() && suffix.is_empty());
+
+            &mut values[self.len..self.len + batch_size]
+        }
+
+        fn commit(&mut self, len: usize) {
+            self.len = len;
+
+            let new_bytes = self.len * std::mem::size_of::<T>();
+            assert!(new_bytes <= self.buffer.len());
+            self.buffer.resize(new_bytes, 0);
+        }
+    }
+
+    impl RepetitionLevels for TypedBuffer<i16> {
+        fn count_records(
+            &self,
+            range: Range<usize>,
+            max_records: usize,
+        ) -> (usize, usize) {
+            let (prefix, buf, suffix) =
+                unsafe { self.buffer.as_slice().align_to::<i16>() };
+            assert!(prefix.is_empty() && suffix.is_empty());
+
+            let start = range.start;
+            let mut records_read = 0;
+            let mut end_of_last_record = start;
+
+            for current in range {
+                if buf[current] == 0 && current != end_of_last_record {
+                    records_read += 1;
+                    end_of_last_record = current;
+
+                    if records_read == max_records {
+                        break;
+                    }
+                }
+            }
+
+            (records_read, end_of_last_record - start)
+        }
+    }
+
+    impl DefinitionLevels for TypedBuffer<i16> {
+        fn update_valid_mask(
+            &self,
+            null_mask: &mut BooleanBufferBuilder,
+            range: Range<usize>,
+            max_level: i16,
+        ) {
+            let (prefix, buf, suffix) =
+                unsafe { self.buffer.as_slice().align_to::<i16>() };
+            assert!(prefix.is_empty() && suffix.is_empty());
+
+            for i in &buf[range] {
+                null_mask.append(*i == max_level)
+            }
+        }
+    }
+}
+
 const MIN_BATCH_SIZE: usize = 1024;
 
 /// A `RecordReader` is a stateful column reader that delimits semantic records.
-pub struct RecordReader<T: DataType> {
+pub type RecordReader<T> = GenericRecordReader<
+    private::TypedBuffer<i16>,
+    private::TypedBuffer<i16>,
+    private::TypedBuffer<<T as DataType>::T>,
+    ColumnLevelDecoderImpl,
+    ColumnLevelDecoderImpl,
+    ColumnValueDecoderImpl<T>,
+>;
+
+#[doc(hidden)]

Review comment:
       ```suggestion
   #[doc(hidden)]
   /// This type is hidden from the docs, and the private module makes it impossible for users to 
   /// directly construct it. Thus, this type signature may be changed without breaking downstream
   /// users
   ```

##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> BufferQueue for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Slice = [T];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);
+
+        let range = self.len..self.len + batch_size;
+        &mut self.as_slice_mut()[range]
+    }
+
+    fn set_len(&mut self, len: usize) {
+        self.len = len;
+
+        let new_bytes = self.len * std::mem::size_of::<T>();
+        assert!(new_bytes <= self.buffer.len());
+        self.buffer.resize(new_bytes, 0);
+    }
+}
+
+/// A [`BufferQueue`] capable of storing column values
+pub trait ValuesBuffer: BufferQueue {
+    /// Iterate through the indexes in `range` in reverse order, moving the value at each
+    /// index to the next index returned by `rev_valid_position_iter`

Review comment:
       the code also seems to assume that `rev_valid_position_iter` is sorted

##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A slice of levels buffer data that is written to by a [`ColumnLevelDecoder`]
+pub trait LevelsBufferSlice {

Review comment:
       I think I missed it somewhere along the line -- what is the point of Generisizing (sp?) levels, rather than just using `[i16]`? Can definition or repetition levels ever be something other than `i16`?
   
   

##########
File path: parquet/src/column/reader.rs
##########
@@ -102,36 +101,65 @@ pub fn get_typed_column_reader<T: DataType>(
 }
 
 /// Typed value reader for a particular primitive column.
-pub struct ColumnReaderImpl<T: DataType> {
+pub type ColumnReaderImpl<T> = GenericColumnReader<
+    decoder::ColumnLevelDecoderImpl,
+    decoder::ColumnLevelDecoderImpl,
+    decoder::ColumnValueDecoderImpl<T>,
+>;
+
+#[doc(hidden)]
+/// Reads data for a given column chunk, using the provided decoders:
+///
+/// - R: [`ColumnLevelDecoder`] used to decode repetition levels
+/// - D: [`ColumnLevelDecoder`] used to decode definition levels
+/// - V: [`ColumnValueDecoder`] used to decode value data
+pub struct GenericColumnReader<R, D, V> {
     descr: ColumnDescPtr,
-    def_level_decoder: Option<LevelDecoder>,
-    rep_level_decoder: Option<LevelDecoder>,
+
     page_reader: Box<dyn PageReader>,
-    current_encoding: Option<Encoding>,
 
-    // The total number of values stored in the data page.
+    /// The total number of values stored in the data page.
     num_buffered_values: u32,
 
-    // The number of values from the current data page that has been decoded into memory
-    // so far.
+    /// The number of values from the current data page that has been decoded into memory
+    /// so far.
     num_decoded_values: u32,
 
-    // Cache of decoders for existing encodings
-    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,

Review comment:
       For anyone else following along, the cache is moved into `ColumnValueDecoderImpl` below

##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> BufferQueue for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Slice = [T];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);

Review comment:
       Is it ok to initialize everything to `0`? I am wondering if `0` isn't a valid representation for some type `T`? Perhaps this should be `T::default()` instead?

##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> BufferQueue for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Slice = [T];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);
+
+        let range = self.len..self.len + batch_size;
+        &mut self.as_slice_mut()[range]
+    }
+
+    fn set_len(&mut self, len: usize) {
+        self.len = len;
+
+        let new_bytes = self.len * std::mem::size_of::<T>();
+        assert!(new_bytes <= self.buffer.len());
+        self.buffer.resize(new_bytes, 0);
+    }
+}
+
+/// A [`BufferQueue`] capable of storing column values
+pub trait ValuesBuffer: BufferQueue {
+    /// Iterate through the indexes in `range` in reverse order, moving the value at each
+    /// index to the next index returned by `rev_valid_position_iter`
+    ///
+    /// It is guaranteed that the `i`th index returned by `rev_valid_position_iter` is greater

Review comment:
       ```suggestion
       /// panics if  the `i`th index returned by `rev_valid_position_iter` is not greater
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] jorgecarleitao commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778449830



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> BufferQueue for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Slice = [T];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);

Review comment:
       you mean e.g. use 128 bytes instead of the minimum layout required by T? I do not think it is possible on the stable channel, no.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r767699564



##########
File path: parquet/src/arrow/record_reader.rs
##########
@@ -16,75 +16,248 @@
 // under the License.
 
 use std::cmp::{max, min};
-use std::mem::{replace, size_of};
-
-use crate::column::{page::PageReader, reader::ColumnReaderImpl};
+use std::marker::PhantomData;
+use std::mem::replace;
+use std::ops::Range;
+
+use crate::arrow::record_reader::private::{
+    DefinitionLevels, RecordBuffer, RepetitionLevels,
+};
+use crate::column::{
+    page::PageReader,
+    reader::{
+        private::{
+            ColumnLevelDecoder, ColumnLevelDecoderImpl, ColumnValueDecoder,
+            ColumnValueDecoderImpl,
+        },
+        GenericColumnReader,
+    },
+};
 use crate::data_type::DataType;
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
 use crate::schema::types::ColumnDescPtr;
 use arrow::array::BooleanBufferBuilder;
 use arrow::bitmap::Bitmap;
 use arrow::buffer::{Buffer, MutableBuffer};
 
+pub(crate) mod private {
+    use super::*;
+
+    pub trait RecordBuffer: Sized + Default {
+        type Output: Sized;
+
+        type Writer: ?Sized;
+
+        /// Split out `len` items
+        fn split(&mut self, len: usize) -> Self::Output;
+
+        /// Get a writer with `batch_size` capacity
+        fn writer(&mut self, batch_size: usize) -> &mut Self::Writer;
+
+        /// Record a write of `len` items
+        fn commit(&mut self, len: usize);
+    }
+
+    pub trait RepetitionLevels: RecordBuffer {
+        /// Inspects the buffered repetition levels in `range` and returns the number of
+        /// "complete" records along with the corresponding number of values
+        ///
+        /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
+        fn count_records(
+            &self,
+            range: Range<usize>,
+            max_records: usize,
+        ) -> (usize, usize);
+    }
+
+    pub trait DefinitionLevels: RecordBuffer {
+        /// Update the provided validity mask based on contained levels
+        fn update_valid_mask(
+            &self,
+            valid: &mut BooleanBufferBuilder,
+            range: Range<usize>,
+            max_level: i16,
+        );
+    }
+
+    pub struct TypedBuffer<T> {
+        buffer: MutableBuffer,
+
+        /// Length in elements of size T
+        len: usize,
+
+        /// Placeholder to allow `T` as an invariant generic parameter
+        _phantom: PhantomData<*mut T>,
+    }
+
+    impl<T> Default for TypedBuffer<T> {
+        fn default() -> Self {
+            Self {
+                buffer: MutableBuffer::new(0),
+                len: 0,
+                _phantom: Default::default(),
+            }
+        }
+    }
+
+    impl<T> RecordBuffer for TypedBuffer<T> {
+        type Output = Buffer;
+
+        type Writer = [T];
+
+        fn split(&mut self, len: usize) -> Self::Output {
+            let num_bytes = len * std::mem::size_of::<T>();
+            let remaining_bytes = self.buffer.len() - num_bytes;
+            // TODO: Optimize to reduce the copy
+            // create an empty buffer, as it will be resized below
+            let mut remaining = MutableBuffer::new(0);
+            remaining.resize(remaining_bytes, 0);
+
+            let new_records = remaining.as_slice_mut();
+
+            new_records[0..remaining_bytes]
+                .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+            self.buffer.resize(num_bytes, 0);
+
+            replace(&mut self.buffer, remaining).into()
+        }
+
+        fn writer(&mut self, batch_size: usize) -> &mut Self::Writer {
+            self.buffer
+                .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);
+
+            let (prefix, values, suffix) =
+                unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+            assert!(prefix.is_empty() && suffix.is_empty());
+
+            &mut values[self.len..self.len + batch_size]
+        }
+
+        fn commit(&mut self, len: usize) {
+            self.len = len;
+
+            let new_bytes = self.len * std::mem::size_of::<T>();
+            assert!(new_bytes <= self.buffer.len());
+            self.buffer.resize(new_bytes, 0);
+        }
+    }
+
+    impl RepetitionLevels for TypedBuffer<i16> {
+        fn count_records(
+            &self,
+            range: Range<usize>,
+            max_records: usize,
+        ) -> (usize, usize) {
+            let (prefix, buf, suffix) =
+                unsafe { self.buffer.as_slice().align_to::<i16>() };
+            assert!(prefix.is_empty() && suffix.is_empty());
+
+            let start = range.start;
+            let mut records_read = 0;
+            let mut end_of_last_record = start;
+
+            for current in range {
+                if buf[current] == 0 && current != end_of_last_record {
+                    records_read += 1;
+                    end_of_last_record = current;
+
+                    if records_read == max_records {
+                        break;
+                    }
+                }
+            }
+
+            (records_read, end_of_last_record - start)
+        }
+    }
+
+    impl DefinitionLevels for TypedBuffer<i16> {
+        fn update_valid_mask(
+            &self,
+            null_mask: &mut BooleanBufferBuilder,
+            range: Range<usize>,
+            max_level: i16,
+        ) {
+            let (prefix, buf, suffix) =
+                unsafe { self.buffer.as_slice().align_to::<i16>() };
+            assert!(prefix.is_empty() && suffix.is_empty());
+
+            for i in &buf[range] {
+                null_mask.append(*i == max_level)
+            }
+        }
+    }
+}
+
 const MIN_BATCH_SIZE: usize = 1024;
 
 /// A `RecordReader` is a stateful column reader that delimits semantic records.
-pub struct RecordReader<T: DataType> {
+pub type RecordReader<T> = GenericRecordReader<
+    private::TypedBuffer<i16>,
+    private::TypedBuffer<i16>,
+    private::TypedBuffer<<T as DataType>::T>,
+    ColumnLevelDecoderImpl,
+    ColumnLevelDecoderImpl,
+    ColumnValueDecoderImpl<T>,
+>;
+
+#[doc(hidden)]

Review comment:
       This type is hidden from the docs, and the private module makes it impossible for users to directly construct this. The direct implication is we can finagle this type signature however we want and it not leak into downstreams




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r767696543



##########
File path: parquet/src/arrow/record_reader.rs
##########
@@ -16,75 +16,248 @@
 // under the License.
 
 use std::cmp::{max, min};
-use std::mem::{replace, size_of};
-
-use crate::column::{page::PageReader, reader::ColumnReaderImpl};
+use std::marker::PhantomData;
+use std::mem::replace;
+use std::ops::Range;
+
+use crate::arrow::record_reader::private::{
+    DefinitionLevels, RecordBuffer, RepetitionLevels,
+};
+use crate::column::{
+    page::PageReader,
+    reader::{
+        private::{
+            ColumnLevelDecoder, ColumnLevelDecoderImpl, ColumnValueDecoder,
+            ColumnValueDecoderImpl,
+        },
+        GenericColumnReader,
+    },
+};
 use crate::data_type::DataType;
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
 use crate::schema::types::ColumnDescPtr;
 use arrow::array::BooleanBufferBuilder;
 use arrow::bitmap::Bitmap;
 use arrow::buffer::{Buffer, MutableBuffer};
 
+pub(crate) mod private {

Review comment:
       This is effectively my workaround for #1032 - these traits and accompanying types should not be part of the public API




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold edited a comment on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold edited a comment on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-993816497


   Running benchmarks on my local machine I get somewhat erratic results, from which I conclude this has no major impact on performance
   
   ```
   arrow_array_reader/read Int32Array, plain encoded, mandatory, no NULLs - old                                                                             
                           time:   [3.7939 us 3.8031 us 3.8114 us]
                           change: [-3.6579% -3.4154% -3.1951%] (p = 0.00 < 0.05)
                           Performance has improved.
   arrow_array_reader/read Int32Array, plain encoded, mandatory, no NULLs - new                                                                             
                           time:   [2.3030 us 2.3048 us 2.3073 us]
                           change: [+2.5908% +2.7441% +2.9142%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read Int32Array, plain encoded, optional, no NULLs - old                                                                            
                           time:   [59.193 us 59.275 us 59.363 us]
                           change: [-4.2623% -4.1285% -4.0009%] (p = 0.00 < 0.05)
                           Performance has improved.
   arrow_array_reader/read Int32Array, plain encoded, optional, no NULLs - new                                                                             
                           time:   [23.209 us 23.221 us 23.236 us]
                           change: [+32.531% +32.663% +32.835%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read Int32Array, plain encoded, optional, half NULLs - old                                                                            
                           time:   [142.37 us 142.41 us 142.44 us]
                           change: [+5.5942% +6.6789% +7.7376%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read Int32Array, plain encoded, optional, half NULLs - new                                                                            
                           time:   [139.07 us 139.89 us 140.59 us]
                           change: [+0.4422% +0.9960% +1.6028%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   arrow_array_reader/read Int32Array, dictionary encoded, mandatory, no NULLs - old                                                                             
                           time:   [21.919 us 21.923 us 21.927 us]
                           change: [+1.3392% +1.7681% +2.0113%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read Int32Array, dictionary encoded, mandatory, no NULLs - new                                                                            
                           time:   [99.347 us 101.00 us 102.37 us]
                           change: [+5.5715% +6.7636% +8.2107%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read Int32Array, dictionary encoded, optional, no NULLs - old                                                                            
                           time:   [75.648 us 75.663 us 75.681 us]
                           change: [-1.5816% -1.5384% -1.4963%] (p = 0.00 < 0.05)
                           Performance has improved.
   arrow_array_reader/read Int32Array, dictionary encoded, optional, no NULLs - new                                                                            
                           time:   [112.52 us 113.33 us 114.36 us]
                           change: [+5.2751% +7.2166% +9.0108%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read Int32Array, dictionary encoded, optional, half NULLs - old                                                                            
                           time:   [144.77 us 144.80 us 144.83 us]
                           change: [-11.013% -10.318% -9.6258%] (p = 0.00 < 0.05)
                           Performance has improved.
   arrow_array_reader/read Int32Array, dictionary encoded, optional, half NULLs - new                                                                            
                           time:   [191.06 us 191.12 us 191.18 us]
                           change: [+3.4773% +3.5370% +3.5957%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read StringArray, plain encoded, mandatory, no NULLs - old                                                                            
                           time:   [800.06 us 800.19 us 800.32 us]
                           change: [-1.6826% -1.6388% -1.5967%] (p = 0.00 < 0.05)
                           Performance has improved.
   arrow_array_reader/read StringArray, plain encoded, mandatory, no NULLs - new                                                                            
                           time:   [124.84 us 124.86 us 124.88 us]
                           change: [+4.1077% +4.1575% +4.2088%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read StringArray, plain encoded, optional, no NULLs - old                                                                            
                           time:   [846.35 us 846.59 us 846.87 us]
                           change: [+0.8637% +0.9228% +0.9834%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   arrow_array_reader/read StringArray, plain encoded, optional, no NULLs - new                                                                            
                           time:   [143.25 us 143.30 us 143.35 us]
                           change: [+2.6977% +2.7794% +2.8847%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read StringArray, plain encoded, optional, half NULLs - old                                                                            
                           time:   [773.74 us 776.61 us 779.87 us]
                           change: [+3.2218% +3.4681% +3.7063%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read StringArray, plain encoded, optional, half NULLs - new                                                                            
                           time:   [264.22 us 264.80 us 265.57 us]
                           change: [-1.3401% -1.1712% -0.9903%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   arrow_array_reader/read StringArray, dictionary encoded, mandatory, no NULLs - old                                                                            
                           time:   [726.17 us 726.74 us 727.44 us]
                           change: [+1.2812% +1.3725% +1.4618%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read StringArray, dictionary encoded, mandatory, no NULLs - new                                                                            
                           time:   [116.83 us 116.91 us 116.99 us]
                           change: [-3.2217% -3.0893% -2.9282%] (p = 0.00 < 0.05)
                           Performance has improved.
   arrow_array_reader/read StringArray, dictionary encoded, optional, no NULLs - old                                                                            
                           time:   [802.16 us 803.89 us 805.57 us]
                           change: [-0.4055% -0.2549% -0.1073%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   arrow_array_reader/read StringArray, dictionary encoded, optional, no NULLs - new                                                                            
                           time:   [134.39 us 134.43 us 134.48 us]
                           change: [+0.0304% +0.2086% +0.3678%] (p = 0.02 < 0.05)
                           Change within noise threshold.
   arrow_array_reader/read StringArray, dictionary encoded, optional, half NULLs - old                                                                            
                           time:   [742.00 us 742.57 us 743.00 us]
                           change: [+3.4464% +3.6453% +3.8440%] (p = 0.00 < 0.05)
                           Performance has regressed.
   arrow_array_reader/read StringArray, dictionary encoded, optional, half NULLs - new                                                                            
                           time:   [236.67 us 237.14 us 238.07 us]
                           change: [+1.7094% +1.9629% +2.5264%] (p = 0.00 < 0.05)
                           Performance has regressed.
   ```
   
   What is strange to me is that this seems to have a consistent ~5% impact on the "new" `ArrowArrayReader` despite this change touching none of the code used by it. I suspect we're in the weeds of the wims of LLVM, which I'm not really sure it makes sense to optimise for at this stage - there's a lot of lower hanging fruit. It's also worth noting that `ArrowArrayReader` is not used for anything bar strings at this stage, and I intend to introduce an optimised StringArrayReader that should be significantly faster.
   
   My takeaway - no major cause for concern at this stage


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] codecov-commenter commented on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-992420848


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1041](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (be0fc1b) into [master](https://codecov.io/gh/apache/arrow-rs/commit/239cba141cb27519b7c32d58a3ea6447fda31d11?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (239cba1) will **decrease** coverage by `0.02%`.
   > The diff coverage is `80.76%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/1041/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1041      +/-   ##
   ==========================================
   - Coverage   82.31%   82.28%   -0.03%     
   ==========================================
     Files         168      168              
     Lines       49031    49047      +16     
   ==========================================
   - Hits        40360    40359       -1     
   - Misses       8671     8688      +17     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyYXlfcmVhZGVyLnJz) | `76.82% <ø> (+0.06%)` | :arrow_up: |
   | [parquet/src/column/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci5ycw==) | `71.23% <76.54%> (-1.10%)` | :arrow_down: |
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `93.96% <87.50%> (+0.23%)` | :arrow_up: |
   | [parquet/src/util/memory.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9tZW1vcnkucnM=) | `91.12% <100.00%> (+0.08%)` | :arrow_up: |
   | [arrow/src/datatypes/datatype.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2RhdGF0eXBlcy9kYXRhdHlwZS5ycw==) | `65.95% <0.00%> (-0.43%)` | :arrow_down: |
   | [parquet/src/data\_type.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZGF0YV90eXBlLnJz) | `76.61% <0.00%> (-0.24%)` | :arrow_down: |
   | [parquet\_derive/src/parquet\_field.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldF9kZXJpdmUvc3JjL3BhcnF1ZXRfZmllbGQucnM=) | `65.98% <0.00%> (-0.23%)` | :arrow_down: |
   | [arrow/src/array/transform/mod.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2FycmF5L3RyYW5zZm9ybS9tb2QucnM=) | `85.24% <0.00%> (+0.13%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [239cba1...be0fc1b](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r774814520



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
+}
+
+impl LevelsWriter for [i16] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+
+    fn get(&self, idx: usize) -> i16 {
+        self[idx]
+    }
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
+        self[range].iter().filter(|i| **i != max_level).count()
+    }
+}
+
+/// A type that can have value data written to it by a [`ColumnValueDecoder`]
+pub trait ValuesWriter {
+    fn capacity(&self) -> usize;
+}
+
+impl<T> ValuesWriter for [T] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+}
+
+/// Decodes level data to a [`LevelsWriter`]
+pub trait ColumnLevelDecoder {
+    type Writer: LevelsWriter + ?Sized;
+
+    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// Decodes value data to a [`ValuesWriter`]
+pub trait ColumnValueDecoder {
+    type Writer: ValuesWriter + ?Sized;
+
+    fn create(col: &ColumnDescPtr) -> Self;
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        is_sorted: bool,
+    ) -> Result<()>;
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()>;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
+pub struct ColumnValueDecoderImpl<T: DataType> {
+    descr: ColumnDescPtr,
+
+    current_encoding: Option<Encoding>,
+
+    // Cache of decoders for existing encodings
+    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
+}
+
+impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
+    type Writer = [T::T];
+
+    fn create(descr: &ColumnDescPtr) -> Self {
+        Self {
+            descr: descr.clone(),
+            current_encoding: None,
+            decoders: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        mut encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if self.decoders.contains_key(&encoding) {
+            return Err(general_err!("Column cannot have more than one dictionary"));
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
+            dictionary.set_data(buf, num_values as usize)?;
+
+            let mut decoder = DictDecoder::new();
+            decoder.set_dict(Box::new(dictionary))?;
+            self.decoders.insert(encoding, Box::new(decoder));
+            Ok(())
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn set_data(
+        &mut self,
+        mut encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY;
+        }
+
+        let decoder = if encoding == Encoding::RLE_DICTIONARY {
+            self.decoders
+                .get_mut(&encoding)
+                .expect("Decoder for dict should have been set")
+        } else {
+            // Search cache for data page decoder
+            #[allow(clippy::map_entry)]
+            if !self.decoders.contains_key(&encoding) {
+                // Initialize decoder for this page
+                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
+                self.decoders.insert(encoding, data_decoder);
+            }
+            self.decoders.get_mut(&encoding).unwrap()
+        };
+
+        decoder.set_data(data, num_values)?;
+        self.current_encoding = Some(encoding);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize> {
+        let encoding = self
+            .current_encoding
+            .expect("current_encoding should be set");
+
+        let current_decoder = self

Review comment:
       why not set a `current_decoder` field in the `set_data` method (where the decoder has to be selected anyway to call `set_data` on it), so that it doesn't have to be looked up on every call of `read` here? It should perform better (no lookup) and simplify this `read` method as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r774813759



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
+}
+
+impl LevelsWriter for [i16] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+
+    fn get(&self, idx: usize) -> i16 {
+        self[idx]
+    }
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
+        self[range].iter().filter(|i| **i != max_level).count()
+    }
+}
+
+/// A type that can have value data written to it by a [`ColumnValueDecoder`]
+pub trait ValuesWriter {
+    fn capacity(&self) -> usize;
+}
+
+impl<T> ValuesWriter for [T] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+}
+
+/// Decodes level data to a [`LevelsWriter`]
+pub trait ColumnLevelDecoder {
+    type Writer: LevelsWriter + ?Sized;
+
+    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// Decodes value data to a [`ValuesWriter`]
+pub trait ColumnValueDecoder {
+    type Writer: ValuesWriter + ?Sized;
+
+    fn create(col: &ColumnDescPtr) -> Self;
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        is_sorted: bool,
+    ) -> Result<()>;
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()>;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
+pub struct ColumnValueDecoderImpl<T: DataType> {
+    descr: ColumnDescPtr,
+
+    current_encoding: Option<Encoding>,
+
+    // Cache of decoders for existing encodings
+    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
+}
+
+impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
+    type Writer = [T::T];
+
+    fn create(descr: &ColumnDescPtr) -> Self {
+        Self {
+            descr: descr.clone(),
+            current_encoding: None,
+            decoders: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        mut encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if self.decoders.contains_key(&encoding) {
+            return Err(general_err!("Column cannot have more than one dictionary"));
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
+            dictionary.set_data(buf, num_values as usize)?;
+
+            let mut decoder = DictDecoder::new();
+            decoder.set_dict(Box::new(dictionary))?;
+            self.decoders.insert(encoding, Box::new(decoder));
+            Ok(())
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn set_data(
+        &mut self,
+        mut encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY;
+        }
+
+        let decoder = if encoding == Encoding::RLE_DICTIONARY {
+            self.decoders
+                .get_mut(&encoding)
+                .expect("Decoder for dict should have been set")
+        } else {
+            // Search cache for data page decoder
+            #[allow(clippy::map_entry)]
+            if !self.decoders.contains_key(&encoding) {
+                // Initialize decoder for this page
+                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
+                self.decoders.insert(encoding, data_decoder);

Review comment:
       since we already have the `data_decoder` why not just return it directly after inserting, instead of looking it up a bit later?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r774811594



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
+}
+
+impl LevelsWriter for [i16] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+
+    fn get(&self, idx: usize) -> i16 {
+        self[idx]
+    }
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
+        self[range].iter().filter(|i| **i != max_level).count()
+    }
+}
+
+/// A type that can have value data written to it by a [`ColumnValueDecoder`]
+pub trait ValuesWriter {
+    fn capacity(&self) -> usize;
+}
+
+impl<T> ValuesWriter for [T] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+}
+
+/// Decodes level data to a [`LevelsWriter`]
+pub trait ColumnLevelDecoder {
+    type Writer: LevelsWriter + ?Sized;
+
+    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;

Review comment:
       wouldn't a `new` method be more idiomatic (instead of `create` )?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r777130285



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,137 @@
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+pub trait RecordBuffer: Sized {
+    type Output: Sized;
+
+    type Writer: ?Sized;
+
+    /// Split out `len` items
+    fn split(&mut self, len: usize) -> Self::Output;
+
+    /// Get a writer with `batch_size` capacity
+    fn writer(&mut self, batch_size: usize) -> &mut Self::Writer;
+
+    /// Record a write of `len` items
+    fn commit(&mut self, len: usize);
+}
+
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> RecordBuffer for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Writer = [T];
+
+    fn split(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn writer(&mut self, batch_size: usize) -> &mut Self::Writer {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);
+
+        let range = self.len..self.len + batch_size;
+        &mut self.as_slice_mut()[range]
+    }
+
+    fn commit(&mut self, len: usize) {
+        self.len = len;
+
+        let new_bytes = self.len * std::mem::size_of::<T>();
+        assert!(new_bytes <= self.buffer.len());
+        self.buffer.resize(new_bytes, 0);
+    }
+}
+
+pub trait ValueBuffer {
+    fn pad_nulls(
+        &mut self,
+        range: Range<usize>,
+        rev_position_iter: impl Iterator<Item = usize>,
+    );
+}
+
+impl<T> ValueBuffer for TypedBuffer<T> {
+    fn pad_nulls(
+        &mut self,
+        range: Range<usize>,
+        rev_position_iter: impl Iterator<Item = usize>,
+    ) {
+        let slice = self.as_slice_mut();
+
+        for (value_pos, level_pos) in range.rev().zip(rev_position_iter) {

Review comment:
       This is a cool suggestion, I was not aware of this component. Unfortunately it does not appear to support reverse iteration, which is required here, so I will leave this as a potential future optimization. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] jorgecarleitao commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778440916



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> BufferQueue for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Slice = [T];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);

Review comment:
       `arrow2` no longer uses `MutableBuffer<T: NativeType>`: it recently migrated to `std::Vec<T: NativeType>`, for ease of use (and maintain).
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r782119288



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };

Review comment:
       #1155 contains the fix




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r782246044



##########
File path: parquet/src/data_type.rs
##########
@@ -1033,21 +1032,6 @@ pub(crate) mod private {
             self
         }
     }
-
-    /// A marker trait for [`DataType`] with a [scalar] physical type

Review comment:
       If you you need to remove this code, then we should probably reopen the original ticket https://github.com/apache/arrow-rs/issues/1132




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r774814520



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
+}
+
+impl LevelsWriter for [i16] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+
+    fn get(&self, idx: usize) -> i16 {
+        self[idx]
+    }
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
+        self[range].iter().filter(|i| **i != max_level).count()
+    }
+}
+
+/// A type that can have value data written to it by a [`ColumnValueDecoder`]
+pub trait ValuesWriter {
+    fn capacity(&self) -> usize;
+}
+
+impl<T> ValuesWriter for [T] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+}
+
+/// Decodes level data to a [`LevelsWriter`]
+pub trait ColumnLevelDecoder {
+    type Writer: LevelsWriter + ?Sized;
+
+    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// Decodes value data to a [`ValuesWriter`]
+pub trait ColumnValueDecoder {
+    type Writer: ValuesWriter + ?Sized;
+
+    fn create(col: &ColumnDescPtr) -> Self;
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        is_sorted: bool,
+    ) -> Result<()>;
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()>;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
+pub struct ColumnValueDecoderImpl<T: DataType> {
+    descr: ColumnDescPtr,
+
+    current_encoding: Option<Encoding>,
+
+    // Cache of decoders for existing encodings
+    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
+}
+
+impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
+    type Writer = [T::T];
+
+    fn create(descr: &ColumnDescPtr) -> Self {
+        Self {
+            descr: descr.clone(),
+            current_encoding: None,
+            decoders: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        mut encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if self.decoders.contains_key(&encoding) {
+            return Err(general_err!("Column cannot have more than one dictionary"));
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
+            dictionary.set_data(buf, num_values as usize)?;
+
+            let mut decoder = DictDecoder::new();
+            decoder.set_dict(Box::new(dictionary))?;
+            self.decoders.insert(encoding, Box::new(decoder));
+            Ok(())
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn set_data(
+        &mut self,
+        mut encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY;
+        }
+
+        let decoder = if encoding == Encoding::RLE_DICTIONARY {
+            self.decoders
+                .get_mut(&encoding)
+                .expect("Decoder for dict should have been set")
+        } else {
+            // Search cache for data page decoder
+            #[allow(clippy::map_entry)]
+            if !self.decoders.contains_key(&encoding) {
+                // Initialize decoder for this page
+                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
+                self.decoders.insert(encoding, data_decoder);
+            }
+            self.decoders.get_mut(&encoding).unwrap()
+        };
+
+        decoder.set_data(data, num_values)?;
+        self.current_encoding = Some(encoding);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize> {
+        let encoding = self
+            .current_encoding
+            .expect("current_encoding should be set");
+
+        let current_decoder = self

Review comment:
       why not set a `current_decoder` field in the `set_data` method (where it has to be selected anyway to call `set_data` on it), so that it doesn't have to be looked up on every call of `read` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] codecov-commenter edited a comment on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-992420848


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1041](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (0fe966a) into [master](https://codecov.io/gh/apache/arrow-rs/commit/07660c61680220ac54b7bf4c42a64c840872cc43?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (07660c6) will **decrease** coverage by `0.00%`.
   > The diff coverage is `81.32%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/1041/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1041      +/-   ##
   ==========================================
   - Coverage   82.30%   82.29%   -0.01%     
   ==========================================
     Files         168      171       +3     
     Lines       49026    49604     +578     
   ==========================================
   + Hits        40351    40822     +471     
   - Misses       8675     8782     +107     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyYXlfcmVhZGVyLnJz) | `76.72% <ø> (-0.03%)` | :arrow_down: |
   | [parquet/src/column/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci5ycw==) | `69.88% <75.94%> (-2.45%)` | :arrow_down: |
   | [parquet/src/column/reader/decoder.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci9kZWNvZGVyLnJz) | `76.27% <76.27%> (ø)` | |
   | [parquet/src/arrow/record\_reader/buffer.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci9idWZmZXIucnM=) | `85.10% <85.10%> (ø)` | |
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `94.00% <87.17%> (+1.23%)` | :arrow_up: |
   | [...rquet/src/arrow/record\_reader/definition\_levels.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci9kZWZpbml0aW9uX2xldmVscy5ycw==) | `90.32% <90.32%> (ø)` | |
   | [parquet/src/util/memory.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9tZW1vcnkucnM=) | `91.12% <100.00%> (+0.08%)` | :arrow_up: |
   | [arrow/src/datatypes/native.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2RhdGF0eXBlcy9uYXRpdmUucnM=) | `66.66% <0.00%> (-6.25%)` | :arrow_down: |
   | [arrow/src/compute/kernels/comparison.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2NvbXB1dGUva2VybmVscy9jb21wYXJpc29uLnJz) | `90.87% <0.00%> (-2.35%)` | :arrow_down: |
   | [arrow/src/record\_batch.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL3JlY29yZF9iYXRjaC5ycw==) | `91.97% <0.00%> (-0.68%)` | :arrow_down: |
   | ... and [32 more](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [07660c6...0fe966a](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] codecov-commenter edited a comment on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-992420848


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1041](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6a21ad2) into [master](https://codecov.io/gh/apache/arrow-rs/commit/07660c61680220ac54b7bf4c42a64c840872cc43?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (07660c6) will **decrease** coverage by `0.00%`.
   > The diff coverage is `81.32%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/1041/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1041      +/-   ##
   ==========================================
   - Coverage   82.30%   82.29%   -0.01%     
   ==========================================
     Files         168      171       +3     
     Lines       49026    49604     +578     
   ==========================================
   + Hits        40351    40823     +472     
   - Misses       8675     8781     +106     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/arrow/array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyYXlfcmVhZGVyLnJz) | `76.72% <ø> (-0.03%)` | :arrow_down: |
   | [parquet/src/column/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci5ycw==) | `69.88% <75.94%> (-2.45%)` | :arrow_down: |
   | [parquet/src/column/reader/decoder.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci9kZWNvZGVyLnJz) | `76.27% <76.27%> (ø)` | |
   | [parquet/src/arrow/record\_reader/buffer.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci9idWZmZXIucnM=) | `85.10% <85.10%> (ø)` | |
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `94.00% <87.17%> (+1.23%)` | :arrow_up: |
   | [...rquet/src/arrow/record\_reader/definition\_levels.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci9kZWZpbml0aW9uX2xldmVscy5ycw==) | `90.32% <90.32%> (ø)` | |
   | [parquet/src/util/memory.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9tZW1vcnkucnM=) | `91.12% <100.00%> (+0.08%)` | :arrow_up: |
   | [arrow/src/datatypes/native.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2RhdGF0eXBlcy9uYXRpdmUucnM=) | `66.66% <0.00%> (-6.25%)` | :arrow_down: |
   | [arrow/src/compute/kernels/comparison.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2NvbXB1dGUva2VybmVscy9jb21wYXJpc29uLnJz) | `90.87% <0.00%> (-2.35%)` | :arrow_down: |
   | [arrow/src/record\_batch.rs](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL3JlY29yZF9iYXRjaC5ycw==) | `91.97% <0.00%> (-0.68%)` | :arrow_down: |
   | ... and [32 more](https://codecov.io/gh/apache/arrow-rs/pull/1041/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [07660c6...6a21ad2](https://codecov.io/gh/apache/arrow-rs/pull/1041?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-1003609340


   I've renamed a number of the methods and traits based on the great feedback, and also added a load of doc comments. In particular I took inspiration from std::Vec, in particular [Vec::spare_capacity_mut](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.spare_capacity_mut) and [Vec::set_len](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.set_len) which is effectively what is going on here.
   
   I'm happy that this interface is sufficiently flexible for the optimisations I have in mind, many of which I've already got an initial cut of, and so I'm marking this ready for review.
   
   I am aware this is a relatively complex change, to an already complex part of the codebase so if anything isn't clear please let me know. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r777130395



##########
File path: parquet/src/column/reader/decoder.rs
##########
@@ -0,0 +1,215 @@
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::BitReader;
+
+/// A type that can have level data written to it by a [`ColumnLevelDecoder`]
+pub trait LevelsWriter {
+    fn capacity(&self) -> usize;
+
+    fn get(&self, idx: usize) -> i16;
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
+}
+
+impl LevelsWriter for [i16] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+
+    fn get(&self, idx: usize) -> i16 {
+        self[idx]
+    }
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
+        self[range].iter().filter(|i| **i != max_level).count()
+    }
+}
+
+/// A type that can have value data written to it by a [`ColumnValueDecoder`]
+pub trait ValuesWriter {
+    fn capacity(&self) -> usize;
+}
+
+impl<T> ValuesWriter for [T] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+}
+
+/// Decodes level data to a [`LevelsWriter`]
+pub trait ColumnLevelDecoder {
+    type Writer: LevelsWriter + ?Sized;
+
+    fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// Decodes value data to a [`ValuesWriter`]
+pub trait ColumnValueDecoder {
+    type Writer: ValuesWriter + ?Sized;
+
+    fn create(col: &ColumnDescPtr) -> Self;
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        is_sorted: bool,
+    ) -> Result<()>;
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()>;
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize>;
+}
+
+/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
+pub struct ColumnValueDecoderImpl<T: DataType> {
+    descr: ColumnDescPtr,
+
+    current_encoding: Option<Encoding>,
+
+    // Cache of decoders for existing encodings
+    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
+}
+
+impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
+    type Writer = [T::T];
+
+    fn create(descr: &ColumnDescPtr) -> Self {
+        Self {
+            descr: descr.clone(),
+            current_encoding: None,
+            decoders: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        mut encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if self.decoders.contains_key(&encoding) {
+            return Err(general_err!("Column cannot have more than one dictionary"));
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
+            dictionary.set_data(buf, num_values as usize)?;
+
+            let mut decoder = DictDecoder::new();
+            decoder.set_dict(Box::new(dictionary))?;
+            self.decoders.insert(encoding, Box::new(decoder));
+            Ok(())
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn set_data(
+        &mut self,
+        mut encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY;
+        }
+
+        let decoder = if encoding == Encoding::RLE_DICTIONARY {
+            self.decoders
+                .get_mut(&encoding)
+                .expect("Decoder for dict should have been set")
+        } else {
+            // Search cache for data page decoder
+            #[allow(clippy::map_entry)]
+            if !self.decoders.contains_key(&encoding) {
+                // Initialize decoder for this page
+                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
+                self.decoders.insert(encoding, data_decoder);

Review comment:
       Copy paste from old code - will fix




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r777131001



##########
File path: parquet/src/arrow/record_reader/definition_levels.rs
##########
@@ -0,0 +1,82 @@
+use arrow::array::BooleanBufferBuilder;
+use arrow::bitmap::Bitmap;
+use arrow::buffer::Buffer;
+use std::ops::Range;
+
+use crate::column::reader::decoder::ColumnLevelDecoderImpl;
+use crate::schema::types::ColumnDescPtr;
+
+use super::{
+    buffer::{RecordBuffer, TypedBuffer},
+    MIN_BATCH_SIZE,
+};
+
+pub struct DefinitionLevelBuffer {
+    buffer: TypedBuffer<i16>,
+    builder: BooleanBufferBuilder,
+    max_level: i16,
+}
+
+impl RecordBuffer for DefinitionLevelBuffer {
+    type Output = Buffer;
+    type Writer = [i16];
+
+    fn split(&mut self, len: usize) -> Self::Output {
+        self.buffer.split(len)
+    }
+
+    fn writer(&mut self, batch_size: usize) -> &mut Self::Writer {

Review comment:
       I've renamed it to `spare_capacity_mut` to reflect its similarities to the same method on `Vec`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] alamb commented on a change in pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r778409738



##########
File path: parquet/src/arrow/record_reader/buffer.rs
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer<T>`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
+
+/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
+pub struct TypedBuffer<T> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T> Default for TypedBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> TypedBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T> BufferQueue for TypedBuffer<T> {
+    type Output = Buffer;
+
+    type Slice = [T];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);

Review comment:
       I believe it is called `arrow2` :trollface: 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] tustvold commented on pull request #1041: Generify ColumnReaderImpl and RecordReader (#1040)

Posted by GitBox <gi...@apache.org>.
tustvold commented on pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#issuecomment-1010044251


   Unfortunately the code I added in #1155 didn't quite carry across as I had hoped for, as parquet doesn't have an `Int16Type` but definition levels and repetition levels are parsed as `i16`. This required some more finagling, but the general concept of restricting the valid types remains unchanged


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org