You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/07/07 16:05:23 UTC

[arrow-rs] branch master updated: Stub out Skip Records API (#1792) (#1998)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new e59b02348 Stub out Skip Records API (#1792) (#1998)
e59b02348 is described below

commit e59b023480437f67e84ba2f827b58f78fd44c3a1
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jul 7 12:05:18 2022 -0400

    Stub out Skip Records API (#1792) (#1998)
    
    * Stub API for parquet record skipping
    
    * Update parquet/src/arrow/record_reader/mod.rs
    
    Co-authored-by: Yang Jiang <ji...@163.com>
    
    * Remove empty google.protobuf.rs
    
    * Replace todo with nyi_err
    
    * Update doc comment
    
    Co-authored-by: Yang Jiang <ji...@163.com>
---
 parquet/src/arrow/array_reader/byte_array.rs       |   8 ++
 .../arrow/array_reader/byte_array_dictionary.rs    |   8 ++
 .../src/arrow/array_reader/complex_object_array.rs |   7 ++
 parquet/src/arrow/array_reader/empty_array.rs      |   6 ++
 parquet/src/arrow/array_reader/list_array.rs       |   4 +
 parquet/src/arrow/array_reader/map_array.rs        |  13 +++
 parquet/src/arrow/array_reader/mod.rs              |   3 +
 parquet/src/arrow/array_reader/null_array.rs       |   4 +
 parquet/src/arrow/array_reader/primitive_array.rs  |   4 +
 parquet/src/arrow/array_reader/struct_array.rs     |  20 ++++
 parquet/src/arrow/array_reader/test_util.rs        |   5 +
 parquet/src/arrow/arrow_reader.rs                  | 108 ++++++++++++++++++++-
 parquet/src/arrow/async_reader.rs                  |  10 +-
 .../src/arrow/record_reader/definition_levels.rs   |  16 ++-
 parquet/src/arrow/record_reader/mod.rs             |  38 +++++++-
 parquet/src/column/page.rs                         |  17 ++++
 parquet/src/column/reader.rs                       |  73 +++++++++++++-
 parquet/src/column/reader/decoder.rs               |  44 +++++++++
 parquet/src/file/serialized_reader.rs              |  10 +-
 parquet/src/util/test_common/page_util.rs          |  10 +-
 20 files changed, 391 insertions(+), 17 deletions(-)

diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 95620d940..b762236c4 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -122,6 +122,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
         Ok(buffer.into_array(null_buffer, self.data_type.clone()))
     }
 
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        self.record_reader.skip_records(num_records)
+    }
+
     fn get_def_levels(&self) -> Option<&[i16]> {
         self.def_levels_buffer
             .as_ref()
@@ -210,6 +214,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
 
         decoder.read(out, range.end - range.start, self.dict.as_ref())
     }
+
+    fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
+        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    }
 }
 
 /// A generic decoder from uncompressed parquet value data to [`OffsetBuffer`]
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 77f7916ed..bfe557499 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -184,6 +184,10 @@ where
         Ok(array)
     }
 
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        self.record_reader.skip_records(num_records)
+    }
+
     fn get_def_levels(&self) -> Option<&[i16]> {
         self.def_levels_buffer
             .as_ref()
@@ -371,6 +375,10 @@ where
             }
         }
     }
+
+    fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
+        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    }
 }
 
 #[cfg(test)]
diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs
index b91fde5c4..6e7585ff9 100644
--- a/parquet/src/arrow/array_reader/complex_object_array.rs
+++ b/parquet/src/arrow/array_reader/complex_object_array.rs
@@ -163,6 +163,13 @@ where
         Ok(array)
     }
 
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        match self.column_reader.as_mut() {
+            Some(reader) => reader.skip_records(num_records),
+            None => Ok(0),
+        }
+    }
+
     fn get_def_levels(&self) -> Option<&[i16]> {
         self.def_levels_buffer.as_deref()
     }
