You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by th...@apache.org on 2023/01/30 11:06:22 UTC
[arrow-rs] 01/01: Add limit to ArrowReaderBuilder to push limit down to parquet reader
This is an automated email from the ASF dual-hosted git repository.
thinkharderdev pushed a commit to branch parquet-limit-pushdown
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
commit a6b65fbcbd8846d93dd761fd61d94d023511586f
Author: Dan Harris <da...@thinkharder.dev>
AuthorDate: Mon Jan 30 13:02:30 2023 +0200
Add limit to ArrowReaderBuilder to push limit down to parquet reader
---
parquet/src/arrow/arrow_reader/mod.rs | 56 ++++++++++++++++-
parquet/src/arrow/arrow_reader/selection.rs | 95 ++++++++++++++++++++++++++++-
2 files changed, 149 insertions(+), 2 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 87165ef8e..70ab098ab 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -69,6 +69,8 @@ pub struct ArrowReaderBuilder<T> {
pub(crate) filter: Option<RowFilter>,
pub(crate) selection: Option<RowSelection>,
+
+ pub(crate) limit: Option<usize>,
}
impl<T> ArrowReaderBuilder<T> {
@@ -98,6 +100,7 @@ impl<T> ArrowReaderBuilder<T> {
projection: ProjectionMask::all(),
filter: None,
selection: None,
+ limit: None,
})
}
@@ -167,6 +170,17 @@ impl<T> ArrowReaderBuilder<T> {
..self
}
}
+
+ /// Provide a limit to the number of rows to be read
+ ///
+ /// The limit will be used to generate a `RowSelection` so only `limit`
+ /// rows are decoded
+ pub fn with_limit(self, limit: usize) -> Self {
+ Self {
+ limit: Some(limit),
+ ..self
+ }
+ }
}
/// Arrow reader api.
@@ -453,6 +467,17 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
selection = Some(RowSelection::from(vec![]));
}
+ // If a limit is defined, apply it to the final `RowSelection`
+ if let Some(limit) = self.limit {
+ selection = Some(
+ selection
+ .map(|selection| selection.limit(limit))
+ .unwrap_or_else(|| {
+ RowSelection::from_limit(limit, reader.num_rows())
+ }),
+ );
+ }
+
Ok(ParquetRecordBatchReader::new(
batch_size,
array_reader,
@@ -1215,6 +1240,8 @@ mod tests {
row_selections: Option<(RowSelection, usize)>,
/// row filter
row_filter: Option<Vec<bool>>,
+ /// limit
+ limit: Option<usize>,
}
/// Manually implement this to avoid printing entire contents of row_selections and row_filter
@@ -1233,6 +1260,7 @@ mod tests {
.field("encoding", &self.encoding)
.field("row_selections", &self.row_selections.is_some())
.field("row_filter", &self.row_filter.is_some())
+ .field("limit", &self.limit)
.finish()
}
}
@@ -1252,6 +1280,7 @@ mod tests {
encoding: Encoding::PLAIN,
row_selections: None,
row_filter: None,
+ limit: None,
}
}
}
@@ -1323,6 +1352,13 @@ mod tests {
}
}
+ fn with_limit(self, limit: usize) -> Self {
+ Self {
+ limit: Some(limit),
+ ..self
+ }
+ }
+
fn writer_props(&self) -> WriterProperties {
let builder = WriterProperties::builder()
.set_data_pagesize_limit(self.max_data_page_size)
@@ -1381,6 +1417,14 @@ mod tests {
TestOptions::new(2, 256, 127).with_null_percent(0),
// Test optional with nulls
TestOptions::new(2, 256, 93).with_null_percent(25),
+ // Test with limit of 0
+ TestOptions::new(4, 100, 25).with_limit(0),
+ // Test with limit of 50
+ TestOptions::new(4, 100, 25).with_limit(50),
+ // Test with limit of 10
+ TestOptions::new(4, 100, 25).with_limit(10),
+ // Test with limit larger than number of rows
+ TestOptions::new(4, 100, 25).with_limit(101),
// Test with no page-level statistics
TestOptions::new(2, 256, 91)
.with_null_percent(25)
@@ -1423,6 +1467,11 @@ mod tests {
TestOptions::new(2, 256, 93)
.with_null_percent(25)
.with_row_selections(),
+ // Test optional with nulls, row selections and a limit
+ TestOptions::new(2, 256, 93)
+ .with_null_percent(25)
+ .with_row_selections()
+ .with_limit(10),
// Test filter
// Test with row filter
@@ -1592,7 +1641,7 @@ mod tests {
}
};
- let expected_data = match opts.row_filter {
+ let mut expected_data = match opts.row_filter {
Some(filter) => {
let expected_data = expected_data
.into_iter()
@@ -1622,6 +1671,11 @@ mod tests {
None => expected_data,
};
+ if let Some(limit) = opts.limit {
+ builder = builder.with_limit(limit);
+ expected_data = expected_data.into_iter().take(limit).collect();
+ }
+
let mut record_reader = builder
.with_batch_size(opts.record_batch_size)
.build()
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 03c7e01e0..1e32b5510 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -19,6 +19,7 @@ use arrow_array::{Array, BooleanArray};
use arrow_select::filter::SlicesIterator;
use std::cmp::Ordering;
use std::collections::VecDeque;
+use std::mem;
use std::ops::Range;
/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
@@ -110,8 +111,18 @@ impl RowSelection {
Self::from_consecutive_ranges(iter, total_rows)
}
+ /// Creates a [`RowSelection`] that will select `limit` rows and skip all remaining rows.
+ pub(crate) fn from_limit(limit: usize, total_rows: usize) -> Self {
+ Self {
+ selectors: vec![
+ RowSelector::select(limit),
+ RowSelector::skip(total_rows.saturating_sub(limit)),
+ ],
+ }
+ }
+
/// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep
- fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
+ pub(crate) fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
ranges: I,
total_rows: usize,
) -> Self {
@@ -371,6 +382,35 @@ impl RowSelection {
self
}
+ /// Limit this [`RowSelection`] to only select `limit` rows
+ pub(crate) fn limit(mut self, mut limit: usize) -> Self {
+ let mut remaining = 0;
+ let mut new_selectors = Vec::with_capacity(self.selectors.len());
+ for selection in mem::take(&mut self.selectors) {
+ if limit == 0 {
+ remaining += selection.row_count;
+ } else if !selection.skip {
+ if selection.row_count > limit {
+ remaining += selection.row_count - limit;
+ new_selectors.push(RowSelector::select(limit));
+ limit = 0;
+ } else {
+ limit -= selection.row_count;
+ new_selectors.push(selection);
+ }
+ } else {
+ new_selectors.push(selection);
+ }
+ }
+
+ if remaining > 0 {
+ new_selectors.push(RowSelector::skip(remaining));
+ }
+
+ self.selectors = new_selectors;
+ self
+ }
+
/// Returns an iterator over the [`RowSelector`]s for this
/// [`RowSelection`].
pub fn iter(&self) -> impl Iterator<Item = &RowSelector> {
@@ -841,6 +881,59 @@ mod tests {
assert_eq!(selectors, round_tripped);
}
+ #[test]
+ fn test_limit() {
+ // Limiting to the existing limit should be a no-op
+ let selection = RowSelection::from_limit(10, 100);
+ let limited = selection.clone().limit(10);
+ assert_eq!(selection, limited);
+
+ let selection = RowSelection::from(vec![
+ RowSelector::select(10),
+ RowSelector::skip(10),
+ RowSelector::select(10),
+ RowSelector::skip(10),
+ RowSelector::select(10),
+ ]);
+
+ let limited = selection.clone().limit(5);
+ let expected = vec![RowSelector::select(5), RowSelector::skip(45)];
+ assert_eq!(limited.selectors, expected);
+
+ let limited = selection.clone().limit(15);
+ let expected = vec![
+ RowSelector::select(10),
+ RowSelector::skip(10),
+ RowSelector::select(5),
+ RowSelector::skip(25),
+ ];
+ assert_eq!(limited.selectors, expected);
+
+ let limited = selection.clone().limit(0);
+ let expected = vec![RowSelector::skip(50)];
+ assert_eq!(limited.selectors, expected);
+
+ let limited = selection.clone().limit(30);
+ let expected = vec![
+ RowSelector::select(10),
+ RowSelector::skip(10),
+ RowSelector::select(10),
+ RowSelector::skip(10),
+ RowSelector::select(10),
+ ];
+ assert_eq!(limited.selectors, expected);
+
+ let limited = selection.limit(100);
+ let expected = vec![
+ RowSelector::select(10),
+ RowSelector::skip(10),
+ RowSelector::select(10),
+ RowSelector::skip(10),
+ RowSelector::select(10),
+ ];
+ assert_eq!(limited.selectors, expected);
+ }
+
#[test]
fn test_scan_ranges() {
let index = vec![