Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/08 18:07:56 UTC

[arrow-rs] branch master updated: Move record delimiting into ColumnReader (#4365) (#4376)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new ab5669398 Move record delimiting into ColumnReader (#4365) (#4376)
ab5669398 is described below

commit ab56693985826bb8caea30558b8c25db286a5e37
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jun 8 19:07:50 2023 +0100

    Move record delimiting into ColumnReader (#4365) (#4376)
    
    * Move record delimiting into ColumnReader (#4365)
    
    * Misc tweaks
    
    * More tests
    
    * Clippy
    
    * Review feedback
---
 .../src/arrow/array_reader/fixed_len_byte_array.rs |   4 +-
 parquet/src/arrow/arrow_reader/mod.rs              | 182 ++++++++-
 parquet/src/arrow/buffer/dictionary_buffer.rs      |  30 +-
 parquet/src/arrow/buffer/offset_buffer.rs          |  29 +-
 parquet/src/arrow/record_reader/buffer.rs          |  38 +-
 .../src/arrow/record_reader/definition_levels.rs   |  88 ++---
 parquet/src/arrow/record_reader/mod.rs             | 209 ++--------
 parquet/src/column/mod.rs                          |  11 +-
 parquet/src/column/reader.rs                       | 240 ++++++-----
 parquet/src/column/reader/decoder.rs               | 439 +++++++++++----------
 parquet/src/column/writer/mod.rs                   |   6 +-
 parquet/src/file/writer.rs                         |   4 +-
 parquet/src/record/triplet.rs                      |  11 +-
 13 files changed, 649 insertions(+), 642 deletions(-)
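
The central change is to the `BufferQueue` trait in
parquet/src/arrow/record_reader/buffer.rs: the length-parameterised
`split_off(len)` is replaced by `consume()`, which drains the whole buffer in
one call. A condensed sketch of the trait change, paraphrased from the hunks
below (other members elided):

    pub trait BufferQueue: Sized {
        type Output;
        type Slice: ?Sized;

        // Before: split out the first `len` items, panicking if `len` exceeded
        // the buffered length, and leaving any remainder buffered.
        // fn split_off(&mut self, len: usize) -> Self::Output;

        // After: drain the entire buffered contents. Record delimiting now
        // happens inside the ColumnReader, so a partial split is never needed
        // and implementations reduce to std::mem::take.
        fn consume(&mut self) -> Self::Output;
    }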

diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index 47bd03a73..b06091b6b 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -247,8 +247,8 @@ impl BufferQueue for FixedLenByteArrayBuffer {
     type Output = Buffer;
     type Slice = Self;
 
-    fn split_off(&mut self, len: usize) -> Self::Output {
-        self.buffer.split_off(len * self.byte_length)
+    fn consume(&mut self) -> Self::Output {
+        self.buffer.consume()
     }
 
     fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 432b00399..988738dac 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -548,12 +548,14 @@ mod tests {
     use tempfile::tempfile;
 
     use arrow_array::builder::*;
+    use arrow_array::cast::AsArray;
     use arrow_array::types::{Decimal128Type, Decimal256Type, DecimalType};
     use arrow_array::*;
     use arrow_array::{RecordBatch, RecordBatchReader};
     use arrow_buffer::{i256, ArrowNativeType, Buffer};
     use arrow_data::ArrayDataBuilder;
     use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema};
+    use arrow_select::concat::concat_batches;
 
     use crate::arrow::arrow_reader::{
         ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReader,
@@ -562,6 +564,7 @@ mod tests {
     use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
     use crate::arrow::{ArrowWriter, ProjectionMask};
     use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
+    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
     use crate::data_type::{
         BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
         FixedLenByteArrayType, Int32Type, Int64Type, Int96Type,
@@ -2131,15 +2134,15 @@ mod tests {
 
     #[test]
     fn test_row_group_exact_multiple() {
-        use crate::arrow::record_reader::MIN_BATCH_SIZE;
+        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
         test_row_group_batch(8, 8);
         test_row_group_batch(10, 8);
         test_row_group_batch(8, 10);
-        test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE);
-        test_row_group_batch(MIN_BATCH_SIZE + 1, MIN_BATCH_SIZE);
-        test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE + 1);
-        test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE - 1);
-        test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE);
+        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
+        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
+        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
+        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
+        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
     }
 
     /// Given a RecordBatch containing all the column data, return the expected batches given
@@ -2610,4 +2613,171 @@ mod tests {
         test_decimal_roundtrip::<Decimal128Type>();
         test_decimal_roundtrip::<Decimal256Type>();
     }
+
+    #[test]
+    fn test_list_selection() {
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new("item", ArrowDataType::Utf8, true),
+            false,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+        for i in 0..2 {
+            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
+            for j in 0..1024 {
+                list_a_builder.values().append_value(format!("{i} {j}"));
+                list_a_builder.append(true);
+            }
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![Arc::new(list_a_builder.finish())],
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+        }
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
+            .unwrap()
+            .with_row_selection(RowSelection::from(vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ]))
+            .build()
+            .unwrap();
+
+        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
+        let batch = concat_batches(&schema, &batches).unwrap();
+
+        assert_eq!(batch.num_rows(), 924 * 2);
+        let list = batch.column(0).as_list::<i32>();
+
+        for w in list.value_offsets().windows(2) {
+            assert_eq!(w[0] + 1, w[1])
+        }
+        let mut values = list.values().as_string::<i32>().iter();
+
+        for i in 0..2 {
+            for j in 100..1024 {
+                let expected = format!("{i} {j}");
+                assert_eq!(values.next().unwrap().unwrap(), &expected);
+            }
+        }
+    }
+
+    #[test]
+    fn test_list_selection_fuzz() {
+        let mut rng = thread_rng();
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new_list("item", Field::new("item", ArrowDataType::Int32, true), true),
+            true,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+
+        for _ in 0..2048 {
+            if rng.gen_bool(0.2) {
+                list_a_builder.append(false);
+                continue;
+            }
+
+            let list_a_len = rng.gen_range(0..10);
+            let list_b_builder = list_a_builder.values();
+
+            for _ in 0..list_a_len {
+                if rng.gen_bool(0.2) {
+                    list_b_builder.append(false);
+                    continue;
+                }
+
+                let list_b_len = rng.gen_range(0..10);
+                let int_builder = list_b_builder.values();
+                for _ in 0..list_b_len {
+                    match rng.gen_bool(0.2) {
+                        true => int_builder.append_null(),
+                        false => int_builder.append_value(rng.gen()),
+                    }
+                }
+                list_b_builder.append(true)
+            }
+            list_a_builder.append(true);
+        }
+
+        let array = Arc::new(list_a_builder.finish());
+        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
+
+        writer.write(&batch).unwrap();
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+
+        let cases = [
+            vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ],
+            vec![
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+            ],
+            vec![
+                RowSelector::skip(1023),
+                RowSelector::select(1),
+                RowSelector::skip(1023),
+                RowSelector::select(1),
+            ],
+            vec![
+                RowSelector::select(1),
+                RowSelector::skip(1023),
+                RowSelector::select(1),
+                RowSelector::skip(1023),
+            ],
+        ];
+
+        for batch_size in [100, 1024, 2048] {
+            for selection in &cases {
+                let selection = RowSelection::from(selection.clone());
+                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
+                    .unwrap()
+                    .with_row_selection(selection.clone())
+                    .with_batch_size(batch_size)
+                    .build()
+                    .unwrap();
+
+                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
+                let actual = concat_batches(&batch.schema(), &batches).unwrap();
+                assert_eq!(actual.num_rows(), selection.row_count());
+
+                let mut batch_offset = 0;
+                let mut actual_offset = 0;
+                for selector in selection.iter() {
+                    if selector.skip {
+                        batch_offset += selector.row_count;
+                        continue;
+                    }
+
+                    assert_eq!(
+                        batch.slice(batch_offset, selector.row_count),
+                        actual.slice(actual_offset, selector.row_count)
+                    );
+
+                    batch_offset += selector.row_count;
+                    actual_offset += selector.row_count;
+                }
+            }
+        }
+    }
 }
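
For orientation, the selections exercised by these tests compose skip/select
runs into contiguous row ranges. A minimal standalone sketch of the same API
(the types are those used in the tests above):

    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

    // Skip the first 100 rows, then read the following 924.
    let selection = RowSelection::from(vec![
        RowSelector::skip(100),
        RowSelector::select(924),
    ]);
    assert_eq!(selection.row_count(), 924); // only selected rows are counted
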
diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs
index 529c28872..6344d9dd3 100644
--- a/parquet/src/arrow/buffer/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -227,14 +227,14 @@ impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue
     type Output = Self;
     type Slice = Self;
 
-    fn split_off(&mut self, len: usize) -> Self::Output {
+    fn consume(&mut self) -> Self::Output {
         match self {
             Self::Dict { keys, values } => Self::Dict {
-                keys: keys.take(len),
+                keys: std::mem::take(keys),
                 values: values.clone(),
             },
             Self::Values { values } => Self::Values {
-                values: values.split_off(len),
+                values: values.consume(),
             },
         }
     }
@@ -275,20 +275,6 @@ mod tests {
         let valid_buffer = Buffer::from_iter(valid.iter().cloned());
         buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice());
 
-        // Split off some data
-
-        let split = buffer.split_off(4);
-        let null_buffer = Buffer::from_iter(valid.drain(0..4));
-        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
-        assert_eq!(array.data_type(), &dict_type);
-
-        let strings = cast(&array, &ArrowType::Utf8).unwrap();
-        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
-        assert_eq!(
-            strings.iter().collect::<Vec<_>>(),
-            vec![None, None, Some("world"), Some("hello")]
-        );
-
         // Read some data not preserving the dictionary
 
         let values = buffer.spill_values().unwrap();
@@ -300,8 +286,8 @@ mod tests {
         let null_buffer = Buffer::from_iter(valid.iter().cloned());
         buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
 
-        assert_eq!(buffer.len(), 9);
-        let split = buffer.split_off(9);
+        assert_eq!(buffer.len(), 13);
+        let split = buffer.consume();
 
         let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
         assert_eq!(array.data_type(), &dict_type);
@@ -311,6 +297,10 @@ mod tests {
         assert_eq!(
             strings.iter().collect::<Vec<_>>(),
             vec![
+                None,
+                None,
+                Some("world"),
+                Some("hello"),
                 None,
                 Some("a"),
                 Some(""),
@@ -332,7 +322,7 @@ mod tests {
             .unwrap()
             .extend_from_slice(&[0, 1, 0, 1]);
 
-        let array = buffer.split_off(4).into_array(None, &dict_type).unwrap();
+        let array = buffer.consume().into_array(None, &dict_type).unwrap();
         assert_eq!(array.data_type(), &dict_type);
 
         let strings = cast(&array, &ArrowType::Utf8).unwrap();
diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs
index df96996e3..c8732bc4e 100644
--- a/parquet/src/arrow/buffer/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -151,25 +151,8 @@ impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for OffsetBuffer<I> {
     type Output = Self;
     type Slice = Self;
 
-    fn split_off(&mut self, len: usize) -> Self::Output {
-        assert!(self.offsets.len() > len, "{} > {}", self.offsets.len(), len);
-        let remaining_offsets = self.offsets.len() - len - 1;
-        let offsets = self.offsets.as_slice();
-
-        let end_offset = offsets[len];
-
-        let mut new_offsets = ScalarBuffer::new();
-        new_offsets.reserve(remaining_offsets + 1);
-        for v in &offsets[len..] {
-            new_offsets.push(*v - end_offset)
-        }
-
-        self.offsets.resize(len + 1);
-
-        Self {
-            offsets: std::mem::replace(&mut self.offsets, new_offsets),
-            values: self.values.take(end_offset.as_usize()),
-        }
+    fn consume(&mut self) -> Self::Output {
+        std::mem::take(self)
     }
 
     fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
@@ -267,18 +250,18 @@ mod tests {
     }
 
     #[test]
-    fn test_offset_buffer_split() {
+    fn test_offset_buffer() {
         let mut buffer = OffsetBuffer::<i32>::default();
         for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
             buffer.try_push(v.as_bytes(), false).unwrap()
         }
-        let split = buffer.split_off(3);
+        let split = buffer.consume();
 
         let array = split.into_array(None, ArrowType::Utf8);
         let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
         assert_eq!(
             strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
-            vec!["hello", "world", "cupcakes"]
+            vec!["hello", "world", "cupcakes", "a", "b", "c"]
         );
 
         buffer.try_push("test".as_bytes(), false).unwrap();
@@ -286,7 +269,7 @@ mod tests {
         let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
         assert_eq!(
             strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
-            vec!["a", "b", "c", "test"]
+            vec!["test"]
         );
     }
 
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 404989493..4a0fc2a2f 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -30,13 +30,8 @@ pub trait BufferQueue: Sized {
 
     type Slice: ?Sized;
 
-    /// Split out the first `len` items
-    ///
-    /// # Panics
-    ///
-    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
-    ///
-    fn split_off(&mut self, len: usize) -> Self::Output;
+    /// Consumes the contents of this [`BufferQueue`]
+    fn consume(&mut self) -> Self::Output;
 
     /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
     /// to append data to the end of this [`BufferQueue`]
@@ -146,31 +141,6 @@ impl<T: ScalarValue> ScalarBuffer<T> {
         assert!(prefix.is_empty() && suffix.is_empty());
         buf
     }
-
-    pub fn take(&mut self, len: usize) -> Self {
-        assert!(len <= self.len);
-
-        let num_bytes = len * std::mem::size_of::<T>();
-        let remaining_bytes = self.buffer.len() - num_bytes;
-        // TODO: Optimize to reduce the copy
-        // create an empty buffer, as it will be resized below
-        let mut remaining = MutableBuffer::new(0);
-        remaining.resize(remaining_bytes, 0);
-
-        let new_records = remaining.as_slice_mut();
-
-        new_records[0..remaining_bytes]
-            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
-
-        self.buffer.resize(num_bytes, 0);
-        self.len -= len;
-
-        Self {
-            buffer: std::mem::replace(&mut self.buffer, remaining),
-            len,
-            _phantom: Default::default(),
-        }
-    }
 }
 
 impl<T: ScalarValue + ArrowNativeType> ScalarBuffer<T> {
@@ -196,8 +166,8 @@ impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
 
     type Slice = [T];
 
-    fn split_off(&mut self, len: usize) -> Self::Output {
-        self.take(len).into()
+    fn consume(&mut self) -> Self::Output {
+        std::mem::take(self).into()
     }
 
     fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index 272716caf..5be0ac84d 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -22,16 +22,16 @@ use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
 use arrow_buffer::Buffer;
 
 use crate::arrow::buffer::bit_util::count_set_bits;
-use crate::arrow::record_reader::buffer::BufferQueue;
 use crate::basic::Encoding;
 use crate::column::reader::decoder::{
-    ColumnLevelDecoder, ColumnLevelDecoderImpl, DefinitionLevelDecoder, LevelsBufferSlice,
+    ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
+    LevelsBufferSlice,
 };
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::memory::ByteBufferPtr;
 
-use super::{buffer::ScalarBuffer, MIN_BATCH_SIZE};
+use super::buffer::ScalarBuffer;
 
 enum BufferInner {
     /// Compute levels and null mask
@@ -87,13 +87,10 @@ impl DefinitionLevelBuffer {
         Self { inner, len: 0 }
     }
 
-    pub fn split_levels(&mut self, len: usize) -> Option<Buffer> {
+    /// Returns the built level data
+    pub fn consume_levels(&mut self) -> Option<Buffer> {
         match &mut self.inner {
-            BufferInner::Full { levels, .. } => {
-                let out = levels.split_off(len);
-                self.len = levels.len();
-                Some(out)
-            }
+            BufferInner::Full { levels, .. } => Some(std::mem::take(levels).into()),
             BufferInner::Mask { .. } => None,
         }
     }
@@ -103,27 +100,13 @@ impl DefinitionLevelBuffer {
         self.len = len;
     }
 
-    /// Split `len` levels out of `self`
-    pub fn split_bitmask(&mut self, len: usize) -> Buffer {
-        let old_builder = match &mut self.inner {
-            BufferInner::Full { nulls, .. } => nulls,
-            BufferInner::Mask { nulls } => nulls,
-        };
-
-        // Compute the number of values left behind
-        let num_left_values = old_builder.len() - len;
-        let mut new_builder =
-            BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values));
-
-        // Copy across remaining values
-        new_builder.append_packed_range(len..old_builder.len(), old_builder.as_slice());
-
-        // Truncate buffer
-        old_builder.resize(len);
-
-        // Swap into self
-        self.len = new_builder.len();
-        std::mem::replace(old_builder, new_builder).into()
+    /// Returns the built null bitmask
+    pub fn consume_bitmask(&mut self) -> Buffer {
+        self.len = 0;
+        match &mut self.inner {
+            BufferInner::Full { nulls, .. } => nulls.finish().into_inner(),
+            BufferInner::Mask { nulls } => nulls.finish().into_inner(),
+        }
     }
 
     pub fn nulls(&self) -> &BooleanBufferBuilder {
@@ -148,7 +131,7 @@ impl LevelsBufferSlice for DefinitionLevelBuffer {
 
 enum MaybePacked {
     Packed(PackedDecoder),
-    Fallback(ColumnLevelDecoderImpl),
+    Fallback(DefinitionLevelDecoderImpl),
 }
 
 pub struct DefinitionLevelBufferDecoder {
@@ -160,7 +143,7 @@ impl DefinitionLevelBufferDecoder {
     pub fn new(max_level: i16, packed: bool) -> Self {
         let decoder = match packed {
             true => MaybePacked::Packed(PackedDecoder::new()),
-            false => MaybePacked::Fallback(ColumnLevelDecoderImpl::new(max_level)),
+            false => MaybePacked::Fallback(DefinitionLevelDecoderImpl::new(max_level)),
         };
 
         Self { max_level, decoder }
@@ -176,8 +159,14 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
             MaybePacked::Fallback(d) => d.set_data(encoding, data),
         }
     }
+}
 
-    fn read(&mut self, writer: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
+    fn read_def_levels(
+        &mut self,
+        writer: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> Result<usize> {
         match (&mut writer.inner, &mut self.decoder) {
             (
                 BufferInner::Full {
@@ -193,7 +182,7 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
                 levels.resize(range.end + writer.len);
 
                 let slice = &mut levels.as_slice_mut()[writer.len..];
-                let levels_read = decoder.read(slice, range.clone())?;
+                let levels_read = decoder.read_def_levels(slice, range.clone())?;
 
                 nulls.reserve(levels_read);
                 for i in &slice[range.start..range.start + levels_read] {
@@ -211,9 +200,7 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
             _ => unreachable!("inconsistent null mask"),
         }
     }
-}
 
-impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
     fn skip_def_levels(
         &mut self,
         num_levels: usize,
@@ -391,11 +378,8 @@ impl PackedDecoder {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use std::sync::Arc;
 
-    use crate::basic::Type as PhysicalType;
     use crate::encodings::rle::RleEncoder;
-    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
     use rand::{thread_rng, Rng};
 
     #[test]
@@ -492,30 +476,4 @@ mod tests {
         assert_eq!(read_level + skip_level, len);
         assert_eq!(read_value + skip_value, total_value);
     }
-
-    #[test]
-    fn test_split_off() {
-        let t = Type::primitive_type_builder("col", PhysicalType::INT32)
-            .build()
-            .unwrap();
-
-        let descriptor = Arc::new(ColumnDescriptor::new(
-            Arc::new(t),
-            1,
-            0,
-            ColumnPath::new(vec![]),
-        ));
-
-        let mut buffer = DefinitionLevelBuffer::new(&descriptor, true);
-        match &mut buffer.inner {
-            BufferInner::Mask { nulls } => nulls.append_n(100, false),
-            _ => unreachable!(),
-        };
-
-        let bitmap = buffer.split_bitmask(19);
-
-        // Should have split off 19 records leaving, 81 behind
-        assert_eq!(bitmap.len(), 3); // Note: bitmask only tracks bytes not bits
-        assert_eq!(buffer.nulls().len(), 81);
-    }
 }
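
`consume_bitmask` now relies on `BooleanBufferBuilder::finish`, which returns
the packed bits and resets the builder. A small illustration of those
semantics (standalone, not part of the patch):

    use arrow_buffer::BooleanBufferBuilder;

    let mut nulls = BooleanBufferBuilder::new(8);
    nulls.append_n(3, true);
    nulls.append(false);
    let bitmask = nulls.finish().into_inner(); // 4 packed bits as a Buffer
    assert_eq!(nulls.len(), 0); // finish resets the builder
    assert_eq!(bitmask.len(), 1); // Buffer length is in bytes, not bits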
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index e47bdee1c..35933e6e1 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -15,18 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::{max, min};
-
 use arrow_buffer::Buffer;
 
 use crate::arrow::record_reader::{
     buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
     definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
 };
+use crate::column::reader::decoder::RepetitionLevelDecoderImpl;
 use crate::column::{
     page::PageReader,
     reader::{
-        decoder::{ColumnLevelDecoderImpl, ColumnValueDecoder, ColumnValueDecoderImpl},
+        decoder::{ColumnValueDecoder, ColumnValueDecoderImpl},
         GenericColumnReader,
     },
 };
@@ -37,15 +36,12 @@ use crate::schema::types::ColumnDescPtr;
 pub(crate) mod buffer;
 mod definition_levels;
 
-/// The minimum number of levels read when reading a repeated field
-pub(crate) const MIN_BATCH_SIZE: usize = 1024;
-
 /// A `RecordReader` is a stateful column reader that delimits semantic records.
 pub type RecordReader<T> =
     GenericRecordReader<ScalarBuffer<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;
 
 pub(crate) type ColumnReader<CV> =
-    GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>;
+    GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>;
 
 /// A generic stateful column reader that delimits semantic records
 ///
@@ -55,19 +51,14 @@ pub(crate) type ColumnReader<CV> =
 pub struct GenericRecordReader<V, CV> {
     column_desc: ColumnDescPtr,
 
-    records: V,
+    values: V,
     def_levels: Option<DefinitionLevelBuffer>,
     rep_levels: Option<ScalarBuffer<i16>>,
     column_reader: Option<ColumnReader<CV>>,
-
-    /// Number of records accumulated in records
-    num_records: usize,
-
-    /// Number of values `num_records` contains.
+    /// Number of buffered levels / null-padded values
     num_values: usize,
-
-    /// Starts from 1, number of values have been written to buffer
-    values_written: usize,
+    /// Number of buffered records
+    num_records: usize,
 }
 
 impl<V, CV> GenericRecordReader<V, CV>
@@ -93,14 +84,13 @@ where
         let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
 
         Self {
-            records,
+            values: records,
             def_levels,
             rep_levels,
             column_reader: None,
             column_desc: desc,
-            num_records: 0,
             num_values: 0,
-            values_written: 0,
+            num_records: 0,
         }
     }
 
@@ -117,7 +107,7 @@ where
         });
 
         let rep_level_decoder = (descr.max_rep_level() != 0)
-            .then(|| ColumnLevelDecoderImpl::new(descr.max_rep_level()));
+            .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
 
         self.column_reader = Some(GenericColumnReader::new_with_decoders(
             self.column_desc.clone(),
@@ -142,56 +132,14 @@ where
         let mut records_read = 0;
 
         loop {
-            // Try to find some records from buffers that has been read into memory
-            // but not counted as seen records.
-
-            // Check to see if the column is exhausted. Only peek the next page since in
-            // case we are reading to a page boundary and do not actually need to read
-            // the next page.
-            let end_of_column = !self.column_reader.as_mut().unwrap().peek_next()?;
-
-            let (record_count, value_count) =
-                self.count_records(num_records - records_read, end_of_column);
-
-            self.num_records += record_count;
-            self.num_values += value_count;
-            records_read += record_count;
-
+            let records_to_read = num_records - records_read;
+            records_read += self.read_one_batch(records_to_read)?;
             if records_read == num_records
                 || !self.column_reader.as_mut().unwrap().has_next()?
             {
                 break;
             }
-
-            // If repetition levels present, we don't know how much more to read
-            // in order to read the requested number of records, therefore read at least
-            // MIN_BATCH_SIZE, otherwise read **exactly** what was requested. This helps
-            // to avoid a degenerate case where the buffers are never fully drained.
-            //
-            // Consider the scenario where the user is requesting batches of MIN_BATCH_SIZE.
-            //
-            // When transitioning across a row group boundary, this will read some remainder
-            // from the row group `r`, before reading MIN_BATCH_SIZE from the next row group,
-            // leaving `MIN_BATCH_SIZE + r` in the buffer.
-            //
-            // The client will then only split off the `MIN_BATCH_SIZE` they actually wanted,
-            // leaving behind `r`. This will continue indefinitely.
-            //
-            // Aside from wasting cycles splitting and shuffling buffers unnecessarily, this
-            // prevents dictionary preservation from functioning correctly as the buffer
-            // will never be emptied, allowing a new dictionary to be registered.
-            //
-            // This degenerate case can still occur for repeated fields, but
-            // it is avoided for the more common case of a non-repeated field
-            let batch_size = match &self.rep_levels {
-                Some(_) => max(num_records - records_read, MIN_BATCH_SIZE),
-                None => num_records - records_read,
-            };
-
-            // Try to more value from parquet pages
-            self.read_one_batch(batch_size)?;
         }
-
         Ok(records_read)
     }
 
@@ -201,31 +149,10 @@ where
     ///
     /// Number of records skipped
     pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        // First need to clear the buffer
-        let end_of_column = match self.column_reader.as_mut() {
-            Some(reader) => !reader.peek_next()?,
-            None => return Ok(0),
-        };
-
-        let (buffered_records, buffered_values) =
-            self.count_records(num_records, end_of_column);
-
-        self.num_records += buffered_records;
-        self.num_values += buffered_values;
-
-        let remaining = num_records - buffered_records;
-
-        if remaining == 0 {
-            return Ok(buffered_records);
+        match self.column_reader.as_mut() {
+            Some(reader) => reader.skip_records(num_records),
+            None => Ok(0),
         }
-
-        let skipped = self
-            .column_reader
-            .as_mut()
-            .unwrap()
-            .skip_records(remaining)?;
-
-        Ok(skipped + buffered_records)
     }
 
     /// Returns number of records stored in buffer.
@@ -246,25 +173,19 @@ where
     /// definition level values that have already been read into memory but not counted
     /// as record values, e.g. those from `self.num_values` to `self.values_written`.
     pub fn consume_def_levels(&mut self) -> Option<Buffer> {
-        match self.def_levels.as_mut() {
-            Some(x) => x.split_levels(self.num_values),
-            None => None,
-        }
+        self.def_levels.as_mut().and_then(|x| x.consume_levels())
     }
 
     /// Return repetition level data.
     /// The side effect is similar to `consume_def_levels`.
     pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
-        match self.rep_levels.as_mut() {
-            Some(x) => Some(x.split_off(self.num_values)),
-            None => None,
-        }
+        self.rep_levels.as_mut().map(|x| x.consume())
     }
 
     /// Returns currently stored buffer data.
     /// The side effect is similar to `consume_def_levels`.
     pub fn consume_record_data(&mut self) -> V::Output {
-        self.records.split_off(self.num_values)
+        self.values.consume()
     }
 
     /// Returns currently stored null bitmap data.
@@ -277,34 +198,31 @@ where
     /// Should be called after consuming data, e.g. `consume_def_levels`,
     /// `consume_rep_levels`, `consume_record_data` and `consume_bitmap_buffer`.
     pub fn reset(&mut self) {
-        self.values_written -= self.num_values;
-        self.num_records = 0;
         self.num_values = 0;
+        self.num_records = 0;
     }
 
     /// Returns bitmap data.
     pub fn consume_bitmap(&mut self) -> Option<Buffer> {
         self.def_levels
             .as_mut()
-            .map(|levels| levels.split_bitmask(self.num_values))
+            .map(|levels| levels.consume_bitmask())
     }
 
-    /// Try to read one batch of data.
+    /// Try to read one batch of data returning the number of records read
     fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
         let rep_levels = self
             .rep_levels
             .as_mut()
             .map(|levels| levels.spare_capacity_mut(batch_size));
-
         let def_levels = self.def_levels.as_mut();
+        let values = self.values.spare_capacity_mut(batch_size);
 
-        let values = self.records.spare_capacity_mut(batch_size);
-
-        let (values_read, levels_read) = self
+        let (records_read, values_read, levels_read) = self
             .column_reader
             .as_mut()
             .unwrap()
-            .read_batch(batch_size, def_levels, rep_levels, values)?;
+            .read_records(batch_size, def_levels, rep_levels, values)?;
 
         if values_read < levels_read {
             let def_levels = self.def_levels.as_ref().ok_or_else(|| {
@@ -313,90 +231,29 @@ where
                 )
             })?;
 
-            self.records.pad_nulls(
-                self.values_written,
+            self.values.pad_nulls(
+                self.num_values,
                 values_read,
                 levels_read,
                 def_levels.nulls().as_slice(),
             );
         }
 
-        let values_read = max(levels_read, values_read);
-        self.set_values_written(self.values_written + values_read);
-        Ok(values_read)
-    }
-
-    /// Inspects the buffered repetition levels in the range `self.num_values..self.values_written`
-    /// and returns the number of "complete" records along with the corresponding number of values
-    ///
-    /// If `end_of_column` is true it indicates that there are no further values for this
-    /// column chunk beyond what is currently in the buffers
-    ///
-    /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
-    fn count_records(
-        &self,
-        records_to_read: usize,
-        end_of_column: bool,
-    ) -> (usize, usize) {
-        match self.rep_levels.as_ref() {
-            Some(buf) => {
-                let buf = buf.as_slice();
-
-                let mut records_read = 0;
-                let mut end_of_last_record = self.num_values;
-
-                for (current, item) in buf
-                    .iter()
-                    .enumerate()
-                    .take(self.values_written)
-                    .skip(self.num_values)
-                {
-                    if *item == 0 && current != self.num_values {
-                        records_read += 1;
-                        end_of_last_record = current;
-
-                        if records_read == records_to_read {
-                            break;
-                        }
-                    }
-                }
-
-                // If reached end of column chunk => end of a record
-                if records_read != records_to_read
-                    && end_of_column
-                    && self.values_written != self.num_values
-                {
-                    records_read += 1;
-                    end_of_last_record = self.values_written;
-                }
-
-                (records_read, end_of_last_record - self.num_values)
-            }
-            None => {
-                let records_read =
-                    min(records_to_read, self.values_written - self.num_values);
-
-                (records_read, records_read)
-            }
-        }
-    }
-
-    fn set_values_written(&mut self, new_values_written: usize) {
-        self.values_written = new_values_written;
-        self.records.set_len(self.values_written);
-
+        self.num_records += records_read;
+        self.num_values += levels_read;
+        self.values.set_len(self.num_values);
         if let Some(ref mut buf) = self.rep_levels {
-            buf.set_len(self.values_written)
+            buf.set_len(self.num_values)
         };
-
         if let Some(ref mut buf) = self.def_levels {
-            buf.set_len(self.values_written)
+            buf.set_len(self.num_values)
         };
+        Ok(records_read)
     }
 }
 
 /// Returns true if we do not need to unpack the nullability for this column, this is
-/// only possible if the max defiition level is 1, and corresponds to nulls at the
+/// only possible if the max definition level is 1, and corresponds to nulls at the
 /// leaf level, as opposed to a nullable parent nested type
 fn packed_null_mask(descr: &ColumnDescPtr) -> bool {
     descr.max_def_level() == 1
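
With delimiting moved into the column reader, `read_records` no longer needs
`count_records`, `peek_next`, or the MIN_BATCH_SIZE over-read; as the hunk
above shows, the loop condenses to (plumbing elided):

    pub fn read_records(&mut self, num_records: usize) -> Result<usize> {
        let mut records_read = 0;
        loop {
            let records_to_read = num_records - records_read;
            records_read += self.read_one_batch(records_to_read)?;
            if records_read == num_records
                || !self.column_reader.as_mut().unwrap().has_next()?
            {
                break;
            }
        }
        Ok(records_read)
    }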
diff --git a/parquet/src/column/mod.rs b/parquet/src/column/mod.rs
index a68127a4e..c81d6290a 100644
--- a/parquet/src/column/mod.rs
+++ b/parquet/src/column/mod.rs
@@ -84,7 +84,6 @@
 //! let reader = SerializedFileReader::new(file).unwrap();
 //! let metadata = reader.metadata();
 //!
-//! let mut res = Ok((0, 0));
 //! let mut values = vec![0; 8];
 //! let mut def_levels = vec![0; 8];
 //! let mut rep_levels = vec![0; 8];
@@ -98,19 +97,21 @@
 //!         match column_reader {
 //!             // You can also use `get_typed_column_reader` method to extract typed reader.
 //!             ColumnReader::Int32ColumnReader(ref mut typed_reader) => {
-//!                 res = typed_reader.read_batch(
-//!                     8, // batch size
+//!                 let (records, values, levels) = typed_reader.read_records(
+//!                     8, // maximum records to read
 //!                     Some(&mut def_levels),
 //!                     Some(&mut rep_levels),
 //!                     &mut values,
-//!                 );
+//!                 ).unwrap();
+//!                 assert_eq!(records, 2);
+//!                 assert_eq!(levels, 5);
+//!                 assert_eq!(values, 3);
 //!             }
 //!             _ => {}
 //!         }
 //!     }
 //! }
 //!
-//! assert_eq!(res.unwrap(), (3, 5));
 //! assert_eq!(values, vec![1, 2, 3, 0, 0, 0, 0, 0]);
 //! assert_eq!(def_levels, vec![3, 3, 3, 2, 2, 0, 0, 0]);
 //! assert_eq!(rep_levels, vec![0, 1, 0, 1, 1, 0, 0, 0]);
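
To see why the example asserts `records == 2`: the repetition levels read are
[0, 1, 0, 1, 1], which contain one repetition level of 0 after the first
position (one completed record), and the end of the column delimits the
trailing record, giving two in total. Of the 5 levels, two fall below the max
definition level of 3, leaving 3 non-null values.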
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 991ec2c54..88967e179 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -17,13 +17,12 @@
 
 //! Contains column reader API.
 
-use std::cmp::min;
-
 use super::page::{Page, PageReader};
 use crate::basic::*;
 use crate::column::reader::decoder::{
-    ColumnLevelDecoderImpl, ColumnValueDecoder, ColumnValueDecoderImpl,
-    DefinitionLevelDecoder, LevelsBufferSlice, RepetitionLevelDecoder, ValuesBufferSlice,
+    ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder,
+    DefinitionLevelDecoderImpl, LevelsBufferSlice, RepetitionLevelDecoder,
+    RepetitionLevelDecoderImpl, ValuesBufferSlice,
 };
 use crate::data_type::*;
 use crate::errors::{ParquetError, Result};
@@ -103,8 +102,8 @@ pub fn get_typed_column_reader<T: DataType>(
 
 /// Typed value reader for a particular primitive column.
 pub type ColumnReaderImpl<T> = GenericColumnReader<
-    ColumnLevelDecoderImpl,
-    ColumnLevelDecoderImpl,
+    RepetitionLevelDecoderImpl,
+    DefinitionLevelDecoderImpl,
     ColumnValueDecoderImpl<T>,
 >;
 
@@ -119,11 +118,14 @@ pub struct GenericColumnReader<R, D, V> {
     page_reader: Box<dyn PageReader>,
 
     /// The total number of values stored in the data page.
-    num_buffered_values: u32,
+    num_buffered_values: usize,
 
     /// The number of values from the current data page that has been decoded into memory
     /// so far.
-    num_decoded_values: u32,
+    num_decoded_values: usize,
+
+    /// True if the end of the current data page denotes the end of a record
+    has_record_delimiter: bool,
 
     /// The decoder for the definition levels if any
     def_level_decoder: Option<D>,
@@ -135,7 +137,7 @@ pub struct GenericColumnReader<R, D, V> {
     values_decoder: V,
 }
 
-impl<V> GenericColumnReader<ColumnLevelDecoderImpl, ColumnLevelDecoderImpl, V>
+impl<V> GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelDecoderImpl, V>
 where
     V: ColumnValueDecoder,
 {
@@ -144,10 +146,10 @@ where
         let values_decoder = V::new(&descr);
 
         let def_level_decoder = (descr.max_def_level() != 0)
-            .then(|| ColumnLevelDecoderImpl::new(descr.max_def_level()));
+            .then(|| DefinitionLevelDecoderImpl::new(descr.max_def_level()));
 
         let rep_level_decoder = (descr.max_rep_level() != 0)
-            .then(|| ColumnLevelDecoderImpl::new(descr.max_rep_level()));
+            .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
 
         Self::new_with_decoders(
             descr,
@@ -180,6 +182,7 @@ where
             num_buffered_values: 0,
             num_decoded_values: 0,
             values_decoder,
+            has_record_delimiter: false,
         }
     }
 
@@ -195,99 +198,126 @@ where
     ///
     /// `values` will be contiguously populated with the non-null values. Note that if the column
     /// is not required, this may be less than either `batch_size` or the number of levels read
+    #[deprecated(note = "Use read_records")]
     pub fn read_batch(
         &mut self,
         batch_size: usize,
-        mut def_levels: Option<&mut D::Slice>,
-        mut rep_levels: Option<&mut R::Slice>,
+        def_levels: Option<&mut D::Slice>,
+        rep_levels: Option<&mut R::Slice>,
         values: &mut V::Slice,
     ) -> Result<(usize, usize)> {
-        let mut values_read = 0;
-        let mut levels_read = 0;
+        let (_, values, levels) =
+            self.read_records(batch_size, def_levels, rep_levels, values)?;
+
+        Ok((values, levels))
+    }
 
-        // Compute the smallest batch size we can read based on provided slices
-        let mut batch_size = min(batch_size, values.capacity());
+    /// Read up to `max_records` records, returning the number of complete records,
+    /// non-null values and levels decoded
+    ///
+    /// If the max definition level is 0, `def_levels` will be ignored, otherwise it will be
+    /// populated with the levels read, with an error returned if it is `None`.
+    ///
+    /// If the max repetition level is 0, `rep_levels` will be ignored, otherwise it will be
+    /// populated with the levels read, with an error returned if it is `None`.
+    ///
+    /// `values` will be contiguously populated with the non-null values. Note that if the column
+    /// is not required, this may be less than either `max_records` or the number of levels read
+    pub fn read_records(
+        &mut self,
+        max_records: usize,
+        mut def_levels: Option<&mut D::Slice>,
+        mut rep_levels: Option<&mut R::Slice>,
+        values: &mut V::Slice,
+    ) -> Result<(usize, usize, usize)> {
+        let mut max_levels = values.capacity().min(max_records);
         if let Some(ref levels) = def_levels {
-            batch_size = min(batch_size, levels.capacity());
+            max_levels = max_levels.min(levels.capacity());
         }
         if let Some(ref levels) = rep_levels {
-            batch_size = min(batch_size, levels.capacity());
+            max_levels = max_levels.min(levels.capacity())
         }
 
-        // Read exhaustively all pages until we read all batch_size values/levels
-        // or there are no more values/levels to read.
-        while levels_read < batch_size {
-            if !self.has_next()? {
-                break;
-            }
+        let mut total_records_read = 0;
+        let mut total_levels_read = 0;
+        let mut total_values_read = 0;
 
-            // Batch size for the current iteration
-            let iter_batch_size = (batch_size - levels_read)
-                .min((self.num_buffered_values - self.num_decoded_values) as usize);
+        while total_records_read < max_records
+            && total_levels_read < max_levels
+            && self.has_next()?
+        {
+            let remaining_records = max_records - total_records_read;
+            let remaining_levels = self.num_buffered_values - self.num_decoded_values;
+            let levels_to_read = remaining_levels.min(max_levels - total_levels_read);
 
-            // If the field is required and non-repeated, there are no definition levels
-            let null_count = match self.descr.max_def_level() > 0 {
-                true => {
-                    let levels = def_levels
+            let (records_read, levels_read) = match self.rep_level_decoder.as_mut() {
+                Some(reader) => {
+                    let out = rep_levels
                         .as_mut()
-                        .ok_or_else(|| general_err!("must specify definition levels"))?;
+                        .ok_or_else(|| general_err!("must specify repetition levels"))?;
+
+                    let (mut records_read, levels_read) = reader.read_rep_levels(
+                        out,
+                        total_levels_read..total_levels_read + levels_to_read,
+                        remaining_records,
+                    )?;
+
+                    if levels_read == remaining_levels && self.has_record_delimiter {
+                        // Reached end of page, which implies records_read < remaining_records
+                        // as reading would otherwise have stopped before reaching the end
+                        assert!(records_read < remaining_records); // Sanity check
+                        records_read += 1;
+                    }
+                    (records_read, levels_read)
+                }
+                None => {
+                    let min = remaining_records.min(levels_to_read);
+                    (min, min)
+                }
+            };
 
-                    let num_def_levels = self
-                        .def_level_decoder
+            let values_to_read = match self.def_level_decoder.as_mut() {
+                Some(reader) => {
+                    let out = def_levels
                         .as_mut()
-                        .expect("def_level_decoder be set")
-                        .read(levels, levels_read..levels_read + iter_batch_size)?;
+                        .ok_or_else(|| general_err!("must specify definition levels"))?;
+
+                    let read = reader.read_def_levels(
+                        out,
+                        total_levels_read..total_levels_read + levels_read,
+                    )?;
 
-                    if num_def_levels != iter_batch_size {
-                        return Err(general_err!("insufficient definition levels read from column - expected {}, got {}", iter_batch_size, num_def_levels));
+                    if read != levels_read {
+                        return Err(general_err!("insufficient definition levels read from column - expected {rep_levels}, got {read}"));
                     }
 
-                    levels.count_nulls(
-                        levels_read..levels_read + num_def_levels,
+                    let null_count = out.count_nulls(
+                        total_levels_read..total_levels_read + read,
                         self.descr.max_def_level(),
-                    )
+                    );
+                    levels_read - null_count
                 }
-                false => 0,
+                None => levels_read,
             };
 
-            if self.descr.max_rep_level() > 0 {
-                let levels = rep_levels
-                    .as_mut()
-                    .ok_or_else(|| general_err!("must specify repetition levels"))?;
-
-                let rep_levels = self
-                    .rep_level_decoder
-                    .as_mut()
-                    .expect("rep_level_decoder be set")
-                    .read(levels, levels_read..levels_read + iter_batch_size)?;
-
-                if rep_levels != iter_batch_size {
-                    return Err(general_err!("insufficient repetition levels read from column - expected {}, got {}", iter_batch_size, rep_levels));
-                }
-            }
-
-            let values_to_read = iter_batch_size - null_count;
-            let curr_values_read = self
-                .values_decoder
-                .read(values, values_read..values_read + values_to_read)?;
+            let values_read = self.values_decoder.read(
+                values,
+                total_values_read..total_values_read + values_to_read,
+            )?;
 
-            if curr_values_read != values_to_read {
+            if values_read != values_to_read {
                 return Err(general_err!(
-                    "insufficient values read from column - expected: {}, got: {}",
-                    values_to_read,
-                    curr_values_read
+                    "insufficient values read from column - expected: {values_to_read}, got: {values_read}",
                 ));
             }
 
-            // Update all "return" counters and internal state.
-
-            // This is to account for when def or rep levels are not provided
-            self.num_decoded_values += iter_batch_size as u32;
-            levels_read += iter_batch_size;
-            values_read += curr_values_read;
+            self.num_decoded_values += levels_read;
+            total_records_read += records_read;
+            total_levels_read += levels_read;
+            total_values_read += values_read;
         }
 
-        Ok((values_read, levels_read))
+        Ok((total_records_read, total_values_read, total_levels_read))
     }
 
     /// Skips over `num_records` records, where records are delimited by repetition levels of 0
@@ -336,21 +366,30 @@ where
             // start skip values in page level
 
             // The number of levels in the current data page
-            let buffered_levels =
-                (self.num_buffered_values - self.num_decoded_values) as usize;
+            let remaining_levels = self.num_buffered_values - self.num_decoded_values;
 
             let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
                 Some(decoder) => {
-                    decoder.skip_rep_levels(remaining_records, buffered_levels)?
+                    let (mut records_read, levels_read) =
+                        decoder.skip_rep_levels(remaining_records, remaining_levels)?;
+
+                    if levels_read == remaining_levels && self.has_record_delimiter {
+                        // Reached end of page, which implies records_read < remaining_records
+                        // as reading would otherwise have stopped before reaching the end
+                        assert!(records_read < remaining_records); // Sanity check
+                        records_read += 1;
+                    }
+
+                    (records_read, levels_read)
                 }
                 None => {
                     // No repetition levels, so each level corresponds to a row
-                    let levels = buffered_levels.min(remaining_records);
+                    let levels = remaining_levels.min(remaining_records);
                     (levels, levels)
                 }
             };
 
-            self.num_decoded_values += rep_levels_read as u32;
+            self.num_decoded_values += rep_levels_read;
             remaining_records -= records_read;
 
             if self.num_buffered_values == self.num_decoded_values {
@@ -431,7 +470,7 @@ where
                             rep_level_encoding,
                             statistics: _,
                         } => {
-                            self.num_buffered_values = num_values;
+                            self.num_buffered_values = num_values as _;
                             self.num_decoded_values = 0;
 
                             let max_rep_level = self.descr.max_rep_level();
@@ -448,6 +487,9 @@ where
                                 )?;
                                 offset += bytes_read;
 
+                                self.has_record_delimiter =
+                                    self.page_reader.peek_next_page()?.is_none();
+
                                 self.rep_level_decoder
                                     .as_mut()
                                     .unwrap()
@@ -493,12 +535,18 @@ where
                                 return Err(general_err!("more nulls than values in page, contained {} values and {} nulls", num_values, num_nulls));
                             }
 
-                            self.num_buffered_values = num_values;
+                            self.num_buffered_values = num_values as _;
                             self.num_decoded_values = 0;
 
                             // DataPage v2 only supports RLE encoding for repetition
                             // levels
                             if self.descr.max_rep_level() > 0 {
+                                // Technically a DataPage v2 should not write a record
+                                // across multiple pages; however, the parquet writer
+                                // used to do this, so we preserve backwards compatibility
+                                self.has_record_delimiter =
+                                    self.page_reader.peek_next_page()?.is_none();
+
                                 self.rep_level_decoder.as_mut().unwrap().set_data(
                                     Encoding::RLE,
                                     buf.range(0, rep_levels_byte_len as usize),
@@ -533,21 +581,6 @@ where
         }
     }
 
-    /// Check whether there is more data to read from this column,
-    /// If the current page is fully decoded, this will NOT load the next page
-    /// into the buffer
-    #[inline]
-    #[cfg(feature = "arrow")]
-    pub(crate) fn peek_next(&mut self) -> Result<bool> {
-        if self.num_buffered_values == 0
-            || self.num_buffered_values == self.num_decoded_values
-        {
-            Ok(self.page_reader.peek_next_page()?.is_some())
-        } else {
-            Ok(true)
-        }
-    }
-
     /// Check whether there is more data to read from this column,
     /// If the current page is fully decoded, this will load the next page
     /// (if it exists) into the buffer
@@ -1359,15 +1392,14 @@ mod tests {
 
             let mut curr_values_read = 0;
             let mut curr_levels_read = 0;
-            let mut done = false;
-            while !done {
+            loop {
                 let actual_def_levels =
                     def_levels.as_mut().map(|vec| &mut vec[curr_levels_read..]);
                 let actual_rep_levels =
                     rep_levels.as_mut().map(|vec| &mut vec[curr_levels_read..]);
 
-                let (values_read, levels_read) = typed_column_reader
-                    .read_batch(
+                let (_, values_read, levels_read) = typed_column_reader
+                    .read_records(
                         batch_size,
                         actual_def_levels,
                         actual_rep_levels,
@@ -1375,12 +1407,12 @@ mod tests {
                     )
                     .expect("read_batch() should be OK");
 
-                if values_read == 0 && levels_read == 0 {
-                    done = true;
-                }
-
                 curr_values_read += values_read;
                 curr_levels_read += levels_read;
+
+                if values_read == 0 && levels_read == 0 {
+                    break;
+                }
             }
 
             assert!(
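
The `has_record_delimiter` handling above encodes the rule that a record is
only known to be complete once a subsequent repetition level of 0 is seen,
with the end of the column (no next page) delimiting the final record. A
standalone sketch of the counting rule, for intuition only (not the decoder's
actual implementation):

    /// Count complete records in `rep_levels`; a level of 0 starts a record.
    fn count_records(rep_levels: &[i16], end_delimits: bool) -> usize {
        // A leading 0 opens the first record rather than completing one, so
        // only 0s after the first position close a record.
        let complete = rep_levels.iter().skip(1).filter(|&&l| l == 0).count();
        // Mirrors has_record_delimiter: the end of data closes the last record.
        match end_delimits && !rep_levels.is_empty() {
            true => complete + 1,
            false => complete,
        }
    }

    fn main() {
        // The second 0 completes the first record; the trailing values form a
        // complete record only if nothing follows.
        assert_eq!(count_records(&[0, 1, 1, 0, 1], false), 1);
        assert_eq!(count_records(&[0, 1, 1, 0, 1], true), 2);
    }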
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index 3a6795c8c..369b335dc 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -68,24 +68,35 @@ pub trait ColumnLevelDecoder {
 
     /// Set data for this [`ColumnLevelDecoder`]
     fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr);
+}
 
-    /// Read level data into `out[range]` returning the number of levels read
+pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
+    /// Read up to `max_records` records of repetition level data into `out[range]`,
+    /// returning the number of complete records and the number of levels read
     ///
     /// `range` is provided by the caller to allow for types such as default-initialized `[T]`
     /// that only track capacity and not length
     ///
+    /// A record only ends when the data contains a subsequent repetition level of 0;
+    /// it is therefore left to the caller to delimit the final record in a column
+    ///
     /// # Panics
     ///
     /// Implementations may panic if `range` overlaps with already written data
-    ///
-    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize>;
-}
+    fn read_rep_levels(
+        &mut self,
+        out: &mut Self::Slice,
+        range: Range<usize>,
+        max_records: usize,
+    ) -> Result<(usize, usize)>;
 
-pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
     /// Skips over up to `num_levels` repetition levels corresponding to `num_records` records,
     /// where a record is delimited by a repetition level of 0
     ///
     /// Returns the number of records skipped, and the number of levels skipped
+    ///
+    /// A record only ends when the data contains a subsequent repetition level of 0;
+    /// it is therefore left to the caller to delimit the final record in a column
     fn skip_rep_levels(
         &mut self,
         num_records: usize,
@@ -94,6 +105,22 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
 }
 
 pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
+    /// Read definition level data into `out[range]` returning the number of levels read
+    ///
+    /// `range` is provided by the caller to allow for types such as default-initialized `[T]`
+    /// that only track capacity and not length
+    ///
+    /// # Panics
+    ///
+    /// Implementations may panic if `range` overlaps with already written data
+    ///
+    // TODO: Should this return the number of nulls?
+    fn read_def_levels(
+        &mut self,
+        out: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> Result<usize>;
+
     /// Skips over `num_levels` definition levels
     ///
     /// Returns the number of values skipped, and the number of levels skipped
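
The delimiting rule the repetition level methods above document can be
illustrated with a hypothetical helper (not part of the patch): a repetition
level of 0 starts a new record and therefore completes whatever preceded it,
so the trailing record is never complete until the caller supplies the
delimiter.

    // `carried_partial` models a record continuing from a previous buffer
    fn complete_records(rep_levels: &[i16], carried_partial: bool) -> usize {
        rep_levels
            .iter()
            .enumerate()
            .filter(|&(idx, &level)| level == 0 && (idx != 0 || carried_partial))
            .count()
    }

    // Levels for three lists [a, b], [c], [d, e]: only the first two are
    // followed by a 0, so the final record remains open
    assert_eq!(complete_records(&[0, 1, 0, 0, 1], false), 2);
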
@@ -270,101 +297,67 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
 
 const SKIP_BUFFER_SIZE: usize = 1024;
 
-/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
-pub struct ColumnLevelDecoderImpl {
-    decoder: Option<LevelDecoderInner>,
-    /// Temporary buffer populated when skipping values
-    buffer: Vec<i16>,
-    bit_width: u8,
+enum LevelDecoder {
+    Packed(BitReader, u8),
+    Rle(RleDecoder),
 }
 
-impl ColumnLevelDecoderImpl {
-    pub fn new(max_level: i16) -> Self {
-        let bit_width = num_required_bits(max_level as u64);
-        Self {
-            decoder: None,
-            buffer: vec![],
-            bit_width,
+impl LevelDecoder {
+    fn new(encoding: Encoding, data: ByteBufferPtr, bit_width: u8) -> Self {
+        match encoding {
+            Encoding::RLE => {
+                let mut decoder = RleDecoder::new(bit_width);
+                decoder.set_data(data);
+                Self::Rle(decoder)
+            }
+            Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width),
+            _ => unreachable!("invalid level encoding: {}", encoding),
         }
     }
 
-    /// Drops the first `len` values from the internal buffer
-    fn split_off_buffer(&mut self, len: usize) {
-        match self.buffer.len() == len {
-            true => self.buffer.clear(),
-            false => {
-                // Move to_read elements to end of slice
-                self.buffer.rotate_left(len);
-                // Truncate buffer
-                self.buffer.truncate(self.buffer.len() - len);
+    fn read(&mut self, out: &mut [i16]) -> Result<usize> {
+        match self {
+            Self::Packed(reader, bit_width) => {
+                Ok(reader.get_batch::<i16>(out, *bit_width as usize))
             }
+            Self::Rle(reader) => Ok(reader.get_batch(out)?),
         }
     }
+}
 
-    /// Reads up to `to_read` values to the internal buffer
-    fn read_to_buffer(&mut self, to_read: usize) -> Result<()> {
-        let mut buf = std::mem::take(&mut self.buffer);
-
-        // Repopulate buffer
-        buf.resize(to_read, 0);
-        let actual = self.read(&mut buf, 0..to_read)?;
-        buf.truncate(actual);
-
-        self.buffer = buf;
-        Ok(())
-    }
+/// An implementation of [`DefinitionLevelDecoder`] for `[i16]`
+pub struct DefinitionLevelDecoderImpl {
+    decoder: Option<LevelDecoder>,
+    bit_width: u8,
 }
 
-enum LevelDecoderInner {
-    Packed(BitReader, u8),
-    Rle(RleDecoder),
+impl DefinitionLevelDecoderImpl {
+    pub fn new(max_level: i16) -> Self {
+        let bit_width = num_required_bits(max_level as u64);
+        Self {
+            decoder: None,
+            bit_width,
+        }
+    }
 }
 
-impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
+impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
     type Slice = [i16];
 
     fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
-        self.buffer.clear();
-        match encoding {
-            Encoding::RLE => {
-                let mut decoder = RleDecoder::new(self.bit_width);
-                decoder.set_data(data);
-                self.decoder = Some(LevelDecoderInner::Rle(decoder));
-            }
-            Encoding::BIT_PACKED => {
-                self.decoder = Some(LevelDecoderInner::Packed(
-                    BitReader::new(data),
-                    self.bit_width,
-                ));
-            }
-            _ => unreachable!("invalid level encoding: {}", encoding),
-        }
+        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
     }
+}
 
-    fn read(&mut self, out: &mut Self::Slice, mut range: Range<usize>) -> Result<usize> {
-        let read_from_buffer = match self.buffer.is_empty() {
-            true => 0,
-            false => {
-                let read_from_buffer = self.buffer.len().min(range.end - range.start);
-                out[range.start..range.start + read_from_buffer]
-                    .copy_from_slice(&self.buffer[0..read_from_buffer]);
-                self.split_off_buffer(read_from_buffer);
-                read_from_buffer
-            }
-        };
-        range.start += read_from_buffer;
-
-        match self.decoder.as_mut().unwrap() {
-            LevelDecoderInner::Packed(reader, bit_width) => Ok(read_from_buffer
-                + reader.get_batch::<i16>(&mut out[range], *bit_width as usize)),
-            LevelDecoderInner::Rle(reader) => {
-                Ok(read_from_buffer + reader.get_batch(&mut out[range])?)
-            }
-        }
+impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
+    fn read_def_levels(
+        &mut self,
+        out: &mut Self::Slice,
+        range: Range<usize>,
+    ) -> Result<usize> {
+        self.decoder.as_mut().unwrap().read(&mut out[range])
     }
-}
 
-impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
     fn skip_def_levels(
         &mut self,
         num_levels: usize,
@@ -372,80 +365,159 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
     ) -> Result<(usize, usize)> {
         let mut level_skip = 0;
         let mut value_skip = 0;
+        let mut buf: Vec<i16> = vec![];
         while level_skip < num_levels {
             let remaining_levels = num_levels - level_skip;
 
-            if self.buffer.is_empty() {
-                // Only read number of needed values
-                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
-                if self.buffer.is_empty() {
-                    // Reached end of page
-                    break;
-                }
+            let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
+            buf.resize(to_read, 0);
+            let read = self.read_def_levels(&mut buf, 0..to_read)?;
+            if read == 0 {
+                // Reached end of page
+                break;
             }
-            let to_read = self.buffer.len().min(remaining_levels);
 
-            level_skip += to_read;
-            value_skip += self.buffer[..to_read]
-                .iter()
-                .filter(|x| **x == max_def_level)
-                .count();
-
-            self.split_off_buffer(to_read)
+            level_skip += read;
+            value_skip += buf[..read].iter().filter(|x| **x == max_def_level).count();
         }
 
         Ok((value_skip, level_skip))
     }
 }
 
-impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
-    fn skip_rep_levels(
+pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;
+
+/// An implementation of [`RepetitionLevelDecoder`] for `[i16]`
+pub struct RepetitionLevelDecoderImpl {
+    decoder: Option<LevelDecoder>,
+    bit_width: u8,
+    buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
+    buffer_len: usize,
+    buffer_offset: usize,
+    has_partial: bool,
+}
+
+impl RepetitionLevelDecoderImpl {
+    pub fn new(max_level: i16) -> Self {
+        let bit_width = num_required_bits(max_level as u64);
+        Self {
+            decoder: None,
+            bit_width,
+            buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
+            buffer_offset: 0,
+            buffer_len: 0,
+            has_partial: false,
+        }
+    }
+
+    fn fill_buf(&mut self) -> Result<()> {
+        let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
+        self.buffer_offset = 0;
+        self.buffer_len = read;
+        Ok(())
+    }
+
+    /// Inspects the buffered repetition levels in the range `self.buffer_offset..self.buffer_len`
+    /// and returns the number of "complete" records along with the corresponding number of levels
+    ///
+    /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
+    fn count_records(
         &mut self,
-        num_records: usize,
+        records_to_read: usize,
         num_levels: usize,
-    ) -> Result<(usize, usize)> {
-        let mut level_skip = 0;
-        let mut record_skip = 0;
+    ) -> (bool, usize, usize) {
+        let mut records_read = 0;
 
-        while level_skip < num_levels {
-            let remaining_levels = num_levels - level_skip;
+        let levels = num_levels.min(self.buffer_len - self.buffer_offset);
+        let buf = self.buffer.iter().skip(self.buffer_offset);
+        for (idx, item) in buf.take(levels).enumerate() {
+            if *item == 0 && (idx != 0 || self.has_partial) {
+                records_read += 1;
 
-            if self.buffer.is_empty() {
-                // Only read number of needed values
-                self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
-                if self.buffer.is_empty() {
-                    // Reached end of page
-                    break;
+                if records_read == records_to_read {
+                    return (false, records_read, idx);
                 }
             }
+        }
+        // Either ran out of space in `num_levels` or data in `self.buffer`
+        (true, records_read, levels)
+    }
+}
+
+impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
+    type Slice = [i16];
 
-            let max_skip = self.buffer.len().min(remaining_levels);
+    fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
+        self.buffer_len = 0;
+        self.buffer_offset = 0;
+    }
+}
 
-            let mut to_skip = 0;
-            while to_skip < max_skip && record_skip != num_records {
-                if self.buffer[to_skip] == 0 {
-                    record_skip += 1;
+impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
+    fn read_rep_levels(
+        &mut self,
+        out: &mut Self::Slice,
+        range: Range<usize>,
+        max_records: usize,
+    ) -> Result<(usize, usize)> {
+        let output = &mut out[range];
+        let max_levels = output.len();
+        let mut total_records_read = 0;
+        let mut total_levels_read = 0;
+
+        while total_records_read < max_records && total_levels_read < max_levels {
+            if self.buffer_len == self.buffer_offset {
+                self.fill_buf()?;
+                if self.buffer_len == 0 {
+                    break;
                 }
-                to_skip += 1;
             }
 
-            // Find end of record
-            while to_skip < max_skip && self.buffer[to_skip] != 0 {
-                to_skip += 1;
-            }
+            let (partial, records_read, levels_read) = self.count_records(
+                max_records - total_records_read,
+                max_levels - total_levels_read,
+            );
 
-            level_skip += to_skip;
-            if to_skip == self.buffer.len() {
-                // Need to read more values
-                self.buffer.clear();
-                continue;
-            }
+            output[total_levels_read..total_levels_read + levels_read].copy_from_slice(
+                &self.buffer[self.buffer_offset..self.buffer_offset + levels_read],
+            );
 
-            self.split_off_buffer(to_skip);
-            break;
+            total_levels_read += levels_read;
+            total_records_read += records_read;
+            self.buffer_offset += levels_read;
+            self.has_partial = partial;
         }
+        Ok((total_records_read, total_levels_read))
+    }
+
+    fn skip_rep_levels(
+        &mut self,
+        num_records: usize,
+        num_levels: usize,
+    ) -> Result<(usize, usize)> {
+        let mut total_records_read = 0;
+        let mut total_levels_read = 0;
 
-        Ok((record_skip, level_skip))
+        while total_records_read < num_records && total_levels_read < num_levels {
+            if self.buffer_len == self.buffer_offset {
+                self.fill_buf()?;
+                if self.buffer_len == 0 {
+                    break;
+                }
+            }
+
+            let (partial, records_read, levels_read) = self.count_records(
+                num_records - total_records_read,
+                num_levels - total_levels_read,
+            );
+
+            total_levels_read += levels_read;
+            total_records_read += records_read;
+            self.buffer_offset += levels_read;
+            self.has_partial = partial;
+        }
+        Ok((total_records_read, total_levels_read))
     }
 }
 
@@ -455,35 +527,6 @@ mod tests {
     use crate::encodings::rle::RleEncoder;
     use rand::prelude::*;
 
-    fn test_skip_levels<F>(encoded: &[i16], data: ByteBufferPtr, skip: F)
-    where
-        F: Fn(&mut ColumnLevelDecoderImpl, &mut usize, usize),
-    {
-        let mut rng = thread_rng();
-        let mut decoder = ColumnLevelDecoderImpl::new(5);
-        decoder.set_data(Encoding::RLE, data);
-
-        let mut read = 0;
-        let mut decoded = vec![];
-        let mut expected = vec![];
-        while read < encoded.len() {
-            let to_read = rng.gen_range(0..(encoded.len() - read).min(100)) + 1;
-
-            if rng.gen_bool(0.5) {
-                skip(&mut decoder, &mut read, to_read)
-            } else {
-                let start = decoded.len();
-                let end = decoded.len() + to_read;
-                decoded.resize(end, 0);
-                let actual_read = decoder.read(&mut decoded, start..end).unwrap();
-                assert_eq!(actual_read, to_read);
-                expected.extend_from_slice(&encoded[read..read + to_read]);
-                read += to_read;
-            }
-        }
-        assert_eq!(decoded, expected);
-    }
-
     #[test]
     fn test_skip_padding() {
         let mut encoder = RleEncoder::new(1, 1024);
@@ -491,67 +534,67 @@ mod tests {
         (0..3).for_each(|_| encoder.put(1));
         let data = ByteBufferPtr::new(encoder.consume());
 
-        let mut decoder = ColumnLevelDecoderImpl::new(1);
+        let mut decoder = RepetitionLevelDecoderImpl::new(1);
         decoder.set_data(Encoding::RLE, data.clone());
-        let (records, levels) = decoder.skip_rep_levels(100, 4).unwrap();
-        assert_eq!(records, 1);
+        let (_, levels) = decoder.skip_rep_levels(100, 4).unwrap();
         assert_eq!(levels, 4);
 
         // The length of the final bit packed run is ambiguous, so without the correct
         // levels limit, it will decode zero padding
-        let mut decoder = ColumnLevelDecoderImpl::new(1);
+        let mut decoder = RepetitionLevelDecoderImpl::new(1);
         decoder.set_data(Encoding::RLE, data);
-        let (records, levels) = decoder.skip_rep_levels(100, 6).unwrap();
-        assert_eq!(records, 3);
+        let (_, levels) = decoder.skip_rep_levels(100, 6).unwrap();
         assert_eq!(levels, 6);
     }
 
     #[test]
-    fn test_skip() {
+    fn test_skip_rep_levels() {
         for _ in 0..10 {
             let mut rng = thread_rng();
             let total_len = 10000_usize;
-            let encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..5)).collect();
+            let mut encoded: Vec<i16> =
+                (0..total_len).map(|_| rng.gen_range(0..5)).collect();
+            encoded[0] = 0;
             let mut encoder = RleEncoder::new(3, 1024);
             for v in &encoded {
                 encoder.put(*v as _)
             }
             let data = ByteBufferPtr::new(encoder.consume());
 
-            test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
-                let (values_skipped, levels_skipped) =
-                    decoder.skip_def_levels(to_read, 5).unwrap();
-                assert_eq!(levels_skipped, to_read);
-
-                let expected = &encoded[*read..*read + to_read];
-                let expected_values_skipped =
-                    expected.iter().filter(|x| **x == 5).count();
-                assert_eq!(values_skipped, expected_values_skipped);
-                *read += to_read;
-            });
-
-            test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
-                let remaining_levels = total_len - *read;
-                let (records_skipped, levels_skipped) =
-                    decoder.skip_rep_levels(to_read, remaining_levels).unwrap();
-
-                assert!(levels_skipped <= remaining_levels);
-
-                // If not run out of values
-                if levels_skipped + *read != encoded.len() {
-                    // Should have read correct number of records
-                    assert_eq!(records_skipped, to_read);
-                    // Next value should be start of record
-                    assert_eq!(encoded[levels_skipped + *read], 0);
+            let mut decoder = RepetitionLevelDecoderImpl::new(5);
+            decoder.set_data(Encoding::RLE, data);
+
+            let total_records = encoded.iter().filter(|x| **x == 0).count();
+            let mut remaining_records = total_records;
+            let mut remaining_levels = encoded.len();
+            loop {
+                let skip = rng.gen_bool(0.5);
+                let records = rng.gen_range(1..=remaining_records.min(5));
+                let (records_read, levels_read) = if skip {
+                    decoder.skip_rep_levels(records, remaining_levels).unwrap()
+                } else {
+                    let mut decoded = vec![0; remaining_levels];
+                    let (records_read, levels_read) = decoder
+                        .read_rep_levels(&mut decoded, 0..remaining_levels, records)
+                        .unwrap();
+
+                    assert_eq!(
+                        decoded[..levels_read],
+                        encoded[encoded.len() - remaining_levels..][..levels_read]
+                    );
+                    (records_read, levels_read)
+                };
+
+                remaining_levels = remaining_levels.checked_sub(levels_read).unwrap();
+                if remaining_levels == 0 {
+                    assert_eq!(records_read + 1, records);
+                    assert_eq!(records, remaining_records);
+                    break;
                 }
-
-                let expected = &encoded[*read..*read + levels_skipped];
-                let expected_records_skipped =
-                    expected.iter().filter(|x| **x == 0).count();
-                assert_eq!(records_skipped, expected_records_skipped);
-
-                *read += levels_skipped;
-            });
+                assert_eq!(records_read, records);
+                remaining_records -= records;
+                assert_ne!(remaining_records, 0);
+            }
         }
     }
 }
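
To make the "caller delimits the final record" contract concrete, a small
sketch against the new decoder (the setup mirrors the tests above; the level
values are illustrative):

    // Two records, [0, 1] and [0, 1, 1]; only the first has a closing 0
    let mut encoder = RleEncoder::new(3, 1024);
    for v in [0i16, 1, 0, 1, 1] {
        encoder.put(v as u64);
    }
    let mut decoder = RepetitionLevelDecoderImpl::new(5);
    decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoder.consume()));

    let mut out = [0i16; 8];
    // The 0 at index 2 delimits the first record: 1 record, 2 levels read
    assert_eq!(decoder.read_rep_levels(&mut out, 0..8, 1).unwrap(), (1, 2));
    // The remaining levels are returned but nothing delimits them, so no
    // record is reported; the column reader closes the final record itself
    // via the new has_record_delimiter flag once no further page exists
    assert_eq!(decoder.read_rep_levels(&mut out, 0..8, 1).unwrap(), (0, 3));
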
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index fc5e29b03..93dff1b46 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -2332,7 +2332,7 @@ mod tests {
         let mut actual_def_levels = def_levels.map(|_| vec![0i16; max_batch_size]);
         let mut actual_rep_levels = rep_levels.map(|_| vec![0i16; max_batch_size]);
 
-        let (values_read, levels_read) = read_fully(
+        let (_, values_read, levels_read) = read_fully(
             reader,
             max_batch_size,
             actual_def_levels.as_mut(),
@@ -2409,11 +2409,11 @@ mod tests {
         mut def_levels: Option<&mut Vec<i16>>,
         mut rep_levels: Option<&mut Vec<i16>>,
         values: &mut [T::T],
-    ) -> (usize, usize) {
+    ) -> (usize, usize, usize) {
         let actual_def_levels = def_levels.as_mut().map(|vec| &mut vec[..]);
         let actual_rep_levels = rep_levels.as_mut().map(|vec| &mut vec[..]);
         reader
-            .read_batch(batch_size, actual_def_levels, actual_rep_levels, values)
+            .read_records(batch_size, actual_def_levels, actual_rep_levels, values)
             .unwrap()
     }
 
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index defdaad32..15240e33c 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -1632,12 +1632,12 @@ mod tests {
             let mut out = [0; 4];
             let c1 = row_group.get_column_reader(0).unwrap();
             let mut c1 = get_typed_column_reader::<Int32Type>(c1);
-            c1.read_batch(4, None, None, &mut out).unwrap();
+            c1.read_records(4, None, None, &mut out).unwrap();
             assert_eq!(out, column_data[0]);
 
             let c2 = row_group.get_column_reader(1).unwrap();
             let mut c2 = get_typed_column_reader::<Int32Type>(c2);
-            c2.read_batch(4, None, None, &mut out).unwrap();
+            c2.read_records(4, None, None, &mut out).unwrap();
             assert_eq!(out, column_data[1]);
         };
 
diff --git a/parquet/src/record/triplet.rs b/parquet/src/record/triplet.rs
index 14a4a3945..67c407b3a 100644
--- a/parquet/src/record/triplet.rs
+++ b/parquet/src/record/triplet.rs
@@ -295,8 +295,11 @@ impl<T: DataType> TypedTripletIter<T> {
     fn read_next(&mut self) -> Result<bool> {
         self.curr_triplet_index += 1;
 
-        if self.curr_triplet_index >= self.triplets_left {
-            let (values_read, levels_read) = {
+        // A loop is required to handle a batch size of 1: in that case, on reaching
+        // the end of a record, read_records will return `Ok((1, 0, 0))` and therefore
+        // not advance `self.triplets_left`
+        while self.curr_triplet_index >= self.triplets_left {
+            let (records_read, values_read, levels_read) = {
                 // Get slice of definition levels, if available
                 let def_levels = self.def_levels.as_mut().map(|vec| &mut vec[..]);
 
@@ -304,7 +307,7 @@ impl<T: DataType> TypedTripletIter<T> {
                 let rep_levels = self.rep_levels.as_mut().map(|vec| &mut vec[..]);
 
                 // Buffer triplets
-                self.reader.read_batch(
+                self.reader.read_records(
                     self.batch_size,
                     def_levels,
                     rep_levels,
@@ -313,7 +316,7 @@ impl<T: DataType> TypedTripletIter<T> {
             };
 
             // No more values or levels to read
-            if values_read == 0 && levels_read == 0 {
+            if records_read == 0 && values_read == 0 && levels_read == 0 {
                 self.has_next = false;
                 return Ok(false);
             }
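
A minimal sketch of the edge case this loop guards against, driven through the
new repetition level decoder directly (setup as in the decoder tests earlier;
at the column reader level the same shape surfaces as `Ok((1, 0, 0))`):

    // One record [0, 1, 1] followed by the start of the next, [0, 1]
    let mut encoder = RleEncoder::new(3, 1024);
    for v in [0i16, 1, 1, 0, 1] {
        encoder.put(v as u64);
    }
    let mut decoder = RepetitionLevelDecoderImpl::new(5);
    decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoder.consume()));

    let mut out = [0i16; 8];
    // A window of 3 levels copies the first record out in full, but its
    // closing 0 has not been seen, so no record is reported complete
    assert_eq!(decoder.read_rep_levels(&mut out, 0..3, 1).unwrap(), (0, 3));
    // The next call immediately sees the buffered 0, completing the previous
    // record while reading zero new levels - hence the loop in read_next
    assert_eq!(decoder.read_rep_levels(&mut out, 0..8, 1).unwrap(), (1, 0));
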