diff --git a/parquet/src/arrow/array_reader/empty_array.rs b/parquet/src/arrow/array_reader/empty_array.rs
index 54b77becb..b06646cc1 100644
--- a/parquet/src/arrow/array_reader/empty_array.rs
+++ b/parquet/src/arrow/array_reader/empty_array.rs
@@ -65,6 +65,12 @@ impl ArrayReader for EmptyArrayReader {
         Ok(Arc::new(StructArray::from(data)))
     }
 
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        let skipped = self.remaining_rows.min(num_records);
+        self.remaining_rows -= skipped;
+        Ok(skipped)
+    }
+
     fn get_def_levels(&self) -> Option<&[i16]> {
         None
     }
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index e1cd71b9e..3d612facd 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -231,6 +231,10 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
         Ok(Arc::new(result_array))
     }
 
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        self.item_reader.skip_records(num_records)
+    }
+
     fn get_def_levels(&self) -> Option<&[i16]> {
         self.item_reader.get_def_levels()
     }
diff --git a/parquet/src/arrow/array_reader/map_array.rs b/parquet/src/arrow/array_reader/map_array.rs
index 92487ebbc..00c3db41a 100644
--- a/parquet/src/arrow/array_reader/map_array.rs
+++ b/parquet/src/arrow/array_reader/map_array.rs
@@ -149,6 +149,19 @@ impl ArrayReader for MapArrayReader {
         Ok(Arc::new(MapArray::from(array_data)))
     }
 
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        let key_skipped = self.key_reader.skip_records(num_records)?;
+        let value_skipped = self.value_reader.skip_records(num_records)?;
+        if key_skipped != value_skipped {
+            return Err(general_err!(
+                "MapArrayReader out of sync, skipped {} keys and {} values",
+                key_skipped,
+                value_skipped
+            ));
+        }
+        Ok(key_skipped)
+    }
+
     fn get_def_levels(&self) -> Option<&[i16]> {
         // Children definition levels should describe the same parent structure,
         // so return key_reader only
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index dd65a3626..e30c33bba 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -64,6 +64,9 @@ pub trait ArrayReader: Send {
     /// Reads at most `batch_size` records into an arrow array and return it.
     fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef>;
 
+    /// Skips over `num_records` records, returning the number of rows skipped
+    fn skip_records(&mut self, num_records: usize) -> Result<usize>;
+
     /// If this array has a non-zero definition level, i.e. has a nullable parent
     /// array, returns the definition levels of data from the last call of `next_batch`
     ///
diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs
index 4b592025d..b207d8b2c 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -96,6 +96,10 @@ where
         Ok(Arc::new(array))
     }
 
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        self.record_reader.skip_records(num_records)
+    }
+
     fn get_def_levels(&self) -> Option<&[i16]> {
         self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
     }
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index c25df89a6..cb41d1fba 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -233,6 +233,10 @@ where
         Ok(array)
     }
 
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        self.record_reader.skip_records(num_records)
+    }
+
     fn get_def_levels(&self) -> Option<&[i16]> {
         self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
     }
diff --git a/parquet/src/arrow/array_reader/struct_array.rs b/parquet/src/arrow/array_reader/struct_array.rs
index 30824d742..602c598f8 100644
--- a/parquet/src/arrow/array_reader/struct_array.rs
+++ b/parquet/src/arrow/array_reader/struct_array.rs
@@ -157,6 +157,26 @@ impl ArrayReader for StructArrayReader {
         Ok(Arc::new(StructArray::from(array_data)))
     }
 
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        let mut skipped = None;
+        for child in self.children.iter_mut() {
+            let child_skipped = child.skip_records(num_records)?;
+            match skipped {
+                Some(expected) => {
+                    if expected != child_skipped {
+                        return Err(general_err!(
+                            "StructArrayReader out of sync, expected {} skipped, got {}",
+                            expected,
+                            child_skipped
+                        ));
+                    }
+                }
+                None => skipped = Some(child_skipped),
+            }
+        }
+        Ok(skipped.unwrap_or(0))
+    }
+
     fn get_def_levels(&self) -> Option<&[i16]> {
         // Children definition levels should describe the same
         // parent structure, so return first child's
diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs
index 0c044eb2d..04c0f6c68 100644
--- a/parquet/src/arrow/array_reader/test_util.rs
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -170,6 +170,11 @@ impl ArrayReader for InMemoryArrayReader {
         Ok(self.array.slice(self.last_idx, read))
     }
 
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        let array = self.next_batch(num_records)?;
+        Ok(array.len())
+    }
+
     fn get_def_levels(&self) -> Option<&[i16]> {
         self.def_levels
             .as_ref()
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index c5d1f66e5..6a3270762 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -17,6 +17,7 @@
 
 //! Contains reader which reads parquet data into arrow [`RecordBatch`]
 
+use std::collections::VecDeque;
 use std::sync::Arc;
 
 use arrow::array::Array;
@@ -29,7 +30,7 @@ use crate::arrow::array_reader::{build_array_reader, ArrayReader};
 use crate::arrow::schema::parquet_to_arrow_schema;
 use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
 use crate::arrow::ProjectionMask;
-use crate::errors::Result;
+use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{KeyValue, ParquetMetaData};
 use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
 use crate::schema::types::SchemaDescriptor;
@@ -70,9 +71,39 @@ pub trait ArrowReader {
     ) -> Result<Self::RecordReader>;
 }
 
