You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/10 21:34:35 UTC
[arrow-rs] branch master updated: Move `intersect_row_selections` from datafusion to arrow-rs. (#3047)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 9f1468331 Move `intersect_row_selections` from datafusion to arrow-rs. (#3047)
9f1468331 is described below
commit 9f14683313bd87e72344cdeb6b35201943fdbcb4
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Fri Nov 11 05:34:29 2022 +0800
Move `intersect_row_selections` from datafusion to arrow-rs. (#3047)
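* Add the free function `intersect_row_selections` (in `parquet::arrow::arrow_reader::selection`) from datafusion.
* Re-export `intersect_row_selections` from `parquet::arrow::arrow_reader` so it is publicly reachable.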
---
parquet/src/arrow/arrow_reader/mod.rs | 2 +-
parquet/src/arrow/arrow_reader/selection.rs | 128 ++++++++++++++++++++++++++++
2 files changed, 129 insertions(+), 1 deletion(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 35b70a048..1f841a0ee 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -41,7 +41,7 @@ mod filter;
mod selection;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
-pub use selection::{RowSelection, RowSelector};
+pub use selection::{intersect_row_selections, RowSelection, RowSelector};
/// A generic builder for constructing sync or async arrow parquet readers. This is not intended
/// to be used directly, instead you should use the specialization for the type of reader
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 357960906..e01c584b6 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -349,6 +349,66 @@ impl From<RowSelection> for VecDeque<RowSelector> {
}
}
+/// Returns the intersection of two row selections, each given as a list of
+/// [`RowSelector`]s: a row is selected only if both `left` and `right` select it.
+///
+/// If one input covers more rows than the other, the remaining rows of the
+/// longer input keep that input's selection. Selectors with a `row_count` of
+/// zero are never emitted, and adjacent selectors of the same kind are not merged.
+///
+/// For example (`Y` = select, `N` = skip):
+///
+/// ```text
+/// left:     NNYYYYNNYYNYN
+/// right:    NYNNNNNNY
+///
+/// returned: NNNNNNNNYYNYN
+/// ```
+pub fn intersect_row_selections(
+    left: Vec<RowSelector>,
+    right: Vec<RowSelector>,
+) -> Vec<RowSelector> {
+    let mut res = Vec::with_capacity(left.len());
+    let mut l_iter = left.into_iter().peekable();
+    let mut r_iter = right.into_iter().peekable();
+
+    while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
+        // Discard empty (or fully consumed) selectors before comparing
+        if a.row_count == 0 {
+            l_iter.next();
+            continue;
+        }
+        if b.row_count == 0 {
+            r_iter.next();
+            continue;
+        }
+        // Emit the overlap of the two current runs; the shorter run drops to
+        // zero and is discarded on the next iteration
+        let len = a.row_count.min(b.row_count);
+        a.row_count -= len;
+        b.row_count -= len;
+        // A row survives only if neither side skips it
+        res.push(if a.skip || b.skip {
+            RowSelector::skip(len)
+        } else {
+            RowSelector::select(len)
+        });
+    }
+
+    // Rows past the end of the shorter input keep the longer input's selection.
+    // A run the loop reduced to zero must not leak into the output.
+    res.extend(l_iter.filter(|s| s.row_count != 0));
+    res.extend(r_iter.filter(|s| s.row_count != 0));
+    res
+}
+
fn add_selector(skip: bool, sum_row: usize, combined_result: &mut Vec<RowSelector>) {
let selector = if skip {
RowSelector::skip(sum_row)
@@ -618,6 +678,74 @@ mod tests {
a.and_then(&b);
}
+    #[test]
+    fn test_intersect_row_selection_and_combine() {
+        // a and b have the same number of selectors
+        let a = vec![
+            RowSelector::select(5),
+            RowSelector::skip(4),
+            RowSelector::select(1),
+        ];
+        let b = vec![
+            RowSelector::select(8),
+            RowSelector::skip(1),
+            RowSelector::select(1),
+        ];
+        let expected = vec![
+            RowSelector::select(5),
+            RowSelector::skip(4),
+            RowSelector::select(1),
+        ];
+
+        let res = intersect_row_selections(a.clone(), b.clone());
+        assert_eq!(
+            RowSelection::from_selectors_and_combine(&res).selectors,
+            expected,
+        );
+
+        // The intersection does not depend on argument order
+        let res = intersect_row_selections(b, a);
+        assert_eq!(
+            RowSelection::from_selectors_and_combine(&res).selectors,
+            expected,
+        );
+
+        // a has more selectors than b
+        let a = vec![
+            RowSelector::select(3),
+            RowSelector::skip(33),
+            RowSelector::select(3),
+            RowSelector::skip(33),
+        ];
+        let b = vec![RowSelector::select(36), RowSelector::skip(36)];
+        let res = intersect_row_selections(a, b);
+        assert_eq!(
+            RowSelection::from_selectors_and_combine(&res).selectors,
+            vec![RowSelector::select(3), RowSelector::skip(69)]
+        );
+
+        // a has fewer selectors than b
+        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
+        let b = vec![
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+        ];
+        let res = intersect_row_selections(a, b);
+        assert_eq!(
+            RowSelection::from_selectors_and_combine(&res).selectors,
+            vec![RowSelector::select(2), RowSelector::skip(8)]
+        );
+
+        // a and b cover a different number of rows: rows past the end of the
+        // shorter input keep the longer input's selection
+        // a: NNYYYYNNYYNYN, b: NYNNNNNNY
+        let a = vec![
+            RowSelector::skip(2),
+            RowSelector::select(4),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+            RowSelector::skip(1),
+            RowSelector::select(1),
+            RowSelector::skip(1),
+        ];
+        let b = vec![
+            RowSelector::skip(1),
+            RowSelector::select(1),
+            RowSelector::skip(6),
+            RowSelector::select(1),
+        ];
+        let res = intersect_row_selections(a, b);
+        // expected: NNNNNNNNYYNYN
+        assert_eq!(
+            RowSelection::from_selectors_and_combine(&res).selectors,
+            vec![
+                RowSelector::skip(8),
+                RowSelector::select(2),
+                RowSelector::skip(1),
+                RowSelector::select(1),
+                RowSelector::skip(1),
+            ]
+        );
+    }
+
#[test]
fn test_and_fuzz() {
let mut rand = thread_rng();