You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by th...@apache.org on 2023/01/30 11:06:21 UTC

[arrow-rs] branch parquet-limit-pushdown created (now a6b65fbcb)

This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a change to branch parquet-limit-pushdown
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


      at a6b65fbcb Add limit to ArrowReaderBuilder to push limit down to parquet reader

This branch includes the following new commits:

     new a6b65fbcb Add limit to ArrowReaderBuilder to push limit down to parquet reader

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[arrow-rs] 01/01: Add limit to ArrowReaderBuilder to push limit down to parquet reader

Posted by th...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch parquet-limit-pushdown
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit a6b65fbcbd8846d93dd761fd61d94d023511586f
Author: Dan Harris <da...@thinkharder.dev>
AuthorDate: Mon Jan 30 13:02:30 2023 +0200

    Add limit to ArrowReaderBuilder to push limit down to parquet reader
---
 parquet/src/arrow/arrow_reader/mod.rs       | 56 ++++++++++++++++-
 parquet/src/arrow/arrow_reader/selection.rs | 95 ++++++++++++++++++++++++++++-
 2 files changed, 149 insertions(+), 2 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 87165ef8e..70ab098ab 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -69,6 +69,8 @@ pub struct ArrowReaderBuilder<T> {
     pub(crate) filter: Option<RowFilter>,
 
     pub(crate) selection: Option<RowSelection>,
+
+    pub(crate) limit: Option<usize>,
 }
 
 impl<T> ArrowReaderBuilder<T> {
@@ -98,6 +100,7 @@ impl<T> ArrowReaderBuilder<T> {
             projection: ProjectionMask::all(),
             filter: None,
             selection: None,
+            limit: None,
         })
     }
 
@@ -167,6 +170,17 @@ impl<T> ArrowReaderBuilder<T> {
             ..self
         }
     }
+
+    /// Only read the first `limit` rows that would otherwise be returned
+    ///
+    /// The synchronous reader applies the limit after any row selection and row
+    /// filter, building a [`RowSelection`] so that at most `limit` rows are read
+    pub fn with_limit(self, limit: usize) -> Self {
+        Self {
+            limit: Some(limit),
+            ..self
+        }
+    }
 }
 
 /// Arrow reader api.
@@ -453,6 +467,17 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
             selection = Some(RowSelection::from(vec![]));
         }
 
+        // If a limit is defined, apply it to the final `RowSelection`
+        if let Some(limit) = self.limit {
+            selection = Some(
+                selection
+                    .map(|selection| selection.limit(limit))
+                    .unwrap_or_else(|| {
+                        RowSelection::from_limit(limit, reader.num_rows())
+                    }),
+            );
+        }
+
         Ok(ParquetRecordBatchReader::new(
             batch_size,
             array_reader,
@@ -1215,6 +1240,8 @@ mod tests {
         row_selections: Option<(RowSelection, usize)>,
         /// row filter
         row_filter: Option<Vec<bool>>,
+        /// limit applied after row selections and row filter
+        limit: Option<usize>,
     }
 
     /// Manually implement this to avoid printing entire contents of row_selections and row_filter
@@ -1233,6 +1260,7 @@ mod tests {
                 .field("encoding", &self.encoding)
                 .field("row_selections", &self.row_selections.is_some())
                 .field("row_filter", &self.row_filter.is_some())
+                .field("limit", &self.limit)
                 .finish()
         }
     }
@@ -1252,6 +1280,7 @@ mod tests {
                 encoding: Encoding::PLAIN,
                 row_selections: None,
                 row_filter: None,
+                limit: None,
             }
         }
     }
@@ -1323,6 +1352,13 @@ mod tests {
             }
         }
 