+/// [`RowSelection`] allows selecting or skipping a provided number of rows
+/// when scanning the parquet file
+#[derive(Debug, Clone, Copy)]
+pub(crate) struct RowSelection {
+    /// The number of rows
+    pub row_count: usize,
+
+    /// If true, skip `row_count` rows
+    pub skip: bool,
+}
+
+impl RowSelection {
+    /// Select `row_count` rows
+    pub fn select(row_count: usize) -> Self {
+        Self {
+            row_count,
+            skip: false,
+        }
+    }
+
+    /// Skip `row_count` rows
+    pub fn skip(row_count: usize) -> Self {
+        Self {
+            row_count,
+            skip: true,
+        }
+    }
+}
+
 #[derive(Debug, Clone, Default)]
 pub struct ArrowReaderOptions {
     skip_arrow_metadata: bool,
+    selection: Option<Vec<RowSelection>>,
 }
 
 impl ArrowReaderOptions {
@@ -90,6 +121,20 @@ impl ArrowReaderOptions {
     pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
         Self {
             skip_arrow_metadata,
+            ..self
+        }
+    }
+
+    /// Scan rows from the parquet file according to the provided `selection`
+    ///
+    /// TODO: Make public once row selection fully implemented (#1792)
+    pub(crate) fn with_row_selection(
+        self,
+        selection: impl Into<Vec<RowSelection>>,
+    ) -> Self {
+        Self {
+            selection: Some(selection.into()),
+            ..self
         }
     }
 }
@@ -139,7 +184,12 @@ impl ArrowReader for ParquetFileArrowReader {
             Box::new(self.file_reader.clone()),
         )?;
 
-        ParquetRecordBatchReader::try_new(batch_size, array_reader)
+        let selection = self.options.selection.clone().map(Into::into);
+        Ok(ParquetRecordBatchReader::new(
+            batch_size,
+            array_reader,
+            selection,
+        ))
     }
 }
 
