You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/07/07 16:05:23 UTC
[arrow-rs] branch master updated: Stub out Skip Records API (#1792) (#1998)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new e59b02348 Stub out Skip Records API (#1792) (#1998)
e59b02348 is described below
commit e59b023480437f67e84ba2f827b58f78fd44c3a1
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jul 7 12:05:18 2022 -0400
Stub out Skip Records API (#1792) (#1998)
* Stub API for parquet record skipping
* Update parquet/src/arrow/record_reader/mod.rs
Co-authored-by: Yang Jiang <ji...@163.com>
* Remove empty google.protobuf.rs
* Replace todo with nyi_err
* Update doc comment
Co-authored-by: Yang Jiang <ji...@163.com>
---
parquet/src/arrow/array_reader/byte_array.rs | 8 ++
.../arrow/array_reader/byte_array_dictionary.rs | 8 ++
.../src/arrow/array_reader/complex_object_array.rs | 7 ++
parquet/src/arrow/array_reader/empty_array.rs | 6 ++
parquet/src/arrow/array_reader/list_array.rs | 4 +
parquet/src/arrow/array_reader/map_array.rs | 13 +++
parquet/src/arrow/array_reader/mod.rs | 3 +
parquet/src/arrow/array_reader/null_array.rs | 4 +
parquet/src/arrow/array_reader/primitive_array.rs | 4 +
parquet/src/arrow/array_reader/struct_array.rs | 20 ++++
parquet/src/arrow/array_reader/test_util.rs | 5 +
parquet/src/arrow/arrow_reader.rs | 108 ++++++++++++++++++++-
parquet/src/arrow/async_reader.rs | 10 +-
.../src/arrow/record_reader/definition_levels.rs | 16 ++-
parquet/src/arrow/record_reader/mod.rs | 38 +++++++-
parquet/src/column/page.rs | 17 ++++
parquet/src/column/reader.rs | 73 +++++++++++++-
parquet/src/column/reader/decoder.rs | 44 +++++++++
parquet/src/file/serialized_reader.rs | 10 +-
parquet/src/util/test_common/page_util.rs | 10 +-
20 files changed, 391 insertions(+), 17 deletions(-)
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 95620d940..b762236c4 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -122,6 +122,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
Ok(buffer.into_array(null_buffer, self.data_type.clone()))
}
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ self.record_reader.skip_records(num_records)
+ }
+
fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer
.as_ref()
@@ -210,6 +214,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
decoder.read(out, range.end - range.start, self.dict.as_ref())
}
+
+ fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
+ Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ }
}
/// A generic decoder from uncompressed parquet value data to [`OffsetBuffer`]
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 77f7916ed..bfe557499 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -184,6 +184,10 @@ where
Ok(array)
}
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ self.record_reader.skip_records(num_records)
+ }
+
fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer
.as_ref()
@@ -371,6 +375,10 @@ where
}
}
}
+
+ fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
+ Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ }
}
#[cfg(test)]
diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs
index b91fde5c4..6e7585ff9 100644
--- a/parquet/src/arrow/array_reader/complex_object_array.rs
+++ b/parquet/src/arrow/array_reader/complex_object_array.rs
@@ -163,6 +163,13 @@ where
Ok(array)
}
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ match self.column_reader.as_mut() {
+ Some(reader) => reader.skip_records(num_records),
+ None => Ok(0),
+ }
+ }
+
fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer.as_deref()
}
diff --git a/parquet/src/arrow/array_reader/empty_array.rs b/parquet/src/arrow/array_reader/empty_array.rs
index 54b77becb..b06646cc1 100644
--- a/parquet/src/arrow/array_reader/empty_array.rs
+++ b/parquet/src/arrow/array_reader/empty_array.rs
@@ -65,6 +65,12 @@ impl ArrayReader for EmptyArrayReader {
Ok(Arc::new(StructArray::from(data)))
}
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ let skipped = self.remaining_rows.min(num_records);
+ self.remaining_rows -= skipped;
+ Ok(skipped)
+ }
+
fn get_def_levels(&self) -> Option<&[i16]> {
None
}
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index e1cd71b9e..3d612facd 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -231,6 +231,10 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
Ok(Arc::new(result_array))
}
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ self.item_reader.skip_records(num_records)
+ }
+
fn get_def_levels(&self) -> Option<&[i16]> {
self.item_reader.get_def_levels()
}
diff --git a/parquet/src/arrow/array_reader/map_array.rs b/parquet/src/arrow/array_reader/map_array.rs
index 92487ebbc..00c3db41a 100644
--- a/parquet/src/arrow/array_reader/map_array.rs
+++ b/parquet/src/arrow/array_reader/map_array.rs
@@ -149,6 +149,19 @@ impl ArrayReader for MapArrayReader {
Ok(Arc::new(MapArray::from(array_data)))
}
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ let key_skipped = self.key_reader.skip_records(num_records)?;
+ let value_skipped = self.value_reader.skip_records(num_records)?;
+ if key_skipped != value_skipped {
+ return Err(general_err!(
+ "MapArrayReader out of sync, skipped {} keys and {} values",
+ key_skipped,
+ value_skipped
+ ));
+ }
+ Ok(key_skipped)
+ }
+
fn get_def_levels(&self) -> Option<&[i16]> {
// Children definition levels should describe the same parent structure,
// so return key_reader only
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index dd65a3626..e30c33bba 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -64,6 +64,9 @@ pub trait ArrayReader: Send {
/// Reads at most `batch_size` records into an arrow array and return it.
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef>;
+ /// Skips over `num_records` records, returning the number of rows skipped
+ fn skip_records(&mut self, num_records: usize) -> Result<usize>;
+
/// If this array has a non-zero definition level, i.e. has a nullable parent
/// array, returns the definition levels of data from the last call of `next_batch`
///
diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs
index 4b592025d..b207d8b2c 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -96,6 +96,10 @@ where
Ok(Arc::new(array))
}
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ self.record_reader.skip_records(num_records)
+ }
+
fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
}
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index c25df89a6..cb41d1fba 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -233,6 +233,10 @@ where
Ok(array)
}
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ self.record_reader.skip_records(num_records)
+ }
+
fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
}
diff --git a/parquet/src/arrow/array_reader/struct_array.rs b/parquet/src/arrow/array_reader/struct_array.rs
index 30824d742..602c598f8 100644
--- a/parquet/src/arrow/array_reader/struct_array.rs
+++ b/parquet/src/arrow/array_reader/struct_array.rs
@@ -157,6 +157,26 @@ impl ArrayReader for StructArrayReader {
Ok(Arc::new(StructArray::from(array_data)))
}
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ let mut skipped = None;
+ for child in self.children.iter_mut() {
+ let child_skipped = child.skip_records(num_records)?;
+ match skipped {
+ Some(expected) => {
+ if expected != child_skipped {
+ return Err(general_err!(
+ "StructArrayReader out of sync, expected {} skipped, got {}",
+ expected,
+ child_skipped
+ ));
+ }
+ }
+ None => skipped = Some(child_skipped),
+ }
+ }
+ Ok(skipped.unwrap_or(0))
+ }
+
fn get_def_levels(&self) -> Option<&[i16]> {
// Children definition levels should describe the same
// parent structure, so return first child's
diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs
index 0c044eb2d..04c0f6c68 100644
--- a/parquet/src/arrow/array_reader/test_util.rs
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -170,6 +170,11 @@ impl ArrayReader for InMemoryArrayReader {
Ok(self.array.slice(self.last_idx, read))
}
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ let array = self.next_batch(num_records)?;
+ Ok(array.len())
+ }
+
fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels
.as_ref()
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index c5d1f66e5..6a3270762 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -17,6 +17,7 @@
//! Contains reader which reads parquet data into arrow [`RecordBatch`]
+use std::collections::VecDeque;
use std::sync::Arc;
use arrow::array::Array;
@@ -29,7 +30,7 @@ use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
use crate::arrow::ProjectionMask;
-use crate::errors::Result;
+use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData};
use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
use crate::schema::types::SchemaDescriptor;
@@ -70,9 +71,39 @@ pub trait ArrowReader {
) -> Result<Self::RecordReader>;
}
+/// [`RowSelection`] allows selecting or skipping a provided number of rows
+/// when scanning the parquet file
+#[derive(Debug, Clone, Copy)]
+pub(crate) struct RowSelection {
+ /// The number of rows
+ pub row_count: usize,
+
+ /// If true, skip `row_count` rows
+ pub skip: bool,
+}
+
+impl RowSelection {
+ /// Select `row_count` rows
+ pub fn select(row_count: usize) -> Self {
+ Self {
+ row_count,
+ skip: false,
+ }
+ }
+
+ /// Skip `row_count` rows
+ pub fn skip(row_count: usize) -> Self {
+ Self {
+ row_count,
+ skip: true,
+ }
+ }
+}
+
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
skip_arrow_metadata: bool,
+ selection: Option<Vec<RowSelection>>,
}
impl ArrowReaderOptions {
@@ -90,6 +121,20 @@ impl ArrowReaderOptions {
pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
Self {
skip_arrow_metadata,
+ ..self
+ }
+ }
+
+ /// Scan rows from the parquet file according to the provided `selection`
+ ///
+ /// TODO: Make public once row selection fully implemented (#1792)
+ pub(crate) fn with_row_selection(
+ self,
+ selection: impl Into<Vec<RowSelection>>,
+ ) -> Self {
+ Self {
+ selection: Some(selection.into()),
+ ..self
}
}
}
@@ -139,7 +184,12 @@ impl ArrowReader for ParquetFileArrowReader {
Box::new(self.file_reader.clone()),
)?;
- ParquetRecordBatchReader::try_new(batch_size, array_reader)
+ let selection = self.options.selection.clone().map(Into::into);
+ Ok(ParquetRecordBatchReader::new(
+ batch_size,
+ array_reader,
+ selection,
+ ))
}
}
@@ -221,13 +271,47 @@ pub struct ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
schema: SchemaRef,
+ selection: Option<VecDeque<RowSelection>>,
}
impl Iterator for ParquetRecordBatchReader {
type Item = ArrowResult<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
- match self.array_reader.next_batch(self.batch_size) {
+ let to_read = match self.selection.as_mut() {
+ Some(selection) => loop {
+ let front = selection.pop_front()?;
+ if front.skip {
+ let skipped = match self.array_reader.skip_records(front.row_count) {
+ Ok(skipped) => skipped,
+ Err(e) => return Some(Err(e.into())),
+ };
+
+ if skipped != front.row_count {
+ return Some(Err(general_err!(
+ "failed to skip rows, expected {}, got {}",
+ front.row_count,
+ skipped
+ )
+ .into()));
+ }
+ continue;
+ }
+
+ let to_read = match front.row_count.checked_sub(self.batch_size) {
+ Some(remaining) => {
+ selection.push_front(RowSelection::skip(remaining));
+ self.batch_size
+ }
+ None => front.row_count,
+ };
+
+ break to_read;
+ },
+ None => self.batch_size,
+ };
+
+ match self.array_reader.next_batch(to_read) {
Err(error) => Some(Err(error.into())),
Ok(array) => {
let struct_array =
@@ -257,16 +341,30 @@ impl ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
) -> Result<Self> {
+ Ok(Self::new(batch_size, array_reader, None))
+ }
+
+ /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
+ /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
+ /// all rows will be returned
+ ///
+ /// TODO: Make public once row selection fully implemented (#1792)
+ pub(crate) fn new(
+ batch_size: usize,
+ array_reader: Box<dyn ArrayReader>,
+ selection: Option<VecDeque<RowSelection>>,
+ ) -> Self {
let schema = match array_reader.get_data_type() {
ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
_ => unreachable!("Struct array reader's data type is not struct!"),
};
- Ok(Self {
+ Self {
batch_size,
array_reader,
schema: Arc::new(schema),
- })
+ selection,
+ }
}
}
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 2400a0b6a..b251c2a82 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -97,7 +97,7 @@ use crate::arrow::arrow_reader::ParquetRecordBatchReader;
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::ProjectionMask;
use crate::basic::Compression;
-use crate::column::page::{Page, PageIterator, PageReader};
+use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
@@ -551,6 +551,14 @@ impl PageReader for InMemoryColumnChunkReader {
// We are at the end of this column chunk and no more page left. Return None.
Ok(None)
}
+
+ fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
+ Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ }
+
+ fn skip_next_page(&mut self) -> Result<()> {
+ Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ }
}
/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index 9cca25c8a..21526f21f 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -25,7 +25,7 @@ use crate::arrow::buffer::bit_util::count_set_bits;
use crate::arrow::record_reader::buffer::BufferQueue;
use crate::basic::Encoding;
use crate::column::reader::decoder::{
- ColumnLevelDecoder, ColumnLevelDecoderImpl, LevelsBufferSlice,
+ ColumnLevelDecoder, ColumnLevelDecoderImpl, DefinitionLevelDecoder, LevelsBufferSlice,
};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
@@ -146,7 +146,7 @@ impl LevelsBufferSlice for DefinitionLevelBuffer {
}
}
-pub struct DefinitionLevelDecoder {
+pub struct DefinitionLevelBufferDecoder {
max_level: i16,
encoding: Encoding,
data: Option<ByteBufferPtr>,
@@ -154,7 +154,7 @@ pub struct DefinitionLevelDecoder {
packed_decoder: Option<PackedDecoder>,
}
-impl ColumnLevelDecoder for DefinitionLevelDecoder {
+impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
type Slice = DefinitionLevelBuffer;
fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
@@ -223,6 +223,16 @@ impl ColumnLevelDecoder for DefinitionLevelDecoder {
}
}
+impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
+ fn skip_def_levels(
+ &mut self,
+ _num_levels: usize,
+ _max_def_level: i16,
+ ) -> Result<(usize, usize)> {
+ Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ }
+}
+
/// An optimized decoder for decoding [RLE] and [BIT_PACKED] data with a bit width of 1
/// directly into a bitmask
///
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index af75dbb49..046e01d46 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -22,7 +22,7 @@ use arrow::buffer::Buffer;
use crate::arrow::record_reader::{
buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
- definition_levels::{DefinitionLevelBuffer, DefinitionLevelDecoder},
+ definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
};
use crate::column::{
page::PageReader,
@@ -56,8 +56,9 @@ pub struct GenericRecordReader<V, CV> {
records: V,
def_levels: Option<DefinitionLevelBuffer>,
rep_levels: Option<ScalarBuffer<i16>>,
- column_reader:
- Option<GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelDecoder, CV>>,
+ column_reader: Option<
+ GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>,
+ >,
/// Number of records accumulated in records
num_records: usize,
@@ -202,6 +203,37 @@ where
Ok(records_read)
}
+ /// Try to skip the next `num_records` rows
+ ///
+ /// # Returns
+ ///
+ /// Number of records skipped
+ pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ // First need to clear the buffer
+ let (buffered_records, buffered_values) = self.count_records(num_records);
+ self.num_records += buffered_records;
+ self.num_values += buffered_values;
+
+ self.consume_def_levels();
+ self.consume_rep_levels();
+ self.consume_record_data();
+ self.consume_bitmap();
+ self.reset();
+
+ let remaining = num_records - buffered_records;
+
+ if remaining == 0 {
+ return Ok(buffered_records);
+ }
+
+ let skipped = match self.column_reader.as_mut() {
+ Some(column_reader) => column_reader.skip_records(remaining)?,
+ None => 0,
+ };
+
+ Ok(skipped + buffered_records)
+ }
+
/// Returns number of records stored in buffer.
pub fn num_records(&self) -> usize {
self.num_records
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 9364bd30f..d667af712 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -187,12 +187,29 @@ impl PageWriteSpec {
}
}
+/// Contains metadata for a page
+pub struct PageMetadata {
+ /// The number of rows in this page
+ pub num_rows: usize,
+
+ /// Returns true if the page is a dictionary page
+ pub is_dict: bool,
+}
+
/// API for reading pages from a column chunk.
/// This offers an iterator-like API to get the next page.
pub trait PageReader: Iterator<Item = Result<Page>> + Send {
/// Gets the next page in the column chunk associated with this reader.
/// Returns `None` if there are no pages left.
fn get_next_page(&mut self) -> Result<Option<Page>>;
+
+ /// Gets metadata about the next page, returns an error if no
+ /// column index information is available
+ fn peek_next_page(&self) -> Result<Option<PageMetadata>>;
+
+ /// Skips reading the next page, returns an error if no
+ /// column index information is available
+ fn skip_next_page(&mut self) -> Result<()>;
}
/// API for writing pages in a column chunk.
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index a97787ccf..35e725b19 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -22,7 +22,8 @@ use std::cmp::min;
use super::page::{Page, PageReader};
use crate::basic::*;
use crate::column::reader::decoder::{
- ColumnLevelDecoder, ColumnValueDecoder, LevelsBufferSlice, ValuesBufferSlice,
+ ColumnValueDecoder, DefinitionLevelDecoder, LevelsBufferSlice,
+ RepetitionLevelDecoder, ValuesBufferSlice,
};
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
@@ -137,8 +138,8 @@ pub struct GenericColumnReader<R, D, V> {
impl<R, D, V> GenericColumnReader<R, D, V>
where
- R: ColumnLevelDecoder,
- D: ColumnLevelDecoder,
+ R: RepetitionLevelDecoder,
+ D: DefinitionLevelDecoder,
V: ColumnValueDecoder,
{
/// Creates new column reader based on column descriptor and page reader.
@@ -271,6 +272,72 @@ where
Ok((values_read, levels_read))
}
+ /// Skips over `num_records` records, where records are delimited by repetition levels of 0
+ ///
+ /// # Returns
+ ///
+ /// Returns the number of records skipped
+ pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ let mut remaining = num_records;
+ while remaining != 0 {
+ if self.num_buffered_values == self.num_decoded_values {
+ let metadata = match self.page_reader.peek_next_page()? {
+ None => return Ok(num_records - remaining),
+ Some(metadata) => metadata,
+ };
+
+ // If dictionary, we must read it
+ if metadata.is_dict {
+ self.read_new_page()?;
+ continue;
+ }
+
+ // If page has fewer rows than the remaining records to
+ // be skipped, skip entire page
+ if metadata.num_rows < remaining {
+ self.page_reader.skip_next_page()?;
+ remaining -= metadata.num_rows;
+ continue;
+ }
+ }
+
+ let to_read = remaining
+ .min((self.num_buffered_values - self.num_decoded_values) as usize);
+
+ let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
+ Some(decoder) => decoder.skip_rep_levels(to_read)?,
+ None => (to_read, to_read),
+ };
+
+ let (values_read, def_levels_read) = match self.def_level_decoder.as_mut() {
+ Some(decoder) => decoder
+ .skip_def_levels(rep_levels_read, self.descr.max_def_level())?,
+ None => (rep_levels_read, rep_levels_read),
+ };
+
+ if rep_levels_read != def_levels_read {
+ return Err(general_err!(
+ "levels mismatch, read {} repetition levels and {} definition levels",
+ rep_levels_read,
+ def_levels_read
+ ));
+ }
+
+ let values = self.values_decoder.skip_values(values_read)?;
+ if values != values_read {
+ return Err(general_err!(
+ "skipped {} values, expected {}",
+ values,
+ values_read
+ ));
+ }
+
+ self.num_decoded_values += rep_levels_read as u32;
+ remaining -= records_read;
+ }
+ Ok(num_records - remaining)
+ }
+
/// Reads a new page and set up the decoders for levels, values or dictionary.
/// Returns false if there's no page left.
fn read_new_page(&mut self) -> Result<bool> {
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index 9f1799133..6fefdca23 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -81,6 +81,25 @@ pub trait ColumnLevelDecoder {
fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize>;
}
+pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
+ /// Skips over repetition levels corresponding to `num_records` records, where a record
+ /// is delimited by a repetition level of 0
+ ///
+ /// Returns the number of records skipped, and the number of levels skipped
+ fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)>;
+}
+
+pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
+ /// Skips over `num_levels` definition levels
+ ///
+ /// Returns the number of values skipped, and the number of levels skipped
+ fn skip_def_levels(
+ &mut self,
+ num_levels: usize,
+ max_def_level: i16,
+ ) -> Result<(usize, usize)>;
+}
+
/// Decodes value data to a [`ValuesBufferSlice`]
pub trait ColumnValueDecoder {
type Slice: ValuesBufferSlice + ?Sized;
@@ -126,6 +145,11 @@ pub trait ColumnValueDecoder {
/// Implementations may panic if `range` overlaps with already written data
///
fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize>;
+
+ /// Skips over `num_values` values
+ ///
+ /// Returns the number of values skipped
+ fn skip_values(&mut self, num_values: usize) -> Result<usize>;
}
/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
@@ -225,6 +249,10 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
current_decoder.get(&mut out[range])
}
+
+ fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
+ Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ }
}
/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
@@ -266,3 +294,19 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
}
}
}
+
+impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
+ fn skip_def_levels(
+ &mut self,
+ _num_levels: usize,
+ _max_def_level: i16,
+ ) -> Result<(usize, usize)> {
+ Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ }
+}
+
+impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
+ fn skip_rep_levels(&mut self, _num_records: usize) -> Result<(usize, usize)> {
+ Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ }
+}
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 6ff73e041..1dfd1eb45 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -25,7 +25,7 @@ use parquet_format::{PageHeader, PageType};
use thrift::protocol::TCompactInputProtocol;
use crate::basic::{Compression, Encoding, Type};
-use crate::column::page::{Page, PageReader};
+use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::index_reader;
@@ -555,6 +555,14 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
// We are at the end of this column chunk and no more page left. Return None.
Ok(None)
}
+
+ fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
+ Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ }
+
+ fn skip_next_page(&mut self) -> Result<()> {
+ Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ }
}
#[cfg(test)]
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 3719d280a..0b70c38ad 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::basic::Encoding;
-use crate::column::page::PageReader;
+use crate::column::page::{PageMetadata, PageReader};
use crate::column::page::{Page, PageIterator};
use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
@@ -172,6 +172,14 @@ impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
Ok(self.page_iter.next())
}
+
+ fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
+ unimplemented!()
+ }
+
+ fn skip_next_page(&mut self) -> Result<()> {
+ unimplemented!()
+ }
}
impl<P: Iterator<Item = Page> + Send> Iterator for InMemoryPageReader<P> {