Posted to github@arrow.apache.org by "tustvold (via GitHub)" <gi...@apache.org> on 2023/06/06 22:28:41 UTC

[GitHub] [arrow-rs] tustvold opened a new pull request, #4376: Move record delimiting into ColumnReader (#4365)

tustvold opened a new pull request, #4376:
URL: https://github.com/apache/arrow-rs/pull/4376

   _Draft, as it needs a bit more polish_
   
   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #4365
   
   # Rationale for this change
    
   <!--
   Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
   Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.
   -->
   
   Previously, when reading repeated fields, RecordReader would read batches of at least 1024 levels and then split off the levels comprising the desired number of rows. Aside from being wasteful and complex, this approach causes problems for selection pushdown, as a skip may land in data that has potentially already been decoded.
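
   The record delimiting at the heart of this change can be sketched in isolation: in the Parquet data model, a repetition level of 0 marks the start of a new record, so delimiting records reduces to scanning the decoded repetition levels for zeros. The function below is a hypothetical illustration of that idea, not the actual arrow-rs code:

```rust
/// Hypothetical sketch (not the arrow-rs implementation): a repetition level of
/// 0 marks the start of a new record, so delimiting `max_records` records
/// reduces to scanning decoded repetition levels for zeros. Returns the number
/// of records found and the number of levels they span.
fn delimit_records(rep_levels: &[i16], max_records: usize) -> (usize, usize) {
    let mut records = 0;
    let mut levels = 0;
    for &level in rep_levels {
        if level == 0 {
            if records == max_records {
                // the next record starts here; stop before consuming it
                return (records, levels);
            }
            records += 1;
        }
        levels += 1;
    }
    (records, levels)
}

fn main() {
    // Two records: [a, b] (levels 0, 1) and [c] (level 0)
    let levels = [0i16, 1, 0];
    assert_eq!(delimit_records(&levels, 1), (1, 2));
    assert_eq!(delimit_records(&levels, 2), (2, 3));
}
```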
   
   # What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   This changes ColumnReader to delimit records. Whilst this potentially imposes overheads on users of this API directly, I struggle to contrive a use-case where reading partial records would be desirable or correct. I therefore hypothesize that such users are either not reading repeated data (likely) or are already performing some sort of record buffering similar to what is now provided as a first-party API.
   
   # Are there any user-facing changes?
   
   Technically no; however, this is marked as breaking as it does alter the behaviour of ColumnReader, especially with regard to ill-formed level data such as one might encounter in a test.
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!---
   If there are any breaking changes to public APIs, please add the `breaking change` label.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] tustvold commented on pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#issuecomment-1640245592

   It will only read whole records.
   
   As for the levels, if the max definition level is 0, it will be the number of values; otherwise it will be the number of levels read. A column cannot have a non-zero max repetition level and a zero max definition level, and the numbers of definition and repetition levels must be equal.
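
   A hypothetical sketch of the invariant described above: when the max definition level is non-zero, the non-null value count is the number of definition levels equal to the max, while the level count is the full length of the decoded slice. (The helper name is illustrative, not an arrow-rs API.)

```rust
/// Count the non-null values implied by a slice of definition levels
/// (hypothetical helper; `max_def_level` is the column's max definition level).
fn non_null_values(def_levels: &[i16], max_def_level: i16) -> usize {
    def_levels.iter().filter(|&&l| l == max_def_level).count()
}

fn main() {
    // A nullable column with max_def_level = 1: four levels, three values
    let levels = [1i16, 0, 1, 1];
    assert_eq!(non_null_values(&levels, 1), 3);
    assert_eq!(levels.len(), 4); // number of levels read
}
```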




[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#discussion_r1223338467


##########
parquet/src/column/reader/decoder.rs:
##########
@@ -94,6 +105,22 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
 }
 
 pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
+    /// Read definition level data into `out[range]` returning the number of levels read
+    ///
+    /// `range` is provided by the caller to allow for types such as default-initialized `[T]`
+    /// that only track capacity and not length
+    ///
+    /// # Panics
+    ///
+    /// Implementations may panic if `range` overlaps with already written data
+    ///
+    // TODO: Should this return the number of nulls

Review Comment:
   Yes





[GitHub] [arrow-rs] tustvold commented on pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#issuecomment-1580839944

   The benchmarks in #4378 show this to have a minor performance benefit, likely due to not needing to buffer and split off definition levels and values.
   
   ```
   arrow_array_reader/ListArray/plain encoded optional strings no NULLs
                           time:   [1.5840 ms 1.5868 ms 1.5903 ms]
                           change: [-8.9378% -8.6442% -8.3995%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 13 outliers among 100 measurements (13.00%)
     2 (2.00%) low mild
     4 (4.00%) high mild
     7 (7.00%) high severe
   Benchmarking arrow_array_reader/ListArray/plain encoded optional strings half NULLs: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
   arrow_array_reader/ListArray/plain encoded optional strings half NULLs
                           time:   [1.2136 ms 1.2143 ms 1.2150 ms]
                           change: [-2.9329% -2.8874% -2.8359%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 1 outliers among 100 measurements (1.00%)
     1 (1.00%) high severe
   ```
   
   Looking at the flamegraph for this PR, we can see that reading the repetition levels is a relatively small portion of the runtime, at least compared to the overheads associated with stripping empty lists and padding nulls, which makes this improvement even more impressive.
   
   ![image](https://github.com/apache/arrow-rs/assets/1781103/8618b72d-9055-43fa-94bb-fd3eec62bced)
   
   




[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#discussion_r1220472362


##########
parquet/src/column/reader/decoder.rs:
##########
@@ -270,182 +297,227 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
 
 const SKIP_BUFFER_SIZE: usize = 1024;
 
-/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
-pub struct ColumnLevelDecoderImpl {
-    decoder: Option<LevelDecoderInner>,
-    /// Temporary buffer populated when skipping values
-    buffer: Vec<i16>,
-    bit_width: u8,
+enum LevelDecoderInner {
+    Packed(BitReader, u8),
+    Rle(RleDecoder),
 }
 
-impl ColumnLevelDecoderImpl {
-    pub fn new(max_level: i16) -> Self {
-        let bit_width = num_required_bits(max_level as u64);
-        Self {
-            decoder: None,
-            buffer: vec![],
-            bit_width,
+impl LevelDecoderInner {
+    fn new(encoding: Encoding, data: ByteBufferPtr, bit_width: u8) -> Self {
+        match encoding {
+            Encoding::RLE => {
+                let mut decoder = RleDecoder::new(bit_width);
+                decoder.set_data(data);
+                Self::Rle(decoder)
+            }
+            Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width),
+            _ => unreachable!("invalid level encoding: {}", encoding),
         }
     }
 
-    /// Drops the first `len` values from the internal buffer
-    fn split_off_buffer(&mut self, len: usize) {
-        match self.buffer.len() == len {
-            true => self.buffer.clear(),
-            false => {
-                // Move to_read elements to end of slice
-                self.buffer.rotate_left(len);
-                // Truncate buffer
-                self.buffer.truncate(self.buffer.len() - len);
+    fn read(&mut self, out: &mut [i16]) -> Result<usize> {
+        match self {
+            Self::Packed(reader, bit_width) => {
+                Ok(reader.get_batch::<i16>(out, *bit_width as usize))
             }
+            Self::Rle(reader) => Ok(reader.get_batch(out)?),
         }
     }
+}
 
-    /// Reads up to `to_read` values to the internal buffer
-    fn read_to_buffer(&mut self, to_read: usize) -> Result<()> {
-        let mut buf = std::mem::take(&mut self.buffer);
-
-        // Repopulate buffer
-        buf.resize(to_read, 0);
-        let actual = self.read(&mut buf, 0..to_read)?;
-        buf.truncate(actual);
-
-        self.buffer = buf;
-        Ok(())
-    }
+/// An implementation of [`DefinitionLevelDecoder`] for `[i16]`
+pub struct DefinitionLevelDecoderImpl {
+    decoder: Option<LevelDecoderInner>,
+    bit_width: u8,
 }
 
-enum LevelDecoderInner {
-    Packed(BitReader, u8),
-    Rle(RleDecoder),
+impl DefinitionLevelDecoderImpl {
+    pub fn new(max_level: i16) -> Self {
+        let bit_width = num_required_bits(max_level as u64);
+        Self {
+            decoder: None,
+            bit_width,
+        }
+    }
 }
 
-impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
+impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
     type Slice = [i16];
 
     fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
-        self.buffer.clear();
-        match encoding {
-            Encoding::RLE => {
-                let mut decoder = RleDecoder::new(self.bit_width);
-                decoder.set_data(data);
-                self.decoder = Some(LevelDecoderInner::Rle(decoder));
-            }
-            Encoding::BIT_PACKED => {
-                self.decoder = Some(LevelDecoderInner::Packed(
-                    BitReader::new(data),
-                    self.bit_width,
-                ));
-            }
-            _ => unreachable!("invalid level encoding: {}", encoding),
-        }
+        self.decoder = Some(LevelDecoderInner::new(encoding, data, self.bit_width))
     }
+}
 
-    fn read(&mut self, out: &mut Self::Slice, mut range: Range<usize>) -> Result<usize> {
-        let read_from_buffer = match self.buffer.is_empty() {
-            true => 0,
-            false => {
-                let read_from_buffer = self.buffer.len().min(range.end - range.start);
-                out[range.start..range.start + read_from_buffer]
-                    .copy_from_slice(&self.buffer[0..read_from_buffer]);
-                self.split_off_buffer(read_from_buffer);
-                read_from_buffer
-            }
-        };
-        range.start += read_from_buffer;
-
-        match self.decoder.as_mut().unwrap() {
-            LevelDecoderInner::Packed(reader, bit_width) => Ok(read_from_buffer
-                + reader.get_batch::<i16>(&mut out[range], *bit_width as usize)),
-            LevelDecoderInner::Rle(reader) => {
-                Ok(read_from_buffer + reader.get_batch(&mut out[range])?)
-            }
-        }
+impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
+    fn read_def_levels(
+        &mut self,
+        out: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> Result<usize> {
+        self.decoder.as_mut().unwrap().read(&mut out[range])
     }
-}
 
-impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
     fn skip_def_levels(
         &mut self,
         num_levels: usize,
         max_def_level: i16,
     ) -> Result<(usize, usize)> {
         let mut level_skip = 0;
         let mut value_skip = 0;
+        let mut buf: Vec<i16> = vec![];
         while level_skip < num_levels {
             let remaining_levels = num_levels - level_skip;
 
-            if self.buffer.is_empty() {
-                // Only read number of needed values
-                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
-                if self.buffer.is_empty() {
-                    // Reached end of page
-                    break;
-                }
+            let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
+            buf.resize(to_read, 0);
+            let read = self.read_def_levels(&mut buf, 0..to_read)?;
+            if read == 0 {
+                // Reached end of page
+                break;
             }
-            let to_read = self.buffer.len().min(remaining_levels);
-
-            level_skip += to_read;
-            value_skip += self.buffer[..to_read]
-                .iter()
-                .filter(|x| **x == max_def_level)
-                .count();
 
-            self.split_off_buffer(to_read)
+            level_skip += read;
+            value_skip += buf[..read].iter().filter(|x| **x == max_def_level).count();
         }
 
         Ok((value_skip, level_skip))
     }
 }
 
-impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
-    fn skip_rep_levels(
+/// An implementation of [`RepetitionLevelDecoder`] for `[i16]`
+pub struct RepetitionLevelDecoderImpl {

Review Comment:
   This is the main crux of the PR: move the record delimiting logic into the repetition level decoding process, instead of wrapping it around ColumnReader.
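
   To illustrate what "delimiting inside the decoder" means, here is a hypothetical sketch (not the arrow-rs implementation): levels are read into an output buffer, and a record is counted as complete only when the leading 0 of the next record is seen. The real decoder additionally treats the end of the final page as a record delimiter, counting the trailing record.

```rust
/// Hypothetical sketch: read levels into `out`, counting a record as complete
/// when the next record's leading 0 is encountered. Returns the number of
/// complete records read and the number of levels consumed.
fn read_rep_levels(src: &[i16], out: &mut Vec<i16>, max_records: usize) -> (usize, usize) {
    let mut records = 0;
    let mut levels = 0;
    for &level in src {
        if level == 0 && levels != 0 {
            records += 1; // the previous record is now complete
            if records == max_records {
                break;
            }
        }
        out.push(level);
        levels += 1;
    }
    (records, levels)
}

fn main() {
    let mut out = Vec::new();
    // Levels [0, 1, 1, 0, 1]: the first record spans 3 levels; the second
    // record's levels are not consumed once the limit is reached
    assert_eq!(read_rep_levels(&[0, 1, 1, 0, 1], &mut out, 1), (1, 3));
    assert_eq!(out, vec![0, 1, 1]);
}
```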





[GitHub] [arrow-rs] alamb commented on pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#issuecomment-1646558613

   Follow on PR: https://github.com/apache/arrow-rs/pull/4540




[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#discussion_r1223323728


##########
parquet/src/arrow/record_reader/mod.rs:
##########
@@ -201,31 +149,10 @@ where
     ///
     /// Number of records skipped
     pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        // First need to clear the buffer
-        let end_of_column = match self.column_reader.as_mut() {
-            Some(reader) => !reader.peek_next()?,
-            None => return Ok(0),
-        };
-
-        let (buffered_records, buffered_values) =
-            self.count_records(num_records, end_of_column);
-
-        self.num_records += buffered_records;
-        self.num_values += buffered_values;
-
-        let remaining = num_records - buffered_records;
-
-        if remaining == 0 {
-            return Ok(buffered_records);
+        match self.column_reader.as_mut() {
+            Some(reader) => reader.skip_records(num_records),
+            None => Ok(0),

Review Comment:
   Yes, RecordReader no longer performs this intermediate buffering





[GitHub] [arrow-rs] alamb commented on a diff in pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#discussion_r1222882752


##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -2610,4 +2611,153 @@ mod tests {
         test_decimal_roundtrip::<Decimal128Type>();
         test_decimal_roundtrip::<Decimal256Type>();
     }
+
+    #[test]
+    fn test_list_selection() {
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new("item", ArrowDataType::Utf8, true),
+            false,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+        for _ in 0..2 {
+            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
+            for i in 0..1024 {
+                list_a_builder.values().append_value(format!("{i}"));
+                list_a_builder.append(true);
+            }
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![Arc::new(list_a_builder.finish())],
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+        }
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
+            .unwrap()
+            .with_row_selection(RowSelection::from(vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ]))
+            .build()
+            .unwrap();
+
+        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
+        assert_eq!(total_rows, 924 * 2);
+    }
+
+    #[test]
+    fn test_list_selection_fuzz() {
+        let mut rng = thread_rng();
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new_list("item", Field::new("item", ArrowDataType::Int32, true), true),
+            true,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+
+        for _ in 0..2048 {
+            if rng.gen_bool(0.2) {
+                list_a_builder.append(false);
+                continue;
+            }
+
+            let list_a_len = rng.gen_range(0..10);
+            let list_b_builder = list_a_builder.values();
+
+            for _ in 0..list_a_len {
+                if rng.gen_bool(0.2) {
+                    list_b_builder.append(false);
+                    continue;
+                }
+
+                let list_b_len = rng.gen_range(0..10);
+                let int_builder = list_b_builder.values();
+                for _ in 0..list_b_len {
+                    match rng.gen_bool(0.2) {
+                        true => int_builder.append_null(),
+                        false => int_builder.append_value(rng.gen()),
+                    }
+                }
+                list_b_builder.append(true)
+            }
+            list_a_builder.append(true);
+        }
+
+        let array = Arc::new(list_a_builder.finish());
+        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
+
+        writer.write(&batch).unwrap();
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+
+        let cases = [
+            vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ],
+            vec![
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+            ],
+            vec![

Review Comment:
   I like this boundary condition testing



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -2610,4 +2611,153 @@ mod tests {
         test_decimal_roundtrip::<Decimal128Type>();
         test_decimal_roundtrip::<Decimal256Type>();
     }
+
+    #[test]
+    fn test_list_selection() {
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new("item", ArrowDataType::Utf8, true),
+            false,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+        for _ in 0..2 {
+            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
+            for i in 0..1024 {
+                list_a_builder.values().append_value(format!("{i}"));
+                list_a_builder.append(true);
+            }
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![Arc::new(list_a_builder.finish())],
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+        }
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
+            .unwrap()
+            .with_row_selection(RowSelection::from(vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ]))
+            .build()
+            .unwrap();
+
+        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
+        assert_eq!(total_rows, 924 * 2);

Review Comment:
   I wonder if it is worth asserting the values come back as expected (you could make the expected results in the `for _ in 0..2` loop above)?



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -2610,4 +2611,153 @@ mod tests {
         test_decimal_roundtrip::<Decimal128Type>();
         test_decimal_roundtrip::<Decimal256Type>();
     }
+
+    #[test]
+    fn test_list_selection() {
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new("item", ArrowDataType::Utf8, true),
+            false,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+        for _ in 0..2 {
+            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
+            for i in 0..1024 {
+                list_a_builder.values().append_value(format!("{i}"));
+                list_a_builder.append(true);
+            }
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![Arc::new(list_a_builder.finish())],
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+        }
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
+            .unwrap()
+            .with_row_selection(RowSelection::from(vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ]))
+            .build()
+            .unwrap();
+
+        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
+        assert_eq!(total_rows, 924 * 2);
+    }
+
+    #[test]
+    fn test_list_selection_fuzz() {
+        let mut rng = thread_rng();
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new_list("item", Field::new("item", ArrowDataType::Int32, true), true),
+            true,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+
+        for _ in 0..2048 {
+            if rng.gen_bool(0.2) {
+                list_a_builder.append(false);
+                continue;
+            }
+
+            let list_a_len = rng.gen_range(0..10);
+            let list_b_builder = list_a_builder.values();
+
+            for _ in 0..list_a_len {
+                if rng.gen_bool(0.2) {
+                    list_b_builder.append(false);
+                    continue;
+                }
+
+                let list_b_len = rng.gen_range(0..10);
+                let int_builder = list_b_builder.values();
+                for _ in 0..list_b_len {
+                    match rng.gen_bool(0.2) {
+                        true => int_builder.append_null(),
+                        false => int_builder.append_value(rng.gen()),
+                    }
+                }
+                list_b_builder.append(true)
+            }
+            list_a_builder.append(true);
+        }
+
+        let array = Arc::new(list_a_builder.finish());
+        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
+
+        writer.write(&batch).unwrap();
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+
+        let cases = [
+            vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ],
+            vec![
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+            ],
+            vec![
+                RowSelector::skip(1023),
+                RowSelector::select(1),
+                RowSelector::skip(1023),
+                RowSelector::select(1),
+            ],
+            vec![
+                RowSelector::select(1),
+                RowSelector::skip(1023),
+                RowSelector::select(1),
+                RowSelector::skip(1023),
+            ],
+        ];
+
+        for selection in cases {
+            let selection = RowSelection::from(selection);
+            let mut reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
+                .unwrap()
+                .with_row_selection(selection.clone())
+                .with_batch_size(2048)

Review Comment:
   Given all the inputs are only 2048 rows in total (and thus in a single batch), would tests with different batch sizes that span the input (e.g. `1000`) be a good idea?



##########
parquet/src/column/reader.rs:
##########
@@ -195,99 +198,126 @@ where
     ///
     /// `values` will be contiguously populated with the non-null values. Note that if the column
     /// is not required, this may be less than either `batch_size` or the number of levels read
+    #[deprecated(note = "Use read_records")]
     pub fn read_batch(
         &mut self,
         batch_size: usize,
-        mut def_levels: Option<&mut D::Slice>,
-        mut rep_levels: Option<&mut R::Slice>,
+        def_levels: Option<&mut D::Slice>,
+        rep_levels: Option<&mut R::Slice>,
         values: &mut V::Slice,
     ) -> Result<(usize, usize)> {
-        let mut values_read = 0;
-        let mut levels_read = 0;
+        let (_, values, levels) =
+            self.read_records(batch_size, def_levels, rep_levels, values)?;
+
+        Ok((values, levels))
+    }
 
-        // Compute the smallest batch size we can read based on provided slices
-        let mut batch_size = min(batch_size, values.capacity());
+    /// Read up to `num_records` returning the number of complete records, non-null
+    /// values and levels decoded
+    ///
+    /// If the max definition level is 0, `def_levels` will be ignored, otherwise it will be
+    /// populated with the number of levels read, with an error returned if it is `None`.
+    ///
+    /// If the max repetition level is 0, `rep_levels` will be ignored, otherwise it will be
+    /// populated with the number of levels read, with an error returned if it is `None`.
+    ///
+    /// `values` will be contiguously populated with the non-null values. Note that if the column
+    /// is not required, this may be less than either `max_records` or the number of levels read
+    pub fn read_records(
+        &mut self,
+        max_records: usize,
+        mut def_levels: Option<&mut D::Slice>,
+        mut rep_levels: Option<&mut R::Slice>,
+        values: &mut V::Slice,
+    ) -> Result<(usize, usize, usize)> {
+        let mut max_levels = values.capacity().min(max_records);
         if let Some(ref levels) = def_levels {
-            batch_size = min(batch_size, levels.capacity());
+            max_levels = max_levels.min(levels.capacity());
         }
         if let Some(ref levels) = rep_levels {
-            batch_size = min(batch_size, levels.capacity());
+            max_levels = max_levels.min(levels.capacity())
         }
 
-        // Read exhaustively all pages until we read all batch_size values/levels
-        // or there are no more values/levels to read.
-        while levels_read < batch_size {
-            if !self.has_next()? {
-                break;
-            }
+        let mut total_records_read = 0;
+        let mut total_levels_read = 0;
+        let mut total_values_read = 0;
 
-            // Batch size for the current iteration
-            let iter_batch_size = (batch_size - levels_read)
-                .min((self.num_buffered_values - self.num_decoded_values) as usize);
+        while total_records_read < max_records
+            && total_levels_read < max_levels
+            && self.has_next()?
+        {
+            let remaining_records = max_records - total_records_read;
+            let remaining_levels = self.num_buffered_values - self.num_decoded_values;
+            let levels_to_read = remaining_levels.min(max_levels - total_levels_read);
 
-            // If the field is required and non-repeated, there are no definition levels
-            let null_count = match self.descr.max_def_level() > 0 {
-                true => {
-                    let levels = def_levels
+            let (records_read, levels_read) = match self.rep_level_decoder.as_mut() {
+                Some(reader) => {
+                    let out = rep_levels
                         .as_mut()
-                        .ok_or_else(|| general_err!("must specify definition levels"))?;
+                        .ok_or_else(|| general_err!("must specify repetition levels"))?;
+
+                    let (mut records_read, levels_read) = reader.read_rep_levels(
+                        out,
+                        total_levels_read..total_levels_read + levels_to_read,
+                        remaining_records,
+                    )?;
+
+                    if levels_read == remaining_levels && self.has_record_delimiter {
+                        // Reached end of page, which implies records_read < remaining_records
+                        // as otherwise reading would have stopped before reaching the end
+                        assert!(records_read < remaining_records); // Sanity check
+                        records_read += 1;
+                    }
+                    (records_read, levels_read)
+                }
+                None => {
+                    let min = remaining_records.min(levels_to_read);
+                    (min, min)
+                }
+            };
 
-                    let num_def_levels = self
-                        .def_level_decoder
+            let values_to_read = match self.def_level_decoder.as_mut() {
+                Some(reader) => {
+                    let out = def_levels
                         .as_mut()
-                        .expect("def_level_decoder be set")
-                        .read(levels, levels_read..levels_read + iter_batch_size)?;
+                        .ok_or_else(|| general_err!("must specify definition levels"))?;
+
+                    let read = reader.read_def_levels(
+                        out,
+                        total_levels_read..total_levels_read + levels_read,
+                    )?;
 
-                    if num_def_levels != iter_batch_size {
-                        return Err(general_err!("insufficient definition levels read from column - expected {}, got {}", iter_batch_size, num_def_levels));
+                    if read != levels_read {
+                        return Err(general_err!("insufficient definition levels read from column - expected {levels_read}, got {read}"));

Review Comment:
   this happens with an invalid input, right?



##########
parquet/src/column/reader.rs:
##########
@@ -493,12 +535,18 @@ where
                                 return Err(general_err!("more nulls than values in page, contained {} values and {} nulls", num_values, num_nulls));
                             }
 
-                            self.num_buffered_values = num_values;
+                            self.num_buffered_values = num_values as _;
                             self.num_decoded_values = 0;
 
                             // DataPage v2 only supports RLE encoding for repetition
                             // levels
                             if self.descr.max_rep_level() > 0 {
+                                // Technically a DataPage v2 should not write a record

Review Comment:
   Here is my evidence the spec says not to write a record across page boundaries:
   https://github.com/apache/parquet-format/blob/c766945d90935ebcd4e03fee13aad2b6efcadce3/src/main/thrift/parquet.thrift#L567
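   To make the boundary rule concrete: a repetition level of 0 starts a record, so a record is only provably complete once a subsequent 0 is observed. The delimiting logic can be sketched as a standalone function (hypothetical illustration, not this PR's API):

   ```rust
   /// Hypothetical sketch, not the PR's API: count the records that are
   /// provably complete within `rep_levels`, returning
   /// (records, levels_consumed). A repetition level of 0 starts a new
   /// record, so the record beginning at the final 0 is only known to be
   /// complete once another 0 is observed after it.
   fn count_complete_records(rep_levels: &[i16], max_records: usize) -> (usize, usize) {
       let mut records = 0;
       for (idx, &level) in rep_levels.iter().enumerate() {
           if level == 0 && idx != 0 {
               records += 1;
               if records == max_records {
                   // The 0 at `idx` begins the next record; leave it unconsumed
                   return (records, idx);
               }
           }
       }
       (records, rep_levels.len())
   }

   fn main() {
       // Levels for records [a, b], [c], [d, e, f]
       let levels = [0, 1, 0, 0, 1, 1];
       // Two complete records span the first three levels
       assert_eq!(count_complete_records(&levels, 2), (2, 3));
       // The trailing record is never followed by a 0, so if a page ended
       // here the reader could not know [d, e, f] is complete
       assert_eq!(count_complete_records(&levels, 100), (2, 6));
   }
   ```

   This is exactly why a writer that split a record across pages would be ambiguous to a page-at-a-time reader, matching the spec language linked above.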
   



##########
parquet/src/column/reader/decoder.rs:
##########
@@ -94,6 +105,22 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
 }
 
 pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
+    /// Read definition level data into `out[range]` returning the number of levels read
+    ///
+    /// `range` is provided by the caller to allow for types such as default-initialized `[T]`
+    /// that only track capacity and not length
+    ///
+    /// # Panics
+    ///
+    /// Implementations may panic if `range` overlaps with already written data
+    ///
+    // TODO: Should this return the number of nulls

Review Comment:
   is this still a TODO?
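   For context on what resolving that TODO might entail, a hypothetical helper (not part of this PR) could derive the null count directly from the definition levels, since for a leaf column only levels equal to the maximum definition level carry a value, and lower levels denote a null or an absent ancestor:

   ```rust
   /// Hypothetical helper, not part of this PR: split a run of definition
   /// levels into (values, nulls) for a column whose maximum definition
   /// level is `max_def_level`. Levels below the maximum carry no value.
   fn count_values_and_nulls(def_levels: &[i16], max_def_level: i16) -> (usize, usize) {
       let values = def_levels.iter().filter(|&&l| l == max_def_level).count();
       (values, def_levels.len() - values)
   }

   fn main() {
       // max_def_level = 1: three present values, two nulls
       assert_eq!(count_values_and_nulls(&[1, 0, 1, 1, 0], 1), (3, 2));
   }
   ```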



##########
parquet/src/arrow/record_reader/mod.rs:
##########
@@ -201,31 +149,10 @@ where
     ///
     /// Number of records skipped
     pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        // First need to clear the buffer
-        let end_of_column = match self.column_reader.as_mut() {
-            Some(reader) => !reader.peek_next()?,
-            None => return Ok(0),
-        };
-
-        let (buffered_records, buffered_values) =
-            self.count_records(num_records, end_of_column);
-
-        self.num_records += buffered_records;
-        self.num_values += buffered_values;
-
-        let remaining = num_records - buffered_records;
-
-        if remaining == 0 {
-            return Ok(buffered_records);
+        match self.column_reader.as_mut() {
+            Some(reader) => reader.skip_records(num_records),
+            None => Ok(0),

Review Comment:
   Is it intentional that this function no longer updates `self.num_records` and `self.num_values`?
   
   ```
           self.num_records += buffered_records;
           self.num_values += buffered_values;
   ```



##########
parquet/src/column/reader/decoder.rs:
##########
@@ -270,182 +297,227 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
 
 const SKIP_BUFFER_SIZE: usize = 1024;
 
-/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
-pub struct ColumnLevelDecoderImpl {
-    decoder: Option<LevelDecoderInner>,
-    /// Temporary buffer populated when skipping values
-    buffer: Vec<i16>,
-    bit_width: u8,
+enum LevelDecoder {
+    Packed(BitReader, u8),
+    Rle(RleDecoder),
 }
 
-impl ColumnLevelDecoderImpl {
-    pub fn new(max_level: i16) -> Self {
-        let bit_width = num_required_bits(max_level as u64);
-        Self {
-            decoder: None,
-            buffer: vec![],
-            bit_width,
+impl LevelDecoder {
+    fn new(encoding: Encoding, data: ByteBufferPtr, bit_width: u8) -> Self {
+        match encoding {
+            Encoding::RLE => {
+                let mut decoder = RleDecoder::new(bit_width);
+                decoder.set_data(data);
+                Self::Rle(decoder)
+            }
+            Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width),
+            _ => unreachable!("invalid level encoding: {}", encoding),
         }
     }
 
-    /// Drops the first `len` values from the internal buffer
-    fn split_off_buffer(&mut self, len: usize) {
-        match self.buffer.len() == len {
-            true => self.buffer.clear(),
-            false => {
-                // Move to_read elements to end of slice
-                self.buffer.rotate_left(len);
-                // Truncate buffer
-                self.buffer.truncate(self.buffer.len() - len);
+    fn read(&mut self, out: &mut [i16]) -> Result<usize> {
+        match self {
+            Self::Packed(reader, bit_width) => {
+                Ok(reader.get_batch::<i16>(out, *bit_width as usize))
             }
+            Self::Rle(reader) => Ok(reader.get_batch(out)?),
         }
     }
+}
 
-    /// Reads up to `to_read` values to the internal buffer
-    fn read_to_buffer(&mut self, to_read: usize) -> Result<()> {
-        let mut buf = std::mem::take(&mut self.buffer);
-
-        // Repopulate buffer
-        buf.resize(to_read, 0);
-        let actual = self.read(&mut buf, 0..to_read)?;
-        buf.truncate(actual);
-
-        self.buffer = buf;
-        Ok(())
-    }
+/// An implementation of [`DefinitionLevelDecoder`] for `[i16]`
+pub struct DefinitionLevelDecoderImpl {
+    decoder: Option<LevelDecoder>,
+    bit_width: u8,
 }
 
-enum LevelDecoderInner {
-    Packed(BitReader, u8),
-    Rle(RleDecoder),
+impl DefinitionLevelDecoderImpl {
+    pub fn new(max_level: i16) -> Self {
+        let bit_width = num_required_bits(max_level as u64);
+        Self {
+            decoder: None,
+            bit_width,
+        }
+    }
 }
 
-impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
+impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
     type Slice = [i16];
 
     fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
-        self.buffer.clear();
-        match encoding {
-            Encoding::RLE => {
-                let mut decoder = RleDecoder::new(self.bit_width);
-                decoder.set_data(data);
-                self.decoder = Some(LevelDecoderInner::Rle(decoder));
-            }
-            Encoding::BIT_PACKED => {
-                self.decoder = Some(LevelDecoderInner::Packed(
-                    BitReader::new(data),
-                    self.bit_width,
-                ));
-            }
-            _ => unreachable!("invalid level encoding: {}", encoding),
-        }
+        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
     }
+}
 
-    fn read(&mut self, out: &mut Self::Slice, mut range: Range<usize>) -> Result<usize> {
-        let read_from_buffer = match self.buffer.is_empty() {
-            true => 0,
-            false => {
-                let read_from_buffer = self.buffer.len().min(range.end - range.start);
-                out[range.start..range.start + read_from_buffer]
-                    .copy_from_slice(&self.buffer[0..read_from_buffer]);
-                self.split_off_buffer(read_from_buffer);
-                read_from_buffer
-            }
-        };
-        range.start += read_from_buffer;
-
-        match self.decoder.as_mut().unwrap() {
-            LevelDecoderInner::Packed(reader, bit_width) => Ok(read_from_buffer
-                + reader.get_batch::<i16>(&mut out[range], *bit_width as usize)),
-            LevelDecoderInner::Rle(reader) => {
-                Ok(read_from_buffer + reader.get_batch(&mut out[range])?)
-            }
-        }
+impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
+    fn read_def_levels(
+        &mut self,
+        out: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> Result<usize> {
+        self.decoder.as_mut().unwrap().read(&mut out[range])
     }
-}
 
-impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
     fn skip_def_levels(
         &mut self,
         num_levels: usize,
         max_def_level: i16,
     ) -> Result<(usize, usize)> {
         let mut level_skip = 0;
         let mut value_skip = 0;
+        let mut buf: Vec<i16> = vec![];
         while level_skip < num_levels {
             let remaining_levels = num_levels - level_skip;
 
-            if self.buffer.is_empty() {
-                // Only read number of needed values
-                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
-                if self.buffer.is_empty() {
-                    // Reached end of page
-                    break;
-                }
+            let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
+            buf.resize(to_read, 0);
+            let read = self.read_def_levels(&mut buf, 0..to_read)?;
+            if read == 0 {
+                // Reached end of page
+                break;
             }
-            let to_read = self.buffer.len().min(remaining_levels);
 
-            level_skip += to_read;
-            value_skip += self.buffer[..to_read]
-                .iter()
-                .filter(|x| **x == max_def_level)
-                .count();
-
-            self.split_off_buffer(to_read)
+            level_skip += read;
+            value_skip += buf[..read].iter().filter(|x| **x == max_def_level).count();
         }
 
         Ok((value_skip, level_skip))
     }
 }
 
-impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
-    fn skip_rep_levels(
+pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;
+
+/// An implementation of [`RepetitionLevelDecoder`] for `[i16]`
+pub struct RepetitionLevelDecoderImpl {
+    decoder: Option<LevelDecoder>,
+    bit_width: u8,
+    buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
+    buffer_len: usize,
+    buffer_offset: usize,
+    has_partial: bool,
+}
+
+impl RepetitionLevelDecoderImpl {
+    pub fn new(max_level: i16) -> Self {
+        let bit_width = num_required_bits(max_level as u64);
+        Self {
+            decoder: None,
+            bit_width,
+            buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
+            buffer_offset: 0,
+            buffer_len: 0,
+            has_partial: false,
+        }
+    }
+
+    fn fill_buf(&mut self) -> Result<()> {
+        let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
+        self.buffer_offset = 0;
+        self.buffer_len = read;
+        Ok(())
+    }
+
+    /// Inspects the buffered repetition levels in the range `self.buffer_offset..self.buffer_len`
+    /// and returns the number of "complete" records along with the corresponding number of levels consumed
+    ///
+    /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
+    fn count_records(
         &mut self,
-        num_records: usize,
+        records_to_read: usize,
         num_levels: usize,
-    ) -> Result<(usize, usize)> {
-        let mut level_skip = 0;
-        let mut record_skip = 0;
+    ) -> (bool, usize, usize) {
+        let mut records_read = 0;
 
-        while level_skip < num_levels {
-            let remaining_levels = num_levels - level_skip;
+        let levels = num_levels.min(self.buffer_len - self.buffer_offset);
+        let buf = self.buffer.iter().skip(self.buffer_offset);
+        for (idx, item) in buf.take(levels).enumerate() {
+            if *item == 0 && (idx != 0 || self.has_partial) {
+                records_read += 1;
 
-            if self.buffer.is_empty() {
-                // Only read number of needed values
-                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
-                if self.buffer.is_empty() {
-                    // Reached end of page
-                    break;
+                if records_read == records_to_read {
+                    return (false, records_read, idx);
                 }
             }
+        }
+        // Either ran out of space in `num_levels` or data in `self.buffer`
+        (true, records_read, levels)
+    }
+}
+
+impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
+    type Slice = [i16];
 
-            let max_skip = self.buffer.len().min(remaining_levels);
+    fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
+        self.buffer_len = 0;
+        self.buffer_offset = 0;
+    }
+}
 
-            let mut to_skip = 0;
-            while to_skip < max_skip && record_skip != num_records {
-                if self.buffer[to_skip] == 0 {
-                    record_skip += 1;
+impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
+    fn read_rep_levels(
+        &mut self,
+        out: &mut Self::Slice,
+        range: Range<usize>,
+        max_records: usize,
+    ) -> Result<(usize, usize)> {
+        let output = &mut out[range];
+        let max_levels = output.len();
+        let mut total_records_read = 0;
+        let mut total_levels_read = 0;
+
+        while total_records_read < max_records && total_levels_read < max_levels {
+            if self.buffer_len == self.buffer_offset {
+                self.fill_buf()?;
+                if self.buffer_len == 0 {
+                    break;
                 }
-                to_skip += 1;
             }
 
-            // Find end of record
-            while to_skip < max_skip && self.buffer[to_skip] != 0 {
-                to_skip += 1;
-            }
+            let (partial, records_read, levels_read) = self.count_records(
+                max_records - total_records_read,
+                max_levels - total_levels_read,
+            );
 
-            level_skip += to_skip;
-            if to_skip == self.buffer.len() {
-                // Need to to read more values
-                self.buffer.clear();
-                continue;
-            }
+            output[total_levels_read..total_levels_read + levels_read].copy_from_slice(
+                &self.buffer[self.buffer_offset..self.buffer_offset + levels_read],
+            );
 
-            self.split_off_buffer(to_skip);
-            break;
+            total_levels_read += levels_read;
+            total_records_read += records_read;
+            self.buffer_offset += levels_read;
+            self.has_partial = partial;
         }
+        Ok((total_records_read, total_levels_read))
+    }
+
+    fn skip_rep_levels(

Review Comment:
   it might help to note here in comments that it reads up to `num_records` records and `num_levels` levels, stopping at whichever limit is reached first, and returns `(total_records_read, total_levels_read)`
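   As an illustration of that contract, here is a minimal sketch (under the assumption that a repetition level of 0 delimits records; not the PR's implementation):

   ```rust
   /// Illustrative sketch of the contract described above, not the PR's
   /// implementation: skip up to `num_records` records or `num_levels`
   /// levels, whichever limit is reached first, returning
   /// (records_skipped, levels_skipped).
   fn skip_rep_levels(rep_levels: &[i16], num_records: usize, num_levels: usize) -> (usize, usize) {
       let mut records = 0;
       let mut levels = 0;
       for &level in rep_levels.iter().take(num_levels) {
           if level == 0 && levels != 0 {
               records += 1;
               if records == num_records {
                   // Stop before consuming the 0 that starts the next record
                   break;
               }
           }
           levels += 1;
       }
       (records, levels)
   }

   fn main() {
       // Records: [a, b], [c, d, e], [f, ...]
       let levels = [0, 1, 0, 1, 1, 0];
       // Record budget hit first: two records span five levels
       assert_eq!(skip_rep_levels(&levels, 2, 6), (2, 5));
       // Level budget hit first: only one full record fits in four levels
       assert_eq!(skip_rep_levels(&levels, 2, 4), (1, 4));
   }
   ```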



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#discussion_r1223324759


##########
parquet/src/column/reader/decoder.rs:
##########
@@ -270,182 +297,227 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
 
[quoted diff trimmed; identical to the hunk shown in full above]

Review Comment:
   I think the docs on the trait implementation are sufficient



[a third identical quoted diff, truncated by the archive, has been trimmed]
+                break;
             }
-            let to_read = self.buffer.len().min(remaining_levels);
 
-            level_skip += to_read;
-            value_skip += self.buffer[..to_read]
-                .iter()
-                .filter(|x| **x == max_def_level)
-                .count();
-
-            self.split_off_buffer(to_read)
+            level_skip += read;
+            value_skip += buf[..read].iter().filter(|x| **x == max_def_level).count();
         }
 
         Ok((value_skip, level_skip))
     }
 }
 
-impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
-    fn skip_rep_levels(
+pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;
+
+/// An implementation of [`RepetitionLevelDecoder`] for `[i16]`
+pub struct RepetitionLevelDecoderImpl {
+    decoder: Option<LevelDecoder>,
+    bit_width: u8,
+    buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
+    buffer_len: usize,
+    buffer_offset: usize,
+    has_partial: bool,
+}
+
+impl RepetitionLevelDecoderImpl {
+    pub fn new(max_level: i16) -> Self {
+        let bit_width = num_required_bits(max_level as u64);
+        Self {
+            decoder: None,
+            bit_width,
+            buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
+            buffer_offset: 0,
+            buffer_len: 0,
+            has_partial: false,
+        }
+    }
+
+    fn fill_buf(&mut self) -> Result<()> {
+        let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
+        self.buffer_offset = 0;
+        self.buffer_len = read;
+        Ok(())
+    }
+
+    /// Inspects the buffered repetition levels in the range `self.buffer_offset..self.buffer_len`
+    /// and returns the number of "complete" records along with the corresponding number of values
+    ///
+    /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
+    fn count_records(
         &mut self,
-        num_records: usize,
+        records_to_read: usize,
         num_levels: usize,
-    ) -> Result<(usize, usize)> {
-        let mut level_skip = 0;
-        let mut record_skip = 0;
+    ) -> (bool, usize, usize) {
+        let mut records_read = 0;
 
-        while level_skip < num_levels {
-            let remaining_levels = num_levels - level_skip;
+        let levels = num_levels.min(self.buffer_len - self.buffer_offset);
+        let buf = self.buffer.iter().skip(self.buffer_offset);
+        for (idx, item) in buf.take(levels).enumerate() {
+            if *item == 0 && (idx != 0 || self.has_partial) {
+                records_read += 1;
 
-            if self.buffer.is_empty() {
-                // Only read number of needed values
-                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
-                if self.buffer.is_empty() {
-                    // Reached end of page
-                    break;
+                if records_read == records_to_read {
+                    return (false, records_read, idx);
                 }
             }
+        }
+        // Either ran out of space in `num_levels` or data in `self.buffer`
+        (true, records_read, levels)
+    }
+}
+
+impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
+    type Slice = [i16];
 
-            let max_skip = self.buffer.len().min(remaining_levels);
+    fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
+        self.buffer_len = 0;
+        self.buffer_offset = 0;
+    }
+}
 
-            let mut to_skip = 0;
-            while to_skip < max_skip && record_skip != num_records {
-                if self.buffer[to_skip] == 0 {
-                    record_skip += 1;
+impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
+    fn read_rep_levels(
+        &mut self,
+        out: &mut Self::Slice,
+        range: Range<usize>,
+        max_records: usize,
+    ) -> Result<(usize, usize)> {
+        let output = &mut out[range];
+        let max_levels = output.len();
+        let mut total_records_read = 0;
+        let mut total_levels_read = 0;
+
+        while total_records_read < max_records && total_levels_read < max_levels {
+            if self.buffer_len == self.buffer_offset {
+                self.fill_buf()?;
+                if self.buffer_len == 0 {
+                    break;
                 }
-                to_skip += 1;
             }
 
-            // Find end of record
-            while to_skip < max_skip && self.buffer[to_skip] != 0 {
-                to_skip += 1;
-            }
+            let (partial, records_read, levels_read) = self.count_records(
+                max_records - total_records_read,
+                max_levels - total_levels_read,
+            );
 
-            level_skip += to_skip;
-            if to_skip == self.buffer.len() {
-                // Need to read more values
-                self.buffer.clear();
-                continue;
-            }
+            output[total_levels_read..total_levels_read + levels_read].copy_from_slice(
+                &self.buffer[self.buffer_offset..self.buffer_offset + levels_read],
+            );
 
-            self.split_off_buffer(to_skip);
-            break;
+            total_levels_read += levels_read;
+            total_records_read += records_read;
+            self.buffer_offset += levels_read;
+            self.has_partial = partial;
         }
+        Ok((total_records_read, total_levels_read))
+    }
+
+    fn skip_rep_levels(

Review Comment:
   I think the docs on the trait definition are sufficient



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#discussion_r1221403305


##########
parquet/src/arrow/record_reader/mod.rs:
##########
@@ -313,90 +231,29 @@ where
                 )
             })?;
 
-            self.records.pad_nulls(
-                self.values_written,
+            self.values.pad_nulls(
+                self.num_values,
                 values_read,
                 levels_read,
                 def_levels.nulls().as_slice(),
             );
         }
 
-        let values_read = max(levels_read, values_read);
-        self.set_values_written(self.values_written + values_read);
-        Ok(values_read)
-    }
-
-    /// Inspects the buffered repetition levels in the range `self.num_values..self.values_written`
-    /// and returns the number of "complete" records along with the corresponding number of values
-    ///
-    /// If `end_of_column` is true it indicates that there are no further values for this
-    /// column chunk beyond what is currently in the buffers
-    ///
-    /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
-    fn count_records(

Review Comment:
   This logic is now moved into RepetitionLevelDecoderImpl
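
   To illustrate, the delimiting logic that moved into RepetitionLevelDecoderImpl::count_records can be sketched as a standalone function (hypothetical name, and omitting the `has_partial` handling for records that straddle buffer refills): a record starts at each repetition level of 0, so a record is only "complete" once a subsequent 0, i.e. the start of the next record, is seen.

```rust
/// Standalone sketch of record delimiting over a buffer of repetition
/// levels. Returns (complete records found, levels consumed). Not the
/// crate's actual API.
fn count_complete_records(rep_levels: &[i16], max_records: usize) -> (usize, usize) {
    let mut records = 0;
    for (idx, &level) in rep_levels.iter().enumerate() {
        // A 0 at idx > 0 closes the record that began earlier
        if level == 0 && idx != 0 {
            records += 1;
            if records == max_records {
                // The 0 at `idx` belongs to the next record, so only
                // `idx` levels have been consumed
                return (records, idx);
            }
        }
    }
    // Ran out of buffered levels; the trailing record may be incomplete
    (records, rep_levels.len())
}

fn main() {
    // Rep levels for [[a, b], [c], [d, e, f]] are 0 1 0 0 1 1; only the
    // first two records are provably complete within this buffer
    let levels = [0i16, 1, 0, 0, 1, 1];
    assert_eq!(count_complete_records(&levels, usize::MAX), (2, 6));
    // Asking for one record consumes just the two levels of [a, b]
    assert_eq!(count_complete_records(&levels, 1), (1, 2));
}
```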





[GitHub] [arrow-rs] zeevm commented on pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "zeevm (via GitHub)" <gi...@apache.org>.
zeevm commented on PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#issuecomment-1639980948

   @tustvold the new read_records() api returns the tuple (total_records_read, total_values_read, total_levels_read)
   
   What's the purpose of total_levels_read? This will either be 0 if max def level is 0, or otherwise the same as total_records_read, shouldn't it?




[GitHub] [arrow-rs] tustvold commented on pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#issuecomment-1640090770

   For a repeated field the number of levels read may exceed the number of records read, as a record is delimited by a repetition level of 0
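
   This can be made concrete with hypothetical numbers: a single record holding a three-element list yields one record but three repetition levels.

```rust
fn main() {
    // Hypothetical rep levels for one record holding the list [10, 20, 30]:
    // the first element is marked with repetition level 0 (start of a new
    // record), each subsequent element with level 1.
    let rep_levels = [0i16, 1, 1];

    // A record starts at every repetition level of 0 ...
    let records_read = rep_levels.iter().filter(|&&l| l == 0).count();
    // ... but every element contributes a level.
    let levels_read = rep_levels.len();

    assert_eq!(records_read, 1);
    assert_eq!(levels_read, 3);
}
```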




[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#discussion_r1220469934


##########
parquet/src/record/triplet.rs:
##########
@@ -295,16 +295,16 @@ impl<T: DataType> TypedTripletIter<T> {
     fn read_next(&mut self) -> Result<bool> {
         self.curr_triplet_index += 1;
 
-        if self.curr_triplet_index >= self.triplets_left {
-            let (values_read, levels_read) = {
+        while self.curr_triplet_index >= self.triplets_left {

Review Comment:
   I hope to make it so that this change isn't necessary





[GitHub] [arrow-rs] zeevm commented on pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "zeevm (via GitHub)" <gi...@apache.org>.
zeevm commented on PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#issuecomment-1640124906

   But the '0' rep level is also associated with the first value of the next record, so total_records should also include that, shouldn't it?
   Or will the read_records() impl only read 'full records', not parts of records?




[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#discussion_r1223324759


##########
parquet/src/column/reader/decoder.rs:
##########
@@ -270,182 +297,227 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
 
 const SKIP_BUFFER_SIZE: usize = 1024;
 
-/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
-pub struct ColumnLevelDecoderImpl {
-    decoder: Option<LevelDecoderInner>,
-    /// Temporary buffer populated when skipping values
-    buffer: Vec<i16>,
-    bit_width: u8,
+enum LevelDecoder {
+    Packed(BitReader, u8),
+    Rle(RleDecoder),
 }
 
-impl ColumnLevelDecoderImpl {
-    pub fn new(max_level: i16) -> Self {
-        let bit_width = num_required_bits(max_level as u64);
-        Self {
-            decoder: None,
-            buffer: vec![],
-            bit_width,
+impl LevelDecoder {
+    fn new(encoding: Encoding, data: ByteBufferPtr, bit_width: u8) -> Self {
+        match encoding {
+            Encoding::RLE => {
+                let mut decoder = RleDecoder::new(bit_width);
+                decoder.set_data(data);
+                Self::Rle(decoder)
+            }
+            Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width),
+            _ => unreachable!("invalid level encoding: {}", encoding),
         }
     }
 
-    /// Drops the first `len` values from the internal buffer
-    fn split_off_buffer(&mut self, len: usize) {
-        match self.buffer.len() == len {
-            true => self.buffer.clear(),
-            false => {
-                // Move to_read elements to end of slice
-                self.buffer.rotate_left(len);
-                // Truncate buffer
-                self.buffer.truncate(self.buffer.len() - len);
+    fn read(&mut self, out: &mut [i16]) -> Result<usize> {
+        match self {
+            Self::Packed(reader, bit_width) => {
+                Ok(reader.get_batch::<i16>(out, *bit_width as usize))
             }
+            Self::Rle(reader) => Ok(reader.get_batch(out)?),
         }
     }
+}
 
-    /// Reads up to `to_read` values to the internal buffer
-    fn read_to_buffer(&mut self, to_read: usize) -> Result<()> {
-        let mut buf = std::mem::take(&mut self.buffer);
-
-        // Repopulate buffer
-        buf.resize(to_read, 0);
-        let actual = self.read(&mut buf, 0..to_read)?;
-        buf.truncate(actual);
-
-        self.buffer = buf;
-        Ok(())
-    }
+/// An implementation of [`DefinitionLevelDecoder`] for `[i16]`
+pub struct DefinitionLevelDecoderImpl {
+    decoder: Option<LevelDecoder>,
+    bit_width: u8,
 }
 
-enum LevelDecoderInner {
-    Packed(BitReader, u8),
-    Rle(RleDecoder),
+impl DefinitionLevelDecoderImpl {
+    pub fn new(max_level: i16) -> Self {
+        let bit_width = num_required_bits(max_level as u64);
+        Self {
+            decoder: None,
+            bit_width,
+        }
+    }
 }
 
-impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
+impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
     type Slice = [i16];
 
     fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
-        self.buffer.clear();
-        match encoding {
-            Encoding::RLE => {
-                let mut decoder = RleDecoder::new(self.bit_width);
-                decoder.set_data(data);
-                self.decoder = Some(LevelDecoderInner::Rle(decoder));
-            }
-            Encoding::BIT_PACKED => {
-                self.decoder = Some(LevelDecoderInner::Packed(
-                    BitReader::new(data),
-                    self.bit_width,
-                ));
-            }
-            _ => unreachable!("invalid level encoding: {}", encoding),
-        }
+        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
     }
+}
 
-    fn read(&mut self, out: &mut Self::Slice, mut range: Range<usize>) -> Result<usize> {
-        let read_from_buffer = match self.buffer.is_empty() {
-            true => 0,
-            false => {
-                let read_from_buffer = self.buffer.len().min(range.end - range.start);
-                out[range.start..range.start + read_from_buffer]
-                    .copy_from_slice(&self.buffer[0..read_from_buffer]);
-                self.split_off_buffer(read_from_buffer);
-                read_from_buffer
-            }
-        };
-        range.start += read_from_buffer;
-
-        match self.decoder.as_mut().unwrap() {
-            LevelDecoderInner::Packed(reader, bit_width) => Ok(read_from_buffer
-                + reader.get_batch::<i16>(&mut out[range], *bit_width as usize)),
-            LevelDecoderInner::Rle(reader) => {
-                Ok(read_from_buffer + reader.get_batch(&mut out[range])?)
-            }
-        }
+impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
+    fn read_def_levels(
+        &mut self,
+        out: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> Result<usize> {
+        self.decoder.as_mut().unwrap().read(&mut out[range])
     }
-}
 
-impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
     fn skip_def_levels(
         &mut self,
         num_levels: usize,
         max_def_level: i16,
     ) -> Result<(usize, usize)> {
         let mut level_skip = 0;
         let mut value_skip = 0;
+        let mut buf: Vec<i16> = vec![];
         while level_skip < num_levels {
             let remaining_levels = num_levels - level_skip;
 
-            if self.buffer.is_empty() {
-                // Only read number of needed values
-                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
-                if self.buffer.is_empty() {
-                    // Reached end of page
-                    break;
-                }
+            let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
+            buf.resize(to_read, 0);
+            let read = self.read_def_levels(&mut buf, 0..to_read)?;
+            if read == 0 {
+                // Reached end of page
+                break;
             }
-            let to_read = self.buffer.len().min(remaining_levels);
 
-            level_skip += to_read;
-            value_skip += self.buffer[..to_read]
-                .iter()
-                .filter(|x| **x == max_def_level)
-                .count();
-
-            self.split_off_buffer(to_read)
+            level_skip += read;
+            value_skip += buf[..read].iter().filter(|x| **x == max_def_level).count();
         }
 
         Ok((value_skip, level_skip))
     }
 }
 
-impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
-    fn skip_rep_levels(
+pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;
+
+/// An implementation of [`RepetitionLevelDecoder`] for `[i16]`
+pub struct RepetitionLevelDecoderImpl {
+    decoder: Option<LevelDecoder>,
+    bit_width: u8,
+    buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
+    buffer_len: usize,
+    buffer_offset: usize,
+    has_partial: bool,
+}
+
+impl RepetitionLevelDecoderImpl {
+    pub fn new(max_level: i16) -> Self {
+        let bit_width = num_required_bits(max_level as u64);
+        Self {
+            decoder: None,
+            bit_width,
+            buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
+            buffer_offset: 0,
+            buffer_len: 0,
+            has_partial: false,
+        }
+    }
+
+    fn fill_buf(&mut self) -> Result<()> {
+        let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
+        self.buffer_offset = 0;
+        self.buffer_len = read;
+        Ok(())
+    }
+
+    /// Inspects the buffered repetition levels in the range `self.buffer_offset..self.buffer_len`
+    /// and returns the number of "complete" records along with the corresponding number of values
+    ///
+    /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
+    fn count_records(
         &mut self,
-        num_records: usize,
+        records_to_read: usize,
         num_levels: usize,
-    ) -> Result<(usize, usize)> {
-        let mut level_skip = 0;
-        let mut record_skip = 0;
+    ) -> (bool, usize, usize) {
+        let mut records_read = 0;
 
-        while level_skip < num_levels {
-            let remaining_levels = num_levels - level_skip;
+        let levels = num_levels.min(self.buffer_len - self.buffer_offset);
+        let buf = self.buffer.iter().skip(self.buffer_offset);
+        for (idx, item) in buf.take(levels).enumerate() {
+            if *item == 0 && (idx != 0 || self.has_partial) {
+                records_read += 1;
 
-            if self.buffer.is_empty() {
-                // Only read number of needed values
-                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
-                if self.buffer.is_empty() {
-                    // Reached end of page
-                    break;
+                if records_read == records_to_read {
+                    return (false, records_read, idx);
                 }
             }
+        }
+        // Either ran out of space in `num_levels` or data in `self.buffer`
+        (true, records_read, levels)
+    }
+}
+
+impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
+    type Slice = [i16];
 
-            let max_skip = self.buffer.len().min(remaining_levels);
+    fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
+        self.buffer_len = 0;
+        self.buffer_offset = 0;
+    }
+}
 
-            let mut to_skip = 0;
-            while to_skip < max_skip && record_skip != num_records {
-                if self.buffer[to_skip] == 0 {
-                    record_skip += 1;
+impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
+    fn read_rep_levels(
+        &mut self,
+        out: &mut Self::Slice,
+        range: Range<usize>,
+        max_records: usize,
+    ) -> Result<(usize, usize)> {
+        let output = &mut out[range];
+        let max_levels = output.len();
+        let mut total_records_read = 0;
+        let mut total_levels_read = 0;
+
+        while total_records_read < max_records && total_levels_read < max_levels {
+            if self.buffer_len == self.buffer_offset {
+                self.fill_buf()?;
+                if self.buffer_len == 0 {
+                    break;
                 }
-                to_skip += 1;
             }
 
-            // Find end of record
-            while to_skip < max_skip && self.buffer[to_skip] != 0 {
-                to_skip += 1;
-            }
+            let (partial, records_read, levels_read) = self.count_records(
+                max_records - total_records_read,
+                max_levels - total_levels_read,
+            );
 
-            level_skip += to_skip;
-            if to_skip == self.buffer.len() {
-                // Need to read more values
-                self.buffer.clear();
-                continue;
-            }
+            output[total_levels_read..total_levels_read + levels_read].copy_from_slice(
+                &self.buffer[self.buffer_offset..self.buffer_offset + levels_read],
+            );
 
-            self.split_off_buffer(to_skip);
-            break;
+            total_levels_read += levels_read;
+            total_records_read += records_read;
+            self.buffer_offset += levels_read;
+            self.has_partial = partial;
         }
+        Ok((total_records_read, total_levels_read))
+    }
+
+    fn skip_rep_levels(

Review Comment:
   I think the docs on the trait definition are sufficient, though I will concede they are admittedly less visible in a PR review without code completion





[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#discussion_r1223323728


##########
parquet/src/arrow/record_reader/mod.rs:
##########
@@ -201,31 +149,10 @@ where
     ///
     /// Number of records skipped
     pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        // First need to clear the buffer
-        let end_of_column = match self.column_reader.as_mut() {
-            Some(reader) => !reader.peek_next()?,
-            None => return Ok(0),
-        };
-
-        let (buffered_records, buffered_values) =
-            self.count_records(num_records, end_of_column);
-
-        self.num_records += buffered_records;
-        self.num_values += buffered_values;
-
-        let remaining = num_records - buffered_records;
-
-        if remaining == 0 {
-            return Ok(buffered_records);
+        match self.column_reader.as_mut() {
+            Some(reader) => reader.skip_records(num_records),
+            None => Ok(0),

Review Comment:
   Yes, RecordReader no longer performs this intermediate buffering; it only buffers the requested rows





[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#discussion_r1221798032


##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -2658,4 +2653,111 @@ mod tests {
         let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
         assert_eq!(total_rows, 924 * 2);
     }
+
+    #[test]
+    fn test_list_selection_fuzz() {

Review Comment:
   I'm quite pleased with this, the parquet reader is pretty sophisticated now :smile: 





[GitHub] [arrow-rs] tustvold merged pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold merged PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376




[GitHub] [arrow-rs] zeevm commented on pull request #4376: Move record delimiting into ColumnReader (#4365)

Posted by "zeevm (via GitHub)" <gi...@apache.org>.
zeevm commented on PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#issuecomment-1640130662

   Also, which levels are reported in total_levels_read, rep or def?
   
   For an OPTIONAL field we have def levels but no rep levels.
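
   As a hypothetical illustration of the OPTIONAL case: a flat optional column carries one definition level per row and no repetition levels, and only levels equal to the maximum correspond to materialized values.

```rust
fn main() {
    // Hypothetical definition levels for a flat OPTIONAL column with
    // max_def_level = 1: a level equal to the maximum means the value is
    // present, anything lower means null.
    let def_levels = [1i16, 0, 1, 1];
    let max_def_level = 1i16;

    // Mirrors the accounting in skip_def_levels: only levels at the
    // maximum correspond to materialized values.
    let values = def_levels.iter().filter(|&&l| l == max_def_level).count();

    assert_eq!(def_levels.len(), 4); // one def level per row
    assert_eq!(values, 3);           // three non-null values
}
```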

