You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by th...@apache.org on 2023/01/30 11:06:22 UTC

[arrow-rs] 01/01: Add limit to ArrowReaderBuilder to push limit down to parquet reader

This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch parquet-limit-pushdown
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit a6b65fbcbd8846d93dd761fd61d94d023511586f
Author: Dan Harris <da...@thinkharder.dev>
AuthorDate: Mon Jan 30 13:02:30 2023 +0200

    Add limit to ArrowReaderBuilder to push limit down to parquet reader
---
 parquet/src/arrow/arrow_reader/mod.rs       | 56 ++++++++++++++++-
 parquet/src/arrow/arrow_reader/selection.rs | 95 ++++++++++++++++++++++++++++-
 2 files changed, 149 insertions(+), 2 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 87165ef8e..70ab098ab 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -69,6 +69,8 @@ pub struct ArrowReaderBuilder<T> {
     pub(crate) filter: Option<RowFilter>,
 
     pub(crate) selection: Option<RowSelection>,
+
+    pub(crate) limit: Option<usize>,
 }
 
 impl<T> ArrowReaderBuilder<T> {
@@ -98,6 +100,7 @@ impl<T> ArrowReaderBuilder<T> {
             projection: ProjectionMask::all(),
             filter: None,
             selection: None,
+            limit: None,
         })
     }
 
@@ -167,6 +170,17 @@ impl<T> ArrowReaderBuilder<T> {
             ..self
         }
     }
+
+    /// Provide a limit to the number of rows to be read
+    ///
+    /// When building the synchronous reader, the limit is applied to the final
+    /// [`RowSelection`], so only the first `limit` selected rows are decoded
+    pub fn with_limit(self, limit: usize) -> Self {
+        Self {
+            limit: Some(limit),
+            ..self
+        }
+    }
 }
 
 /// Arrow reader api.
@@ -453,6 +467,17 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
             selection = Some(RowSelection::from(vec![]));
         }
 
+        // If a limit is defined, apply it to the final `RowSelection`
+        if let Some(limit) = self.limit {
+            selection = Some(
+                selection
+                    .map(|selection| selection.limit(limit))
+                    .unwrap_or_else(|| {
+                        RowSelection::from_limit(limit, reader.num_rows())
+                    }),
+            );
+        }
+
         Ok(ParquetRecordBatchReader::new(
             batch_size,
             array_reader,
@@ -1215,6 +1240,8 @@ mod tests {
         row_selections: Option<(RowSelection, usize)>,
         /// row filter
         row_filter: Option<Vec<bool>>,
+        /// limit
+        limit: Option<usize>,
     }
 
     /// Manually implement this to avoid printing entire contents of row_selections and row_filter
@@ -1233,6 +1260,7 @@ mod tests {
                 .field("encoding", &self.encoding)
                 .field("row_selections", &self.row_selections.is_some())
                 .field("row_filter", &self.row_filter.is_some())
+                .field("limit", &self.limit)
                 .finish()
         }
     }
@@ -1252,6 +1280,7 @@ mod tests {
                 encoding: Encoding::PLAIN,
                 row_selections: None,
                 row_filter: None,
+                limit: None,
             }
         }
     }
@@ -1323,6 +1352,13 @@ mod tests {
             }
         }
 
