You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/06/08 11:20:08 UTC

[GitHub] [arrow-rs] alamb commented on a diff in pull request #4376: Move record delimiting into ColumnReader (#4365)

alamb commented on code in PR #4376:
URL: https://github.com/apache/arrow-rs/pull/4376#discussion_r1222882752


##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -2610,4 +2611,153 @@ mod tests {
         test_decimal_roundtrip::<Decimal128Type>();
         test_decimal_roundtrip::<Decimal256Type>();
     }
+
+    #[test]
+    fn test_list_selection() {
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new("item", ArrowDataType::Utf8, true),
+            false,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+        for _ in 0..2 {
+            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
+            for i in 0..1024 {
+                list_a_builder.values().append_value(format!("{i}"));
+                list_a_builder.append(true);
+            }
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![Arc::new(list_a_builder.finish())],
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+        }
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
+            .unwrap()
+            .with_row_selection(RowSelection::from(vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ]))
+            .build()
+            .unwrap();
+
+        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
+        assert_eq!(total_rows, 924 * 2);
+    }
+
+    #[test]
+    fn test_list_selection_fuzz() {
+        let mut rng = thread_rng();
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new_list("item", Field::new("item", ArrowDataType::Int32, true), true),
+            true,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+
+        for _ in 0..2048 {
+            if rng.gen_bool(0.2) {
+                list_a_builder.append(false);
+                continue;
+            }
+
+            let list_a_len = rng.gen_range(0..10);
+            let list_b_builder = list_a_builder.values();
+
+            for _ in 0..list_a_len {
+                if rng.gen_bool(0.2) {
+                    list_b_builder.append(false);
+                    continue;
+                }
+
+                let list_b_len = rng.gen_range(0..10);
+                let int_builder = list_b_builder.values();
+                for _ in 0..list_b_len {
+                    match rng.gen_bool(0.2) {
+                        true => int_builder.append_null(),
+                        false => int_builder.append_value(rng.gen()),
+                    }
+                }
+                list_b_builder.append(true)
+            }
+            list_a_builder.append(true);
+        }
+
+        let array = Arc::new(list_a_builder.finish());
+        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
+
+        writer.write(&batch).unwrap();
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+
+        let cases = [
+            vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ],
+            vec![
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+            ],
+            vec![

Review Comment:
   I like this boundary condition testing



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -2610,4 +2611,153 @@ mod tests {
         test_decimal_roundtrip::<Decimal128Type>();
         test_decimal_roundtrip::<Decimal256Type>();
     }
+
+    #[test]
+    fn test_list_selection() {
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new("item", ArrowDataType::Utf8, true),
+            false,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+        for _ in 0..2 {
+            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
+            for i in 0..1024 {
+                list_a_builder.values().append_value(format!("{i}"));
+                list_a_builder.append(true);
+            }
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![Arc::new(list_a_builder.finish())],
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+        }
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
+            .unwrap()
+            .with_row_selection(RowSelection::from(vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ]))
+            .build()
+            .unwrap();
+
+        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
+        assert_eq!(total_rows, 924 * 2);

Review Comment:
   I wonder if it is worth asserting that the values come back as expected (you could build the expected results in the `for _ in 0..2` loop above)?



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -2610,4 +2611,153 @@ mod tests {
         test_decimal_roundtrip::<Decimal128Type>();
         test_decimal_roundtrip::<Decimal256Type>();
     }
+
+    #[test]
+    fn test_list_selection() {
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new("item", ArrowDataType::Utf8, true),
+            false,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+        for _ in 0..2 {
+            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
+            for i in 0..1024 {
+                list_a_builder.values().append_value(format!("{i}"));
+                list_a_builder.append(true);
+            }
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![Arc::new(list_a_builder.finish())],
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+        }
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
+            .unwrap()
+            .with_row_selection(RowSelection::from(vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ]))
+            .build()
+            .unwrap();
+
+        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
+        assert_eq!(total_rows, 924 * 2);
+    }
+
+    #[test]
+    fn test_list_selection_fuzz() {
+        let mut rng = thread_rng();
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new_list("item", Field::new("item", ArrowDataType::Int32, true), true),
+            true,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+
+        for _ in 0..2048 {
+            if rng.gen_bool(0.2) {
+                list_a_builder.append(false);
+                continue;
+            }
+
+            let list_a_len = rng.gen_range(0..10);
+            let list_b_builder = list_a_builder.values();
+
+            for _ in 0..list_a_len {
+                if rng.gen_bool(0.2) {
+                    list_b_builder.append(false);
+                    continue;
+                }
+
+                let list_b_len = rng.gen_range(0..10);
+                let int_builder = list_b_builder.values();
+                for _ in 0..list_b_len {
+                    match rng.gen_bool(0.2) {
+                        true => int_builder.append_null(),
+                        false => int_builder.append_value(rng.gen()),
+                    }
+                }
+                list_b_builder.append(true)
+            }
+            list_a_builder.append(true);
+        }
+
+        let array = Arc::new(list_a_builder.finish());
+        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
+
+        writer.write(&batch).unwrap();
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+
+        let cases = [
+            vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ],
+            vec![
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+            ],
+            vec![
+                RowSelector::skip(1023),
+                RowSelector::select(1),
+                RowSelector::skip(1023),
+                RowSelector::select(1),
+            ],
+            vec![
+                RowSelector::select(1),
+                RowSelector::skip(1023),
+                RowSelector::select(1),
+                RowSelector::skip(1023),
+            ],
+        ];
+
+        for selection in cases {
+            let selection = RowSelection::from(selection);
+            let mut reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
+                .unwrap()
+                .with_row_selection(selection.clone())
+                .with_batch_size(2048)

Review Comment:
   Given all the inputs are only 2048 rows in total (and thus in a single batch), would tests with different batch sizes that span the input (e.g. `1000`) be a good idea?



##########
parquet/src/column/reader.rs:
##########
@@ -195,99 +198,126 @@ where
     ///
     /// `values` will be contiguously populated with the non-null values. Note that if the column
     /// is not required, this may be less than either `batch_size` or the number of levels read
+    #[deprecated(note = "Use read_records")]
     pub fn read_batch(
         &mut self,
         batch_size: usize,
-        mut def_levels: Option<&mut D::Slice>,
-        mut rep_levels: Option<&mut R::Slice>,
+        def_levels: Option<&mut D::Slice>,
+        rep_levels: Option<&mut R::Slice>,
         values: &mut V::Slice,
     ) -> Result<(usize, usize)> {
-        let mut values_read = 0;
-        let mut levels_read = 0;
+        let (_, values, levels) =
+            self.read_records(batch_size, def_levels, rep_levels, values)?;
+
+        Ok((values, levels))
+    }
 
-        // Compute the smallest batch size we can read based on provided slices
-        let mut batch_size = min(batch_size, values.capacity());
+    /// Read up to `num_records` returning the number of complete records, non-null
+    /// values and levels decoded
+    ///
+    /// If the max definition level is 0, `def_levels` will be ignored, otherwise it will be
+    /// populated with the number of levels read, with an error returned if it is `None`.
+    ///
+    /// If the max repetition level is 0, `rep_levels` will be ignored, otherwise it will be
+    /// populated with the number of levels read, with an error returned if it is `None`.
+    ///
+    /// `values` will be contiguously populated with the non-null values. Note that if the column
+    /// is not required, this may be less than either `max_records` or the number of levels read
+    pub fn read_records(
+        &mut self,
+        max_records: usize,
+        mut def_levels: Option<&mut D::Slice>,
+        mut rep_levels: Option<&mut R::Slice>,
+        values: &mut V::Slice,
+    ) -> Result<(usize, usize, usize)> {
+        let mut max_levels = values.capacity().min(max_records);
         if let Some(ref levels) = def_levels {
-            batch_size = min(batch_size, levels.capacity());
+            max_levels = max_levels.min(levels.capacity());
         }
         if let Some(ref levels) = rep_levels {
-            batch_size = min(batch_size, levels.capacity());
+            max_levels = max_levels.min(levels.capacity())
         }
 
-        // Read exhaustively all pages until we read all batch_size values/levels
-        // or there are no more values/levels to read.
-        while levels_read < batch_size {
-            if !self.has_next()? {
-                break;
-            }
+        let mut total_records_read = 0;
+        let mut total_levels_read = 0;
+        let mut total_values_read = 0;
 
-            // Batch size for the current iteration
-            let iter_batch_size = (batch_size - levels_read)
-                .min((self.num_buffered_values - self.num_decoded_values) as usize);
+        while total_records_read < max_records
+            && total_levels_read < max_levels
+            && self.has_next()?
+        {
+            let remaining_records = max_records - total_records_read;
+            let remaining_levels = self.num_buffered_values - self.num_decoded_values;
+            let levels_to_read = remaining_levels.min(max_levels - total_levels_read);
 
-            // If the field is required and non-repeated, there are no definition levels
-            let null_count = match self.descr.max_def_level() > 0 {
-                true => {
-                    let levels = def_levels
+            let (records_read, levels_read) = match self.rep_level_decoder.as_mut() {
+                Some(reader) => {
+                    let out = rep_levels
                         .as_mut()
-                        .ok_or_else(|| general_err!("must specify definition levels"))?;
+                        .ok_or_else(|| general_err!("must specify repetition levels"))?;
+
+                    let (mut records_read, levels_read) = reader.read_rep_levels(
+                        out,
+                        total_levels_read..total_levels_read + levels_to_read,
+                        remaining_records,
+                    )?;
+
+                    if levels_read == remaining_levels && self.has_record_delimiter {
+                        // Reached end of page, which implies records_read < remaining_records
+                        // as otherwise would have stopped reading before reaching the end
+                        assert!(records_read < remaining_records); // Sanity check
+                        records_read += 1;
+                    }
+                    (records_read, levels_read)
+                }
+                None => {
+                    let min = remaining_records.min(levels_to_read);
+                    (min, min)
+                }
+            };
 
-                    let num_def_levels = self
-                        .def_level_decoder
+            let values_to_read = match self.def_level_decoder.as_mut() {
+                Some(reader) => {
+                    let out = def_levels
                         .as_mut()
-                        .expect("def_level_decoder be set")
-                        .read(levels, levels_read..levels_read + iter_batch_size)?;
+                        .ok_or_else(|| general_err!("must specify definition levels"))?;
+
+                    let read = reader.read_def_levels(
+                        out,
+                        total_levels_read..total_levels_read + levels_read,
+                    )?;
 
-                    if num_def_levels != iter_batch_size {
-                        return Err(general_err!("insufficient definition levels read from column - expected {}, got {}", iter_batch_size, num_def_levels));
+                    if read != levels_read {
+                        return Err(general_err!("insufficient definition levels read from column - expected {rep_levels}, got {read}"));

Review Comment:
   this happens with an invalid input, right?



##########
parquet/src/column/reader.rs:
##########
@@ -493,12 +535,18 @@ where
                                 return Err(general_err!("more nulls than values in page, contained {} values and {} nulls", num_values, num_nulls));
                             }
 
-                            self.num_buffered_values = num_values;
+                            self.num_buffered_values = num_values as _;
                             self.num_decoded_values = 0;
 
                             // DataPage v2 only supports RLE encoding for repetition
                             // levels
                             if self.descr.max_rep_level() > 0 {
+                                // Technically a DataPage v2 should not write a record

Review Comment:
   Here is my evidence that the spec says not to write a record across page boundaries:
   https://github.com/apache/parquet-format/blob/c766945d90935ebcd4e03fee13aad2b6efcadce3/src/main/thrift/parquet.thrift#L567
   



##########
parquet/src/column/reader/decoder.rs:
##########
@@ -94,6 +105,22 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
 }
 
 pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
+    /// Read definition level data into `out[range]` returning the number of levels read
+    ///
+    /// `range` is provided by the caller to allow for types such as default-initialized `[T]`
+    /// that only track capacity and not length
+    ///
+    /// # Panics
+    ///
+    /// Implementations may panic if `range` overlaps with already written data
+    ///
+    // TODO: Should this return the number of nulls

Review Comment:
   is this still a TODO?



##########
parquet/src/arrow/record_reader/mod.rs:
##########
@@ -201,31 +149,10 @@ where
     ///
     /// Number of records skipped
     pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        // First need to clear the buffer
-        let end_of_column = match self.column_reader.as_mut() {
-            Some(reader) => !reader.peek_next()?,
-            None => return Ok(0),
-        };
-
-        let (buffered_records, buffered_values) =
-            self.count_records(num_records, end_of_column);
-
-        self.num_records += buffered_records;
-        self.num_values += buffered_values;
-
-        let remaining = num_records - buffered_records;
-
-        if remaining == 0 {
-            return Ok(buffered_records);
+        match self.column_reader.as_mut() {
+            Some(reader) => reader.skip_records(num_records),
+            None => Ok(0),

Review Comment:
   Is it intentional that this function no longer updates `self.num_records` and `self.num_values`?
   
   ```
           self.num_records += buffered_records;
           self.num_values += buffered_values;
   ```



##########
parquet/src/column/reader/decoder.rs:
##########
@@ -270,182 +297,227 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
 
 const SKIP_BUFFER_SIZE: usize = 1024;
 
-/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
-pub struct ColumnLevelDecoderImpl {
-    decoder: Option<LevelDecoderInner>,
-    /// Temporary buffer populated when skipping values
-    buffer: Vec<i16>,
-    bit_width: u8,
+enum LevelDecoder {
+    Packed(BitReader, u8),
+    Rle(RleDecoder),
 }
 
-impl ColumnLevelDecoderImpl {
-    pub fn new(max_level: i16) -> Self {
-        let bit_width = num_required_bits(max_level as u64);
-        Self {
-            decoder: None,
-            buffer: vec![],
-            bit_width,
+impl LevelDecoder {
+    fn new(encoding: Encoding, data: ByteBufferPtr, bit_width: u8) -> Self {
+        match encoding {
+            Encoding::RLE => {
+                let mut decoder = RleDecoder::new(bit_width);
+                decoder.set_data(data);
+                Self::Rle(decoder)
+            }
+            Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width),
+            _ => unreachable!("invalid level encoding: {}", encoding),
         }
     }
 
-    /// Drops the first `len` values from the internal buffer
-    fn split_off_buffer(&mut self, len: usize) {
-        match self.buffer.len() == len {
-            true => self.buffer.clear(),
-            false => {
-                // Move to_read elements to end of slice
-                self.buffer.rotate_left(len);
-                // Truncate buffer
-                self.buffer.truncate(self.buffer.len() - len);
+    fn read(&mut self, out: &mut [i16]) -> Result<usize> {
+        match self {
+            Self::Packed(reader, bit_width) => {
+                Ok(reader.get_batch::<i16>(out, *bit_width as usize))
             }
+            Self::Rle(reader) => Ok(reader.get_batch(out)?),
         }
     }
+}
 
-    /// Reads up to `to_read` values to the internal buffer
-    fn read_to_buffer(&mut self, to_read: usize) -> Result<()> {
-        let mut buf = std::mem::take(&mut self.buffer);
-
-        // Repopulate buffer
-        buf.resize(to_read, 0);
-        let actual = self.read(&mut buf, 0..to_read)?;
-        buf.truncate(actual);
-
-        self.buffer = buf;
-        Ok(())
-    }
+/// An implementation of [`DefinitionLevelDecoder`] for `[i16]`
+pub struct DefinitionLevelDecoderImpl {
+    decoder: Option<LevelDecoder>,
+    bit_width: u8,
 }
 
-enum LevelDecoderInner {
-    Packed(BitReader, u8),
-    Rle(RleDecoder),
+impl DefinitionLevelDecoderImpl {
+    pub fn new(max_level: i16) -> Self {
+        let bit_width = num_required_bits(max_level as u64);
+        Self {
+            decoder: None,
+            bit_width,
+        }
+    }
 }
 
-impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
+impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
     type Slice = [i16];
 
     fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
-        self.buffer.clear();
-        match encoding {
-            Encoding::RLE => {
-                let mut decoder = RleDecoder::new(self.bit_width);
-                decoder.set_data(data);
-                self.decoder = Some(LevelDecoderInner::Rle(decoder));
-            }
-            Encoding::BIT_PACKED => {
-                self.decoder = Some(LevelDecoderInner::Packed(
-                    BitReader::new(data),
-                    self.bit_width,
-                ));
-            }
-            _ => unreachable!("invalid level encoding: {}", encoding),
-        }
+        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
     }
+}
 
-    fn read(&mut self, out: &mut Self::Slice, mut range: Range<usize>) -> Result<usize> {
-        let read_from_buffer = match self.buffer.is_empty() {
-            true => 0,
-            false => {
-                let read_from_buffer = self.buffer.len().min(range.end - range.start);
-                out[range.start..range.start + read_from_buffer]
-                    .copy_from_slice(&self.buffer[0..read_from_buffer]);
-                self.split_off_buffer(read_from_buffer);
-                read_from_buffer
-            }
-        };
-        range.start += read_from_buffer;
-
-        match self.decoder.as_mut().unwrap() {
-            LevelDecoderInner::Packed(reader, bit_width) => Ok(read_from_buffer
-                + reader.get_batch::<i16>(&mut out[range], *bit_width as usize)),
-            LevelDecoderInner::Rle(reader) => {
-                Ok(read_from_buffer + reader.get_batch(&mut out[range])?)
-            }
-        }
+impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
+    fn read_def_levels(
+        &mut self,
+        out: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> Result<usize> {
+        self.decoder.as_mut().unwrap().read(&mut out[range])
     }
-}
 
-impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
     fn skip_def_levels(
         &mut self,
         num_levels: usize,
         max_def_level: i16,
     ) -> Result<(usize, usize)> {
         let mut level_skip = 0;
         let mut value_skip = 0;
+        let mut buf: Vec<i16> = vec![];
         while level_skip < num_levels {
             let remaining_levels = num_levels - level_skip;
 
-            if self.buffer.is_empty() {
-                // Only read number of needed values
-                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
-                if self.buffer.is_empty() {
-                    // Reached end of page
-                    break;
-                }
+            let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
+            buf.resize(to_read, 0);
+            let read = self.read_def_levels(&mut buf, 0..to_read)?;
+            if read == 0 {
+                // Reached end of page
+                break;
             }
-            let to_read = self.buffer.len().min(remaining_levels);
 
-            level_skip += to_read;
-            value_skip += self.buffer[..to_read]
-                .iter()
-                .filter(|x| **x == max_def_level)
-                .count();
-
-            self.split_off_buffer(to_read)
+            level_skip += read;
+            value_skip += buf[..read].iter().filter(|x| **x == max_def_level).count();
         }
 
         Ok((value_skip, level_skip))
     }
 }
 
-impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
-    fn skip_rep_levels(
+pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;
+
+/// An implementation of [`RepetitionLevelDecoder`] for `[i16]`
+pub struct RepetitionLevelDecoderImpl {
+    decoder: Option<LevelDecoder>,
+    bit_width: u8,
+    buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
+    buffer_len: usize,
+    buffer_offset: usize,
+    has_partial: bool,
+}
+
+impl RepetitionLevelDecoderImpl {
+    pub fn new(max_level: i16) -> Self {
+        let bit_width = num_required_bits(max_level as u64);
+        Self {
+            decoder: None,
+            bit_width,
+            buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
+            buffer_offset: 0,
+            buffer_len: 0,
+            has_partial: false,
+        }
+    }
+
+    fn fill_buf(&mut self) -> Result<()> {
+        let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
+        self.buffer_offset = 0;
+        self.buffer_len = read;
+        Ok(())
+    }
+
+    /// Inspects the buffered repetition levels in the range `self.buffer_offset..self.buffer_len`
+    /// and returns the number of "complete" records along with the corresponding number of values
+    ///
+    /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
+    fn count_records(
         &mut self,
-        num_records: usize,
+        records_to_read: usize,
         num_levels: usize,
-    ) -> Result<(usize, usize)> {
-        let mut level_skip = 0;
-        let mut record_skip = 0;
+    ) -> (bool, usize, usize) {
+        let mut records_read = 0;
 
-        while level_skip < num_levels {
-            let remaining_levels = num_levels - level_skip;
+        let levels = num_levels.min(self.buffer_len - self.buffer_offset);
+        let buf = self.buffer.iter().skip(self.buffer_offset);
+        for (idx, item) in buf.take(levels).enumerate() {
+            if *item == 0 && (idx != 0 || self.has_partial) {
+                records_read += 1;
 
-            if self.buffer.is_empty() {
-                // Only read number of needed values
-                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
-                if self.buffer.is_empty() {
-                    // Reached end of page
-                    break;
+                if records_read == records_to_read {
+                    return (false, records_read, idx);
                 }
             }
+        }
+        // Either ran out of space in `num_levels` or data in `self.buffer`
+        (true, records_read, levels)
+    }
+}
+
+impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
+    type Slice = [i16];
 
-            let max_skip = self.buffer.len().min(remaining_levels);
+    fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
+        self.buffer_len = 0;
+        self.buffer_offset = 0;
+    }
+}
 
-            let mut to_skip = 0;
-            while to_skip < max_skip && record_skip != num_records {
-                if self.buffer[to_skip] == 0 {
-                    record_skip += 1;
+impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
+    fn read_rep_levels(
+        &mut self,
+        out: &mut Self::Slice,
+        range: Range<usize>,
+        max_records: usize,
+    ) -> Result<(usize, usize)> {
+        let output = &mut out[range];
+        let max_levels = output.len();
+        let mut total_records_read = 0;
+        let mut total_levels_read = 0;
+
+        while total_records_read < max_records && total_levels_read < max_levels {
+            if self.buffer_len == self.buffer_offset {
+                self.fill_buf()?;
+                if self.buffer_len == 0 {
+                    break;
                 }
-                to_skip += 1;
             }
 
-            // Find end of record
-            while to_skip < max_skip && self.buffer[to_skip] != 0 {
-                to_skip += 1;
-            }
+            let (partial, records_read, levels_read) = self.count_records(
+                max_records - total_records_read,
+                max_levels - total_levels_read,
+            );
 
-            level_skip += to_skip;
-            if to_skip == self.buffer.len() {
-                // Need to to read more values
-                self.buffer.clear();
-                continue;
-            }
+            output[total_levels_read..total_levels_read + levels_read].copy_from_slice(
+                &self.buffer[self.buffer_offset..self.buffer_offset + levels_read],
+            );
 
-            self.split_off_buffer(to_skip);
-            break;
+            total_levels_read += levels_read;
+            total_records_read += records_read;
+            self.buffer_offset += levels_read;
+            self.has_partial = partial;
         }
+        Ok((total_records_read, total_levels_read))
+    }
+
+    fn skip_rep_levels(

Review Comment:
   it might help to note here in comments that it tries to read up to `num_records` records and `num_levels` levels (stopping at whichever limit is reached first) and returns `(total_records_read, total_levels_read)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org