You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink and is not preserved in this plain-text copy.)
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/01 13:37:35 UTC

[arrow-rs] branch master updated: Add separate row_count and level_count to PageMetadata (#4321) (#4326)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new b78d99d8b Add separate row_count and level_count to PageMetadata (#4321) (#4326)
b78d99d8b is described below

commit b78d99d8b6a45fef5ca998c86c3f774cc6fce644
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jun 1 14:37:30 2023 +0100

    Add separate row_count and level_count to PageMetadata (#4321) (#4326)
---
 parquet/src/arrow/arrow_reader/mod.rs     | 32 +++++++++++++++++++++++++++++
 parquet/src/column/page.rs                | 34 ++++++++++++++++++++-----------
 parquet/src/column/reader.rs              | 26 +++++++++++++----------
 parquet/src/file/serialized_reader.rs     | 14 +++++++------
 parquet/src/util/test_common/page_util.rs | 15 ++++++++++----
 5 files changed, 88 insertions(+), 33 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 9cb09c9a5..deca0c719 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -2522,4 +2522,36 @@ mod tests {
 
         assert_eq!(&written.slice(0, 8), &read[0]);
     }
+
+    #[test]
+    fn test_list_skip() {
+        let mut list = ListBuilder::new(Int32Builder::new());
+        list.append_value([Some(1), Some(2)]);
+        list.append_value([Some(3)]);
+        list.append_value([Some(4)]);
+        let list = list.finish();
+        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
+
+        // First page contains 2 values but only 1 row
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(1)
+            .set_write_batch_size(2)
+            .build();
+
+        let mut buffer = Vec::with_capacity(1024);
+        let mut writer =
+            ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
+        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
+            .unwrap()
+            .with_row_selection(selection.into())
+            .build()
+            .unwrap();
+        let out = reader.next().unwrap().unwrap();
+        assert_eq!(out.num_rows(), 1);
+        assert_eq!(out, batch.slice(2, 1));
+    }
 }
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 3b19734a2..654cd0816 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -265,9 +265,10 @@ impl PageWriteSpec {
 /// Contains metadata for a page
 #[derive(Clone)]
 pub struct PageMetadata {
-    /// The number of rows in this page
-    pub num_rows: usize,
-
+    /// The number of rows within the page if known
+    pub num_rows: Option<usize>,
+    /// The number of levels within the page if known
+    pub num_levels: Option<usize>,
     /// Returns true if the page is a dictionary page
     pub is_dict: bool,
 }
@@ -277,18 +278,27 @@ impl TryFrom<&PageHeader> for PageMetadata {
 
     fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
         match value.type_ {
-            crate::format::PageType::DATA_PAGE => Ok(PageMetadata {
-                num_rows: value.data_page_header.as_ref().unwrap().num_values as usize,
-                is_dict: false,
-            }),
+            crate::format::PageType::DATA_PAGE => {
+                let header = value.data_page_header.as_ref().unwrap();
+                Ok(PageMetadata {
+                    num_rows: None,
+                    num_levels: Some(header.num_values as _),
+                    is_dict: false,
+                })
+            }
             crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
-                num_rows: usize::MIN,
+                num_rows: None,
+                num_levels: None,
                 is_dict: true,
             }),
-            crate::format::PageType::DATA_PAGE_V2 => Ok(PageMetadata {
-                num_rows: value.data_page_header_v2.as_ref().unwrap().num_rows as usize,
-                is_dict: false,
-            }),
+            crate::format::PageType::DATA_PAGE_V2 => {
+                let header = value.data_page_header_v2.as_ref().unwrap();
+                Ok(PageMetadata {
+                    num_rows: Some(header.num_rows as _),
+                    num_levels: Some(header.num_values as _),
+                    is_dict: false,
+                })
+            }
             other => Err(ParquetError::General(format!(
                 "page type {other:?} cannot be converted to PageMetadata"
             ))),
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 3434eba69..991ec2c54 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -312,11 +312,20 @@ where
 
                 // If page has less rows than the remaining records to
                 // be skipped, skip entire page
-                if metadata.num_rows <= remaining_records {
-                    self.page_reader.skip_next_page()?;
-                    remaining_records -= metadata.num_rows;
-                    continue;
-                };
+                let rows = metadata.num_rows.or_else(|| {
+                    // If no repetition levels, num_levels == num_rows
+                    self.rep_level_decoder
+                        .is_none()
+                        .then_some(metadata.num_levels)?
+                });
+
+                if let Some(rows) = rows {
+                    if rows <= remaining_records {
+                        self.page_reader.skip_next_page()?;
+                        remaining_records -= rows;
+                        continue;
+                    }
+                }
                 // because self.num_buffered_values == self.num_decoded_values means
                 // we need reads a new page and set up the decoders for levels
                 if !self.read_new_page()? {
@@ -533,12 +542,7 @@ where
         if self.num_buffered_values == 0
             || self.num_buffered_values == self.num_decoded_values
         {
-            // TODO: should we return false if read_new_page() = true and
-            // num_buffered_values = 0?
-            match self.page_reader.peek_next_page()? {
-                Some(next_page) => Ok(next_page.num_rows != 0),
-                None => Ok(false),
-            }
+            Ok(self.page_reader.peek_next_page()?.is_some())
         } else {
             Ok(true)
         }
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 782394942..2b3536904 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -722,7 +722,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
             } => {
                 if dictionary_page.is_some() {
                     Ok(Some(PageMetadata {
-                        num_rows: 0,
+                        num_rows: None,
+                        num_levels: None,
                         is_dict: true,
                     }))
                 } else if let Some(page) = page_locations.front() {
@@ -732,7 +733,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                         .unwrap_or(*total_rows);
 
                     Ok(Some(PageMetadata {
-                        num_rows: next_rows - page.first_row_index as usize,
+                        num_rows: Some(next_rows - page.first_row_index as usize),
+                        num_levels: None,
                         is_dict: false,
                     }))
                 } else {
@@ -1644,11 +1646,11 @@ mod tests {
             // have checked with `parquet-tools column-index   -c string_col  ./alltypes_tiny_pages.parquet`
             // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
             if i != 351 {
-                assert!((meta.num_rows == 21) || (meta.num_rows == 20));
+                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
             } else {
                 // last page first row index is 7290, total row count is 7300
                 // because first row start with zero, last page row count should be 10.
-                assert_eq!(meta.num_rows, 10);
+                assert_eq!(meta.num_rows, Some(10));
             }
             assert!(!meta.is_dict);
             vec.push(meta);
@@ -1686,11 +1688,11 @@ mod tests {
             // have checked with `parquet-tools column-index   -c string_col  ./alltypes_tiny_pages.parquet`
             // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
             if i != 351 {
-                assert!((meta.num_rows == 21) || (meta.num_rows == 20));
+                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
             } else {
                 // last page first row index is 7290, total row count is 7300
                 // because first row start with zero, last page row count should be 10.
-                assert_eq!(meta.num_rows, 10);
+                assert_eq!(meta.num_levels, Some(10));
             }
             assert!(!meta.is_dict);
             vec.push(meta);
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index ab5287462..c51c5158c 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -170,15 +170,22 @@ impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
         if let Some(x) = self.page_iter.peek() {
             match x {
                 Page::DataPage { num_values, .. } => Ok(Some(PageMetadata {
-                    num_rows: *num_values as usize,
+                    num_rows: None,
+                    num_levels: Some(*num_values as _),
                     is_dict: false,
                 })),
-                Page::DataPageV2 { num_rows, .. } => Ok(Some(PageMetadata {
-                    num_rows: *num_rows as usize,
+                Page::DataPageV2 {
+                    num_rows,
+                    num_values,
+                    ..
+                } => Ok(Some(PageMetadata {
+                    num_rows: Some(*num_rows as _),
+                    num_levels: Some(*num_values as _),
                     is_dict: false,
                 })),
                 Page::DictionaryPage { .. } => Ok(Some(PageMetadata {
-                    num_rows: 0,
+                    num_rows: None,
+                    num_levels: None,
                     is_dict: true,
                 })),
             }