@@ -221,13 +271,47 @@ pub struct ParquetRecordBatchReader {
     batch_size: usize,
     array_reader: Box<dyn ArrayReader>,
     schema: SchemaRef,
+    selection: Option<VecDeque<RowSelection>>,
 }
 
 impl Iterator for ParquetRecordBatchReader {
     type Item = ArrowResult<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        match self.array_reader.next_batch(self.batch_size) {
+        // Number of rows to read in this call, honouring any row selection
+        let to_read = match self.selection.as_mut() {
+            Some(selection) => loop {
+                let front = selection.pop_front()?;
+                if front.skip {
+                    let skipped = match self.array_reader.skip_records(front.row_count) {
+                        Ok(skipped) => skipped,
+                        Err(e) => return Some(Err(e.into())),
+                    };
+                    if skipped != front.row_count {
+                        return Some(Err(general_err!(
+                            "failed to skip rows, expected {}, got {}",
+                            front.row_count,
+                            skipped
+                        )
+                        .into()));
+                    }
+                    continue;
+                }
+                // Read at most `batch_size` rows now and re-queue the unread rest
+                // as a *select*, otherwise those rows would be silently dropped
+                let to_read = match front.row_count.checked_sub(self.batch_size) {
+                    Some(remaining) if remaining != 0 => {
+                        selection.push_front(RowSelection::select(remaining));
+                        self.batch_size
+                    }
+                    _ => front.row_count,
+                };
+                break to_read;
+            },
+            None => self.batch_size,
+        };
+
+        match self.array_reader.next_batch(to_read) {
             Err(error) => Some(Err(error.into())),
             Ok(array) => {
                 let struct_array =
@@ -257,16 +341,30 @@ impl ParquetRecordBatchReader {
         batch_size: usize,
         array_reader: Box<dyn ArrayReader>,
     ) -> Result<Self> {
+        Ok(Self::new(batch_size, array_reader, None))
+    }
+
+    /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
+    /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
+    /// all rows will be returned
+    ///
+    /// TODO: Make public once row selection fully implemented (#1792)
+    pub(crate) fn new(
+        batch_size: usize,
+        array_reader: Box<dyn ArrayReader>,
+        selection: Option<VecDeque<RowSelection>>,
+    ) -> Self {
         let schema = match array_reader.get_data_type() {
             ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
             _ => unreachable!("Struct array reader's data type is not struct!"),
         };
 
-        Ok(Self {
+        Self {
             batch_size,
             array_reader,
             schema: Arc::new(schema),
-        })
+            selection,
+        }
     }
 }
 
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 2400a0b6a..b251c2a82 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -97,7 +97,7 @@ use crate::arrow::arrow_reader::ParquetRecordBatchReader;
 use crate::arrow::schema::parquet_to_arrow_schema;
 use crate::arrow::ProjectionMask;
 use crate::basic::Compression;
-use crate::column::page::{Page, PageIterator, PageReader};
+use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
@@ -551,6 +551,14 @@ impl PageReader for InMemoryColumnChunkReader {
         // We are at the end of this column chunk and no more page left. Return None.
         Ok(None)
     }
+
+    fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
+        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    }
+
+    fn skip_next_page(&mut self) -> Result<()> {
+        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    }
 }
 
 /// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index 9cca25c8a..21526f21f 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -25,7 +25,7 @@ use crate::arrow::buffer::bit_util::count_set_bits;
 use crate::arrow::record_reader::buffer::BufferQueue;
 use crate::basic::Encoding;
 use crate::column::reader::decoder::{
-    ColumnLevelDecoder, ColumnLevelDecoderImpl, LevelsBufferSlice,
+    ColumnLevelDecoder, ColumnLevelDecoderImpl, DefinitionLevelDecoder, LevelsBufferSlice,
 };
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
@@ -146,7 +146,7 @@ impl LevelsBufferSlice for DefinitionLevelBuffer {
     }
 }
 