+        fn with_limit(self, limit: usize) -> Self {
+            Self {
+                limit: Some(limit),
+                ..self
+            }
+        }
+
         fn writer_props(&self) -> WriterProperties {
             let builder = WriterProperties::builder()
                 .set_data_pagesize_limit(self.max_data_page_size)
@@ -1381,6 +1417,18 @@ mod tests {
             TestOptions::new(2, 256, 127).with_null_percent(0),
             // Test optional with nulls
             TestOptions::new(2, 256, 93).with_null_percent(25),
+            // Test with limit of 0
+            TestOptions::new(4, 100, 25).with_limit(0),
+            // Test with limit smaller than a single batch
+            TestOptions::new(4, 100, 25).with_limit(10),
+            // Test with limit of 50
+            TestOptions::new(4, 100, 25).with_limit(50),
+            // Test with limit spanning more than one row group
+            TestOptions::new(4, 100, 25).with_limit(101),
+            // Test with limit equal to number of rows (4 row groups of 100 rows)
+            TestOptions::new(4, 100, 25).with_limit(400),
+            // Test with limit larger than number of rows
+            TestOptions::new(4, 100, 25).with_limit(401),
             // Test with no page-level statistics
             TestOptions::new(2, 256, 91)
                 .with_null_percent(25)
@@ -1423,6 +1471,11 @@ mod tests {
             TestOptions::new(2, 256, 93)
                 .with_null_percent(25)
                 .with_row_selections(),
+            // Test optional with nulls, row selections and a limit
+            TestOptions::new(2, 256, 93)
+                .with_null_percent(25)
+                .with_row_selections()
+                .with_limit(10),
             // Test filter
 
             // Test with row filter
@@ -1592,7 +1645,7 @@ mod tests {
             }
         };
 
-        let expected_data = match opts.row_filter {
+        let mut expected_data = match opts.row_filter {
             Some(filter) => {
                 let expected_data = expected_data
                     .into_iter()
@@ -1622,6 +1675,11 @@ mod tests {
             None => expected_data,
         };
 
+        if let Some(limit) = opts.limit {
+            builder = builder.with_limit(limit);
+            expected_data = expected_data.into_iter().take(limit).collect();
+        }
+
         let mut record_reader = builder
             .with_batch_size(opts.record_batch_size)
             .build()
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 03c7e01e0..1e32b5510 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -110,6 +110,25 @@ impl RowSelection {
         Self::from_consecutive_ranges(iter, total_rows)
     }
 
+    /// Creates a [`RowSelection`] that selects the first `limit` rows out of
+    /// `total_rows` and skips all remaining rows
+    ///
+    /// `limit` is clamped to `total_rows`, and zero-length selectors are never
+    /// emitted (e.g. a `limit` of 0 yields a single skip of `total_rows` rows)
+    pub(crate) fn from_limit(limit: usize, total_rows: usize) -> Self {
+        let selected = limit.min(total_rows);
+        let skipped = total_rows - selected;
+
+        let mut selectors = Vec::with_capacity(2);
+        if selected != 0 {
+            selectors.push(RowSelector::select(selected));
+        }
+        if skipped != 0 {
+            selectors.push(RowSelector::skip(skipped));
+        }
+        Self { selectors }
+    }
+
     /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep
     fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
         ranges: I,
@@ -371,6 +390,40 @@ impl RowSelection {
         self
     }
 
+    /// Limit this [`RowSelection`] to only select `limit` rows
+    ///
+    /// Selectors are kept in order until `limit` rows have been selected. The
+    /// selector that crosses the limit is truncated, and every row after that
+    /// point is folded into a single trailing skip, so the total number of rows
+    /// covered by the selection is unchanged
+    pub(crate) fn limit(self, mut limit: usize) -> Self {
+        // A truncated select plus a trailing skip can add one selector
+        let mut limited = Vec::with_capacity(self.selectors.len() + 1);
+        let mut skipped = 0;
+        for selector in self.selectors {
+            if limit == 0 {
+                // The limit has been reached: every remaining row is skipped
+                skipped += selector.row_count;
+            } else if selector.skip {
+                limited.push(selector);
+            } else if selector.row_count > limit {
+                // Truncate the selector that crosses the limit
+                skipped += selector.row_count - limit;
+                limited.push(RowSelector::select(limit));
+                limit = 0;
+            } else {
+                limit -= selector.row_count;
+                limited.push(selector);
+            }
+        }
+
+        if skipped != 0 {
+            limited.push(RowSelector::skip(skipped));
+        }
+
+        Self { selectors: limited }
+    }
+
     /// Returns an iterator over the [`RowSelector`]s for this
     /// [`RowSelection`].
     pub fn iter(&self) -> impl Iterator<Item = &RowSelector> {
@@ -841,6 +894,67 @@ mod tests {
         assert_eq!(selectors, round_tripped);
     }
 
+    #[test]
+    fn test_limit() {
+        // Limit to existing limit should no-op
+        let selection = RowSelection::from_limit(10, 100);
+        let limited = selection.clone().limit(10);
+        assert_eq!(selection, limited);
+
+        // `from_limit` clamps to `total_rows` and never emits zero-length selectors
+        let expected = vec![RowSelector::skip(100)];
+        assert_eq!(RowSelection::from_limit(0, 100).selectors, expected);
+
+        let expected = vec![RowSelector::select(100)];
+        assert_eq!(RowSelection::from_limit(100, 100).selectors, expected);
+        assert_eq!(RowSelection::from_limit(101, 100).selectors, expected);
+
+        let selection = RowSelection::from(vec![
+            RowSelector::select(10),
+            RowSelector::skip(10),
+            RowSelector::select(10),
+            RowSelector::skip(10),
+            RowSelector::select(10),
+        ]);
+
+        let limited = selection.clone().limit(5);
+        let expected = vec![RowSelector::select(5), RowSelector::skip(45)];
+        assert_eq!(limited.selectors, expected);
+
+        let limited = selection.clone().limit(15);
+        let expected = vec![
+            RowSelector::select(10),
+            RowSelector::skip(10),
+            RowSelector::select(5),
+            RowSelector::skip(25),
+        ];
+        assert_eq!(limited.selectors, expected);
+
+        let limited = selection.clone().limit(0);
+        let expected = vec![RowSelector::skip(50)];
+        assert_eq!(limited.selectors, expected);
+
+        let limited = selection.clone().limit(30);
+        let expected = vec![
+            RowSelector::select(10),
+            RowSelector::skip(10),
+            RowSelector::select(10),
+            RowSelector::skip(10),
+            RowSelector::select(10),
+        ];
+        assert_eq!(limited.selectors, expected);
+
+        let limited = selection.limit(100);
+        let expected = vec![
+            RowSelector::select(10),
+            RowSelector::skip(10),
+            RowSelector::select(10),
+            RowSelector::skip(10),
+            RowSelector::select(10),
+        ];
+        assert_eq!(limited.selectors, expected);
+    }
+
     #[test]
     fn test_scan_ranges() {
         let index = vec![