+        /// Set a row limit for `ArrowReaderBuilder::with_limit`; the expected
+        /// data is truncated to the same number of rows.
+        fn with_limit(mut self, limit: usize) -> Self {
+            self.limit = Some(limit);
+            self
+        }
+
         fn writer_props(&self) -> WriterProperties {
             let builder = WriterProperties::builder()
                 .set_data_pagesize_limit(self.max_data_page_size)
@@ -1381,6 +1417,14 @@ mod tests {
             TestOptions::new(2, 256, 127).with_null_percent(0),
             // Test optional with nulls
             TestOptions::new(2, 256, 93).with_null_percent(25),
+            // Test with limit of 0
+            TestOptions::new(4, 100, 25).with_limit(0),
+            // Test with limit of 50
+            TestOptions::new(4, 100, 25).with_limit(50),
+            // Test with limit equal to number of rows
+            TestOptions::new(4, 100, 25).with_limit(10),
+            // Test with limit larger than number of rows
+            TestOptions::new(4, 100, 25).with_limit(101),
             // Test with no page-level statistics
             TestOptions::new(2, 256, 91)
                 .with_null_percent(25)
@@ -1423,6 +1467,11 @@ mod tests {
             TestOptions::new(2, 256, 93)
                 .with_null_percent(25)
                 .with_row_selections(),
+            // Test optional with nulls
+            TestOptions::new(2, 256, 93)
+                .with_null_percent(25)
+                .with_row_selections()
+                .with_limit(10),
             // Test filter
 
             // Test with row filter
@@ -1592,7 +1641,7 @@ mod tests {
             }
         };
 
-        let expected_data = match opts.row_filter {
+        let mut expected_data = match opts.row_filter {
             Some(filter) => {
                 let expected_data = expected_data
                     .into_iter()
@@ -1622,6 +1671,11 @@ mod tests {
             None => expected_data,
         };
 
+        if let Some(limit) = opts.limit {
+            builder = builder.with_limit(limit);
+            expected_data = expected_data.into_iter().take(limit).collect();
+        }
+
         let mut record_reader = builder
             .with_batch_size(opts.record_batch_size)
             .build()
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 03c7e01e0..1e32b5510 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -19,6 +19,7 @@ use arrow_array::{Array, BooleanArray};
 use arrow_select::filter::SlicesIterator;
 use std::cmp::Ordering;
 use std::collections::VecDeque;
+use std::mem;
 use std::ops::Range;
 
 /// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
@@ -110,8 +111,18 @@ impl RowSelection {
         Self::from_consecutive_ranges(iter, total_rows)
     }
 
+    /// Creates a [`RowSelection`] that selects the first `limit` rows (capped at `total_rows`)
+    /// and skips the rest, so it never covers more than `total_rows` rows; empty runs are omitted.
+    pub(crate) fn from_limit(limit: usize, total_rows: usize) -> Self {
+        let selected = limit.min(total_rows);
+        let skipped = total_rows - selected;
+        let selectors = vec![RowSelector::select(selected), RowSelector::skip(skipped)];
+        let selectors = selectors.into_iter().filter(|s| s.row_count > 0).collect();
+        Self { selectors }
+    }
+
     /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep
-    fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
+    pub(crate) fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
         ranges: I,
         total_rows: usize,
     ) -> Self {
@@ -371,6 +382,35 @@ impl RowSelection {
         self
     }
 
+    /// Limit this [`RowSelection`] to only select its first `limit` rows. A select that
+    /// crosses the limit is shortened and all rows after the limit is reached are folded into
+    /// one trailing skip, so the selectors still cover the same total number of rows.
+    pub(crate) fn limit(mut self, mut limit: usize) -> Self {
+        let mut trailing_skip = 0;
+        let mut kept = Vec::with_capacity(self.selectors.len());
+        for selector in mem::take(&mut self.selectors) {
+            if limit == 0 {
+                // Limit already reached: every remaining row is skipped
+                trailing_skip += selector.row_count;
+            } else if selector.skip || selector.row_count <= limit {
+                if !selector.skip {
+                    limit -= selector.row_count;
+                }
+                kept.push(selector);
+            } else {
+                // This select crosses the limit: keep its head, skip its tail
+                trailing_skip += selector.row_count - limit;
+                kept.push(RowSelector::select(limit));
+                limit = 0;
+            }
+        }
+        if trailing_skip > 0 {
+            kept.push(RowSelector::skip(trailing_skip));
+        }
+        self.selectors = kept;
+        self
+    }
+
     /// Returns an iterator over the [`RowSelector`]s for this
     /// [`RowSelection`].
     pub fn iter(&self) -> impl Iterator<Item = &RowSelector> {
@@ -841,6 +881,59 @@ mod tests {
         assert_eq!(selectors, round_tripped);
     }
 
+    #[test]
+    fn test_limit() {
+        // Re-applying the limit a selection was built with is a no-op
+        let selection = RowSelection::from_limit(10, 100);
+        assert_eq!(selection.clone().limit(10), selection);
+
+        // 30 of 50 rows selected, in alternating runs of 10
+        let alternating = RowSelection::from(vec![
+            RowSelector::select(10),
+            RowSelector::skip(10),
+            RowSelector::select(10),
+            RowSelector::skip(10),
+            RowSelector::select(10),
+        ]);
+        // Starts with a skip, which must be carried through without consuming the limit
+        let leading_skip = RowSelection::from(vec![
+            RowSelector::skip(5),
+            RowSelector::select(10),
+            RowSelector::skip(5),
+        ]);
+
+        // (input, limit, expected selectors): rows past the limit become one trailing skip
+        let cases = vec![
+            (&alternating, 5, vec![RowSelector::select(5), RowSelector::skip(45)]),
+            (
+                &alternating,
+                15,
+                vec![
+                    RowSelector::select(10),
+                    RowSelector::skip(10),
+                    RowSelector::select(5),
+                    RowSelector::skip(25),
+                ],
+            ),
+            (&alternating, 0, vec![RowSelector::skip(50)]),
+            // Limits at or above the number of selected rows are no-ops
+            (&alternating, 30, alternating.selectors.clone()),
+            (&alternating, 100, alternating.selectors.clone()),
+            (
+                &leading_skip,
+                3,
+                vec![RowSelector::skip(5), RowSelector::select(3), RowSelector::skip(12)],
+            ),
+            (&leading_skip, 10, leading_skip.selectors.clone()),
+        ];
+
+        // `limit` consumes the selection, so each case works on a clone of its input
+        for (input, limit, expected) in cases {
+            let limited = input.clone().limit(limit);
+            assert_eq!(limited.selectors, expected, "limit {limit} of {input:?}");
+        }
+    }
+
     #[test]
     fn test_scan_ranges() {
         let index = vec![