-pub struct DefinitionLevelDecoder {
+pub struct DefinitionLevelBufferDecoder {
     max_level: i16,
     encoding: Encoding,
     data: Option<ByteBufferPtr>,
@@ -154,7 +154,7 @@ pub struct DefinitionLevelDecoder {
     packed_decoder: Option<PackedDecoder>,
 }
 
-impl ColumnLevelDecoder for DefinitionLevelDecoder {
+impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
     type Slice = DefinitionLevelBuffer;
 
     fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
@@ -223,6 +223,16 @@ impl ColumnLevelDecoder for DefinitionLevelDecoder {
     }
 }
 
+impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
+    fn skip_def_levels(
+        &mut self,
+        _num_levels: usize,
+        _max_def_level: i16,
+    ) -> Result<(usize, usize)> {
+        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    }
+}
+
 /// An optimized decoder for decoding [RLE] and [BIT_PACKED] data with a bit width of 1
 /// directly into a bitmask
 ///
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index af75dbb49..046e01d46 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -22,7 +22,7 @@ use arrow::buffer::Buffer;
 
 use crate::arrow::record_reader::{
     buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
-    definition_levels::{DefinitionLevelBuffer, DefinitionLevelDecoder},
+    definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
 };
 use crate::column::{
     page::PageReader,
@@ -56,8 +56,9 @@ pub struct GenericRecordReader<V, CV> {
     records: V,
     def_levels: Option<DefinitionLevelBuffer>,
     rep_levels: Option<ScalarBuffer<i16>>,
-    column_reader:
-        Option<GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelDecoder, CV>>,
+    column_reader: Option<
+        GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>,
+    >,
 
     /// Number of records accumulated in records
     num_records: usize,
@@ -202,6 +203,37 @@ where
         Ok(records_read)
     }
 
+    /// Try to skip the next `num_records` rows
+    ///
+    /// # Returns
+    ///
+    /// Number of records skipped
+    pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        // First need to clear the buffer
+        let (buffered_records, buffered_values) = self.count_records(num_records);
+        self.num_records += buffered_records;
+        self.num_values += buffered_values;
+
+        self.consume_def_levels();
+        self.consume_rep_levels();
+        self.consume_record_data();
+        self.consume_bitmap();
+        self.reset();
+
+        let remaining = num_records - buffered_records;
+
+        if remaining == 0 {
+            return Ok(buffered_records);
+        }
+
+        let skipped = match self.column_reader.as_mut() {
+            Some(column_reader) => column_reader.skip_records(remaining)?,
+            None => 0,
+        };
+
+        Ok(skipped + buffered_records)
+    }
+
     /// Returns number of records stored in buffer.
     pub fn num_records(&self) -> usize {
         self.num_records
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 9364bd30f..d667af712 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -187,12 +187,29 @@ impl PageWriteSpec {
     }
 }
 
+/// Metadata describing a page, as returned by [`PageReader::peek_next_page`]
+pub struct PageMetadata {
+    /// The number of rows in this page
+    pub num_rows: usize,
+
+    /// True if the page is a dictionary page
+    pub is_dict: bool,
+}
+
 /// API for reading pages from a column chunk.
 /// This offers a iterator like API to get the next page.
 pub trait PageReader: Iterator<Item = Result<Page>> + Send {
     /// Gets the next page in the column chunk associated with this reader.
     /// Returns `None` if there are no pages left.
     fn get_next_page(&mut self) -> Result<Option<Page>>;
+
+    /// Gets metadata about the next page. Returns an error if no
+    /// column index information is available
+    fn peek_next_page(&self) -> Result<Option<PageMetadata>>;
+
+    /// Skips reading the next page. Returns an error if no
+    /// column index information is available
+    fn skip_next_page(&mut self) -> Result<()>;
 }
 
 /// API for writing pages in a column chunk.
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index a97787ccf..35e725b19 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -22,7 +22,8 @@ use std::cmp::min;
 use super::page::{Page, PageReader};
 use crate::basic::*;
 use crate::column::reader::decoder::{
-    ColumnLevelDecoder, ColumnValueDecoder, LevelsBufferSlice, ValuesBufferSlice,
+    ColumnValueDecoder, DefinitionLevelDecoder, LevelsBufferSlice,
+    RepetitionLevelDecoder, ValuesBufferSlice,
 };
 use crate::data_type::*;
 use crate::errors::{ParquetError, Result};
@@ -137,8 +138,8 @@ pub struct GenericColumnReader<R, D, V> {
 
 impl<R, D, V> GenericColumnReader<R, D, V>
 where
-    R: ColumnLevelDecoder,
-    D: ColumnLevelDecoder,
+    R: RepetitionLevelDecoder,
+    D: DefinitionLevelDecoder,
     V: ColumnValueDecoder,
 {
     /// Creates new column reader based on column descriptor and page reader.
@@ -271,6 +272,72 @@ where
         Ok((values_read, levels_read))
     }
 
+    /// Skips over `num_records` records, where records are delimited by repetition levels of 0
+    ///
+    /// # Returns
+    ///
+    /// Number of records skipped, less than `num_records` if the pages run out
+    pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        let mut remaining = num_records;
+        while remaining != 0 {
+            if self.num_buffered_values == self.num_decoded_values {
+                let metadata = match self.page_reader.peek_next_page()? {
+                    None => return Ok(num_records - remaining),
+                    Some(metadata) => metadata,
+                };
+                // If dictionary, we must read it
+                if metadata.is_dict {
+                    self.read_new_page()?;
+                    continue;
+                }
+
+                // If all rows of the page are among the remaining records to
+                // be skipped, skip entire page
+                if metadata.num_rows <= remaining {
+                    self.page_reader.skip_next_page()?;
+                    remaining -= metadata.num_rows;
+                    continue;
+                }
+                // Only part of the page is skipped, so it has to be loaded: with
+                // no buffered values `to_read` would be 0 and the loop never end
+                if !self.read_new_page()? {
+                    return Ok(num_records - remaining);
+                }
+            }
+
+            let to_read = remaining
+                .min((self.num_buffered_values - self.num_decoded_values) as usize);
+            let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
+                Some(decoder) => decoder.skip_rep_levels(to_read)?,
+                None => (to_read, to_read),
+            };
+            let (values_read, def_levels_read) = match self.def_level_decoder.as_mut() {
+                Some(decoder) => decoder
+                    .skip_def_levels(rep_levels_read, self.descr.max_def_level())?,
+                None => (rep_levels_read, rep_levels_read),
+            };
+            if rep_levels_read != def_levels_read {
+                return Err(general_err!(
+                    "levels mismatch, read {} repetition levels and {} definition levels",
+                    rep_levels_read,
+                    def_levels_read
+                ));
+            }
+            let values = self.values_decoder.skip_values(values_read)?;
+            if values != values_read {
+                return Err(general_err!(
+                    "skipped {} values, expected {}",
+                    values,
+                    values_read
+                ));
+            }
+
+            self.num_decoded_values += rep_levels_read as u32;
+            remaining -= records_read;
+        }
+        Ok(num_records - remaining)
+    }
+
     /// Reads a new page and set up the decoders for levels, values or dictionary.
     /// Returns false if there's no page left.
     fn read_new_page(&mut self) -> Result<bool> {
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index 9f1799133..6fefdca23 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -81,6 +81,25 @@ pub trait ColumnLevelDecoder {
     fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize>;
 }
 
+pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
+    /// Skips over the repetition levels corresponding to `num_records` records, where a
+    /// record is delimited by a repetition level of 0
+    ///
+    /// Returns the number of records skipped, and the number of levels skipped
+    fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)>;
+}
+
+pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
+    /// Skips over `num_levels` definition levels
+    ///
+    /// Returns the number of values skipped, and the number of levels skipped
+    fn skip_def_levels(
+        &mut self,
+        num_levels: usize,
+        max_def_level: i16,
+    ) -> Result<(usize, usize)>;
+}
+
 /// Decodes value data to a [`ValuesBufferSlice`]
 pub trait ColumnValueDecoder {
     type Slice: ValuesBufferSlice + ?Sized;
@@ -126,6 +145,11 @@ pub trait ColumnValueDecoder {
     /// Implementations may panic if `range` overlaps with already written data
     ///
     fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize>;
+
+    /// Skips over `num_values` values
+    ///
+    /// Returns the number of values skipped
+    fn skip_values(&mut self, num_values: usize) -> Result<usize>;
 }
 
 /// An implementation of [`ColumnValueDecoder`] for `[T::T]`
@@ -225,6 +249,10 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
 
         current_decoder.get(&mut out[range])
     }
