You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/10 21:34:35 UTC

[arrow-rs] branch master updated: Move `intersect_row_selections` from datafusion to arrow-rs. (#3047)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f1468331 Move `intersect_row_selections` from datafusion to arrow-rs. (#3047)
9f1468331 is described below

commit 9f14683313bd87e72344cdeb6b35201943fdbcb4
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Fri Nov 11 05:34:29 2022 +0800

    Move `intersect_row_selections` from datafusion to arrow-rs. (#3047)
    
    * Add `RowSelection::intersect_row_selections` from datafusion.
    
    * fix pub issue
---
 parquet/src/arrow/arrow_reader/mod.rs       |   2 +-
 parquet/src/arrow/arrow_reader/selection.rs | 128 ++++++++++++++++++++++++++++
 2 files changed, 129 insertions(+), 1 deletion(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 35b70a048..1f841a0ee 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -41,7 +41,7 @@ mod filter;
 mod selection;
 
 pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
-pub use selection::{RowSelection, RowSelector};
+pub use selection::{intersect_row_selections, RowSelection, RowSelector};
 
 /// A generic builder for constructing sync or async arrow parquet readers. This is not intended
 /// to be used directly, instead you should use the specialization for the type of reader
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 357960906..e01c584b6 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -349,6 +349,66 @@ impl From<RowSelection> for VecDeque<RowSelector> {
     }
 }
 
+/// Intersects two selector lists; rows past the end of the shorter input are copied unchanged.
+/// ```text
+/// left:      NNYYYYNNYYNYN
+/// right:     NYNNNNNNY
+/// returned:  NNNNNNNNYYNYN
+/// ```
+pub fn intersect_row_selections(
+    left: Vec<RowSelector>,
+    right: Vec<RowSelector>,
+) -> Vec<RowSelector> {
+    let mut res = Vec::with_capacity(left.len()); // starting guess; split runs add entries
+    let mut l_iter = left.into_iter().peekable();
+    let mut r_iter = right.into_iter().peekable();
+
+    while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) { // heads are shrunk in place
+        if a.row_count == 0 { // discard empty selectors so every step below consumes rows
+            l_iter.next().unwrap();
+            continue;
+        }
+        if b.row_count == 0 {
+            r_iter.next().unwrap();
+            continue;
+        }
+        match (a.skip, b.skip) {
+            // both sides select these rows, so the overlap is selected
+            (false, false) => {
+                if a.row_count < b.row_count { // emit the shorter run, keep the rest of the longer
+                    res.push(RowSelector::select(a.row_count));
+                    b.row_count -= a.row_count;
+                    l_iter.next().unwrap();
+                } else { // also taken on a tie, which leaves `a` with 0 rows
+                    res.push(RowSelector::select(b.row_count));
+                    a.row_count -= b.row_count;
+                    r_iter.next().unwrap();
+                }
+            }
+            // at least one side skips these rows, so the overlap is skipped
+            _ => {
+                if a.row_count < b.row_count {
+                    res.push(RowSelector::skip(a.row_count));
+                    b.row_count -= a.row_count;
+                    l_iter.next().unwrap();
+                } else { // also taken on a tie, which leaves `a` with 0 rows
+                    res.push(RowSelector::skip(b.row_count));
+                    a.row_count -= b.row_count;
+                    r_iter.next().unwrap();
+                }
+            }
+        }
+    }
+
+    if l_iter.peek().is_some() { // the loop ends once either side is exhausted: at most one tail is left
+        res.extend(l_iter); // NOTE(review): after a tie on the last step this tail starts with a 0-row selector
+    }
+    if r_iter.peek().is_some() {
+        res.extend(r_iter);
+    }
+    res
+}
+
 fn add_selector(skip: bool, sum_row: usize, combined_result: &mut Vec<RowSelector>) {
     let selector = if skip {
         RowSelector::skip(sum_row)
@@ -618,6 +678,74 @@ mod tests {
         a.and_then(&b);
     }
 
+    #[test]
+    fn test_intersect_row_selection_and_combine() {
+        // a and b have the same number of selectors (both cover 10 rows)
+        let a = vec![
+            RowSelector::select(5),
+            RowSelector::skip(4),
+            RowSelector::select(1),
+        ];
+        let b = vec![
+            RowSelector::select(8),
+            RowSelector::skip(1),
+            RowSelector::select(1),
+        ];
+
+        let res = intersect_row_selections(a, b);
+        assert_eq!(
+            RowSelection::from_selectors_and_combine(&res).selectors,
+            vec![
+                RowSelector::select(5),
+                RowSelector::skip(4),
+                RowSelector::select(1),
+            ],
+        );
+
+        // a has more selectors than b (both cover 72 rows)
+        let a = vec![
+            RowSelector::select(3),
+            RowSelector::skip(33),
+            RowSelector::select(3),
+            RowSelector::skip(33),
+        ];
+        let b = vec![RowSelector::select(36), RowSelector::skip(36)];
+        let res = intersect_row_selections(a, b);
+        assert_eq!(
+            RowSelection::from_selectors_and_combine(&res).selectors,
+            vec![RowSelector::select(3), RowSelector::skip(69)]
+        );
+
+        // a has fewer selectors than b (both cover 10 rows)
+        let a = vec![RowSelector::select(3), RowSelector::skip(7)];
+        let b = vec![
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+        ];
+        let res = intersect_row_selections(a, b);
+        assert_eq!(
+            RowSelection::from_selectors_and_combine(&res).selectors,
+            vec![RowSelector::select(2), RowSelector::skip(8)]
+        );
+
+        let a = vec![RowSelector::select(3), RowSelector::skip(7)]; // same inputs as the previous case
+        let b = vec![
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+            RowSelector::skip(2),
+            RowSelector::select(2),
+        ];
+        let res = intersect_row_selections(a, b);
+        assert_eq!(
+            RowSelection::from_selectors_and_combine(&res).selectors,
+            vec![RowSelector::select(2), RowSelector::skip(8)]
+        );
+    }
+
     #[test]
     fn test_and_fuzz() {
         let mut rand = thread_rng();