You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/01 13:37:35 UTC
[arrow-rs] branch master updated: Add separate row_count and level_count to PageMetadata (#4321) (#4326)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new b78d99d8b Add separate row_count and level_count to PageMetadata (#4321) (#4326)
b78d99d8b is described below
commit b78d99d8b6a45fef5ca998c86c3f774cc6fce644
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jun 1 14:37:30 2023 +0100
Add separate row_count and level_count to PageMetadata (#4321) (#4326)
---
parquet/src/arrow/arrow_reader/mod.rs | 32 +++++++++++++++++++++++++++++
parquet/src/column/page.rs | 34 ++++++++++++++++++++-----------
parquet/src/column/reader.rs | 26 +++++++++++++----------
parquet/src/file/serialized_reader.rs | 14 +++++++------
parquet/src/util/test_common/page_util.rs | 15 ++++++++++----
5 files changed, 88 insertions(+), 33 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 9cb09c9a5..deca0c719 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -2522,4 +2522,36 @@ mod tests {
assert_eq!(&written.slice(0, 8), &read[0]);
}
+
+ #[test]
+ fn test_list_skip() {
+ let mut list = ListBuilder::new(Int32Builder::new());
+ list.append_value([Some(1), Some(2)]);
+ list.append_value([Some(3)]);
+ list.append_value([Some(4)]);
+ let list = list.finish();
+ let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
+
+ // First page contains 2 values but only 1 row
+ let props = WriterProperties::builder()
+ .set_data_page_row_count_limit(1)
+ .set_write_batch_size(2)
+ .build();
+
+ let mut buffer = Vec::with_capacity(1024);
+ let mut writer =
+ ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
+ let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
+ .unwrap()
+ .with_row_selection(selection.into())
+ .build()
+ .unwrap();
+ let out = reader.next().unwrap().unwrap();
+ assert_eq!(out.num_rows(), 1);
+ assert_eq!(out, batch.slice(2, 1));
+ }
}
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 3b19734a2..654cd0816 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -265,9 +265,10 @@ impl PageWriteSpec {
/// Contains metadata for a page
#[derive(Clone)]
pub struct PageMetadata {
- /// The number of rows in this page
- pub num_rows: usize,
-
+ /// The number of rows within the page if known
+ pub num_rows: Option<usize>,
+ /// The number of levels within the page if known
+ pub num_levels: Option<usize>,
/// Returns true if the page is a dictionary page
pub is_dict: bool,
}
@@ -277,18 +278,27 @@ impl TryFrom<&PageHeader> for PageMetadata {
fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
match value.type_ {
- crate::format::PageType::DATA_PAGE => Ok(PageMetadata {
- num_rows: value.data_page_header.as_ref().unwrap().num_values as usize,
- is_dict: false,
- }),
+ crate::format::PageType::DATA_PAGE => {
+ let header = value.data_page_header.as_ref().unwrap();
+ Ok(PageMetadata {
+ num_rows: None,
+ num_levels: Some(header.num_values as _),
+ is_dict: false,
+ })
+ }
crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
- num_rows: usize::MIN,
+ num_rows: None,
+ num_levels: None,
is_dict: true,
}),
- crate::format::PageType::DATA_PAGE_V2 => Ok(PageMetadata {
- num_rows: value.data_page_header_v2.as_ref().unwrap().num_rows as usize,
- is_dict: false,
- }),
+ crate::format::PageType::DATA_PAGE_V2 => {
+ let header = value.data_page_header_v2.as_ref().unwrap();
+ Ok(PageMetadata {
+ num_rows: Some(header.num_rows as _),
+ num_levels: Some(header.num_values as _),
+ is_dict: false,
+ })
+ }
other => Err(ParquetError::General(format!(
"page type {other:?} cannot be converted to PageMetadata"
))),
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 3434eba69..991ec2c54 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -312,11 +312,20 @@ where
// If page has less rows than the remaining records to
// be skipped, skip entire page
- if metadata.num_rows <= remaining_records {
- self.page_reader.skip_next_page()?;
- remaining_records -= metadata.num_rows;
- continue;
- };
+ let rows = metadata.num_rows.or_else(|| {
+ // If no repetition levels, num_levels == num_rows
+ self.rep_level_decoder
+ .is_none()
+ .then_some(metadata.num_levels)?
+ });
+
+ if let Some(rows) = rows {
+ if rows <= remaining_records {
+ self.page_reader.skip_next_page()?;
+ remaining_records -= rows;
+ continue;
+ }
+ }
// because self.num_buffered_values == self.num_decoded_values means
// we need reads a new page and set up the decoders for levels
if !self.read_new_page()? {
@@ -533,12 +542,7 @@ where
if self.num_buffered_values == 0
|| self.num_buffered_values == self.num_decoded_values
{
- // TODO: should we return false if read_new_page() = true and
- // num_buffered_values = 0?
- match self.page_reader.peek_next_page()? {
- Some(next_page) => Ok(next_page.num_rows != 0),
- None => Ok(false),
- }
+ Ok(self.page_reader.peek_next_page()?.is_some())
} else {
Ok(true)
}
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 782394942..2b3536904 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -722,7 +722,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
} => {
if dictionary_page.is_some() {
Ok(Some(PageMetadata {
- num_rows: 0,
+ num_rows: None,
+ num_levels: None,
is_dict: true,
}))
} else if let Some(page) = page_locations.front() {
@@ -732,7 +733,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
.unwrap_or(*total_rows);
Ok(Some(PageMetadata {
- num_rows: next_rows - page.first_row_index as usize,
+ num_rows: Some(next_rows - page.first_row_index as usize),
+ num_levels: None,
is_dict: false,
}))
} else {
@@ -1644,11 +1646,11 @@ mod tests {
// have checked with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`
// page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
if i != 351 {
- assert!((meta.num_rows == 21) || (meta.num_rows == 20));
+ assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
} else {
// last page first row index is 7290, total row count is 7300
// because first row start with zero, last page row count should be 10.
- assert_eq!(meta.num_rows, 10);
+ assert_eq!(meta.num_rows, Some(10));
}
assert!(!meta.is_dict);
vec.push(meta);
@@ -1686,11 +1688,11 @@ mod tests {
// have checked with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`
// page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
if i != 351 {
- assert!((meta.num_rows == 21) || (meta.num_rows == 20));
+ assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
} else {
// last page first row index is 7290, total row count is 7300
// because first row start with zero, last page row count should be 10.
- assert_eq!(meta.num_rows, 10);
+ assert_eq!(meta.num_levels, Some(10));
}
assert!(!meta.is_dict);
vec.push(meta);
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index ab5287462..c51c5158c 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -170,15 +170,22 @@ impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
if let Some(x) = self.page_iter.peek() {
match x {
Page::DataPage { num_values, .. } => Ok(Some(PageMetadata {
- num_rows: *num_values as usize,
+ num_rows: None,
+ num_levels: Some(*num_values as _),
is_dict: false,
})),
- Page::DataPageV2 { num_rows, .. } => Ok(Some(PageMetadata {
- num_rows: *num_rows as usize,
+ Page::DataPageV2 {
+ num_rows,
+ num_values,
+ ..
+ } => Ok(Some(PageMetadata {
+ num_rows: Some(*num_rows as _),
+ num_levels: Some(*num_values as _),
is_dict: false,
})),
Page::DictionaryPage { .. } => Ok(Some(PageMetadata {
- num_rows: 0,
+ num_rows: None,
+ num_levels: None,
is_dict: true,
})),
}