+
+    fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
+        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    }
 }
 
 /// An implementation of [`ColumnLevelDecoder`] for `[i16]`
@@ -266,3 +294,19 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
         }
     }
 }
+
+impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
+    fn skip_def_levels(
+        &mut self,
+        _num_levels: usize,
+        _max_def_level: i16,
+    ) -> Result<(usize, usize)> {
+        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    }
+}
+
+impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
+    fn skip_rep_levels(&mut self, _num_records: usize) -> Result<(usize, usize)> {
+        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    }
+}
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 6ff73e041..1dfd1eb45 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -25,7 +25,7 @@ use parquet_format::{PageHeader, PageType};
 use thrift::protocol::TCompactInputProtocol;
 
 use crate::basic::{Compression, Encoding, Type};
-use crate::column::page::{Page, PageReader};
+use crate::column::page::{Page, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
 use crate::file::page_index::index_reader;
@@ -555,6 +555,14 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
         // We are at the end of this column chunk and no more page left. Return None.
         Ok(None)
     }
+
+    fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
+        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    }
+
+    fn skip_next_page(&mut self) -> Result<()> {
+        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    }
 }
 
 #[cfg(test)]
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 3719d280a..0b70c38ad 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::basic::Encoding;
-use crate::column::page::PageReader;
+use crate::column::page::{PageMetadata, PageReader};
 use crate::column::page::{Page, PageIterator};
 use crate::data_type::DataType;
 use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
@@ -172,6 +172,14 @@ impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
         Ok(self.page_iter.next())
     }
+
+    fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
+        unimplemented!()
+    }
+
+    fn skip_next_page(&mut self) -> Result<()> {
+        unimplemented!()
+    }
 }
 
 impl<P: Iterator<Item = Page> + Send> Iterator for InMemoryPageReader<P> {