You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/11/07 11:35:52 UTC

(arrow-rs) branch master updated: Fix RowSelection::intersection (#5036) (#5041)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 20f10dcd21 Fix RowSelection::intersection (#5036) (#5041)
20f10dcd21 is described below

commit 20f10dcd2159199e36d128a2143eca48ae7438bb
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Nov 7 11:35:45 2023 +0000

    Fix RowSelection::intersection (#5036) (#5041)
    
    * Fix RowSelection::intersection (#5036)
    
    * Review feedback
---
 parquet/src/arrow/arrow_reader/selection.rs | 282 ++++++++++++++++------------
 1 file changed, 167 insertions(+), 115 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 5063d24afd..cebf3f9d38 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -64,25 +64,30 @@ impl RowSelector {
 /// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
 ///
 /// let selectors = vec![
-///   RowSelector { row_count: 5, skip: true },
-///   RowSelector { row_count: 5, skip: false },
-///   RowSelector { row_count: 5, skip: false },
-///   RowSelector { row_count: 5, skip: true },
+///     RowSelector::skip(5),
+///     RowSelector::select(5),
+///     RowSelector::select(5),
+///     RowSelector::skip(5),
 /// ];
 ///
 /// // Creating a selection will combine adjacent selectors
 /// let selection: RowSelection = selectors.into();
 ///
 /// let expected = vec![
-///   RowSelector { row_count: 5, skip: true },
-///   RowSelector { row_count: 10, skip: false },
-///   RowSelector { row_count: 5, skip: true },
+///     RowSelector::skip(5),
+///     RowSelector::select(10),
+///     RowSelector::skip(5),
 /// ];
 ///
 /// let actual: Vec<RowSelector> = selection.into();
 /// assert_eq!(actual, expected);
 /// ```
 ///
+/// A [`RowSelection`] maintains the following invariants:
+///
+/// * It contains no [`RowSelector`] of 0 rows
+/// * Consecutive [`RowSelector`]s alternate skipping or selecting rows
+///
 /// [`PageIndex`]: crate::file::page_index::index::PageIndex
 #[derive(Debug, Clone, Default, Eq, PartialEq)]
 pub struct RowSelection {
@@ -118,10 +123,13 @@ impl RowSelection {
         let mut last_end = 0;
         for range in ranges {
             let len = range.end - range.start;
+            if len == 0 {
+                continue;
+            }
 
             match range.start.cmp(&last_end) {
                 Ordering::Equal => match selectors.last_mut() {
-                    Some(last) => last.row_count += len,
+                    Some(last) => last.row_count = last.row_count.checked_add(len).unwrap(),
                     None => selectors.push(RowSelector::select(len)),
                 },
                 Ordering::Greater => {
@@ -140,38 +148,6 @@ impl RowSelection {
         Self { selectors }
     }
 
-    /// Creates a [`RowSelection`] from a slice of uncombined `RowSelector`:
-    /// Like [skip(5),skip(5),read(10)].
-    /// After combine will return [skip(10),read(10)]
-    /// # Note
-    ///  [`RowSelection`] must be combined prior to use within offset_index or else the code will panic.
-    fn from_selectors_and_combine(selectors: &[RowSelector]) -> Self {
-        if selectors.len() < 2 {
-            return Self {
-                selectors: Vec::from(selectors),
-            };
-        }
-        let first = selectors.first().unwrap();
-        let mut sum_rows = first.row_count;
-        let mut skip = first.skip;
-        let mut combined_result = vec![];
-
-        for s in selectors.iter().skip(1) {
-            if s.skip == skip {
-                sum_rows += s.row_count
-            } else {
-                add_selector(skip, sum_rows, &mut combined_result);
-                sum_rows = s.row_count;
-                skip = s.skip;
-            }
-        }
-        add_selector(skip, sum_rows, &mut combined_result);
-
-        Self {
-            selectors: combined_result,
-        }
-    }
-
     /// Given an offset index, return the byte ranges for all data pages selected by `self`
     ///
     /// This is useful for determining what byte ranges to fetch from underlying storage
@@ -351,9 +327,7 @@ impl RowSelection {
     ///
     /// returned:  NNNNNNNNYYNYN
     pub fn intersection(&self, other: &Self) -> Self {
-        Self {
-            selectors: intersect_row_selections(&self.selectors, &other.selectors),
-        }
+        intersect_row_selections(&self.selectors, &other.selectors)
     }
 
     /// Returns `true` if this [`RowSelection`] selects any rows
@@ -443,7 +417,37 @@ impl RowSelection {
 
 impl From<Vec<RowSelector>> for RowSelection {
     fn from(selectors: Vec<RowSelector>) -> Self {
-        Self::from_selectors_and_combine(selectors.as_slice())
+        selectors.into_iter().collect()
+    }
+}
+
+impl FromIterator<RowSelector> for RowSelection {
+    fn from_iter<T: IntoIterator<Item = RowSelector>>(iter: T) -> Self {
+        let iter = iter.into_iter();
+
+        // Capacity before filter
+        let mut selectors = Vec::with_capacity(iter.size_hint().0);
+
+        let mut filtered = iter.filter(|x| x.row_count != 0);
+        if let Some(x) = filtered.next() {
+            selectors.push(x);
+        }
+
+        for s in filtered {
+            if s.row_count == 0 {
+                continue;
+            }
+
+            // Combine consecutive selectors
+            let last = selectors.last_mut().unwrap();
+            if last.skip == s.skip {
+                last.row_count = last.row_count.checked_add(s.row_count).unwrap();
+            } else {
+                selectors.push(s)
+            }
+        }
+
+        Self { selectors }
     }
 }
 
@@ -465,64 +469,58 @@ impl From<RowSelection> for VecDeque<RowSelector> {
 /// other:     NYNNNNNNY
 ///
 /// returned:  NNNNNNNNYYNYN
-fn intersect_row_selections(left: &[RowSelector], right: &[RowSelector]) -> Vec<RowSelector> {
-    let mut res = Vec::with_capacity(left.len());
+fn intersect_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection {
     let mut l_iter = left.iter().copied().peekable();
     let mut r_iter = right.iter().copied().peekable();
 
-    while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
-        if a.row_count == 0 {
-            l_iter.next().unwrap();
-            continue;
-        }
-        if b.row_count == 0 {
-            r_iter.next().unwrap();
-            continue;
-        }
-        match (a.skip, b.skip) {
-            // Keep both ranges
-            (false, false) => {
-                if a.row_count < b.row_count {
-                    res.push(RowSelector::select(a.row_count));
-                    b.row_count -= a.row_count;
+    let iter = std::iter::from_fn(move || {
+        loop {
+            let l = l_iter.peek_mut();
+            let r = r_iter.peek_mut();
+
+            match (l, r) {
+                (Some(a), _) if a.row_count == 0 => {
                     l_iter.next().unwrap();
-                } else {
-                    res.push(RowSelector::select(b.row_count));
-                    a.row_count -= b.row_count;
-                    r_iter.next().unwrap();
                 }
-            }
-            // skip at least one
-            _ => {
-                if a.row_count < b.row_count {
-                    res.push(RowSelector::skip(a.row_count));
-                    b.row_count -= a.row_count;
-                    l_iter.next().unwrap();
-                } else {
-                    res.push(RowSelector::skip(b.row_count));
-                    a.row_count -= b.row_count;
+                (_, Some(b)) if b.row_count == 0 => {
                     r_iter.next().unwrap();
                 }
+                (Some(l), Some(r)) => {
+                    return match (l.skip, r.skip) {
+                        // Keep both ranges
+                        (false, false) => {
+                            if l.row_count < r.row_count {
+                                r.row_count -= l.row_count;
+                                l_iter.next()
+                            } else {
+                                l.row_count -= r.row_count;
+                                r_iter.next()
+                            }
+                        }
+                        // skip at least one
+                        _ => {
+                            if l.row_count < r.row_count {
+                                let skip = l.row_count;
+                                r.row_count -= l.row_count;
+                                l_iter.next();
+                                Some(RowSelector::skip(skip))
+                            } else {
+                                let skip = r.row_count;
+                                l.row_count -= skip;
+                                r_iter.next();
+                                Some(RowSelector::skip(skip))
+                            }
+                        }
+                    };
+                }
+                (Some(_), None) => return l_iter.next(),
+                (None, Some(_)) => return r_iter.next(),
+                (None, None) => return None,
             }
         }
-    }
-
-    if l_iter.peek().is_some() {
-        res.extend(l_iter);
-    }
-    if r_iter.peek().is_some() {
-        res.extend(r_iter);
-    }
-    res
-}
+    });
 
-fn add_selector(skip: bool, sum_row: usize, combined_result: &mut Vec<RowSelector>) {
-    let selector = if skip {
-        RowSelector::skip(sum_row)
-    } else {
-        RowSelector::select(sum_row)
-    };
-    combined_result.push(selector);
+    iter.collect()
 }
 
 #[cfg(test)]
@@ -770,40 +768,28 @@ mod tests {
             RowSelector::skip(4),
         ]);
 
-        assert_eq!(RowSelection::from_selectors_and_combine(&a), expected);
-        assert_eq!(RowSelection::from_selectors_and_combine(&b), expected);
-        assert_eq!(RowSelection::from_selectors_and_combine(&c), expected);
+        assert_eq!(RowSelection::from_iter(a), expected);
+        assert_eq!(RowSelection::from_iter(b), expected);
+        assert_eq!(RowSelection::from_iter(c), expected);
     }
 
     #[test]
     fn test_combine_2elements() {
         let a = vec![RowSelector::select(10), RowSelector::select(5)];
         let a_expect = vec![RowSelector::select(15)];
-        assert_eq!(
-            RowSelection::from_selectors_and_combine(&a).selectors,
-            a_expect
-        );
+        assert_eq!(RowSelection::from_iter(a).selectors, a_expect);
 
         let b = vec![RowSelector::select(10), RowSelector::skip(5)];
         let b_expect = vec![RowSelector::select(10), RowSelector::skip(5)];
-        assert_eq!(
-            RowSelection::from_selectors_and_combine(&b).selectors,
-            b_expect
-        );
+        assert_eq!(RowSelection::from_iter(b).selectors, b_expect);
 
         let c = vec![RowSelector::skip(10), RowSelector::select(5)];
         let c_expect = vec![RowSelector::skip(10), RowSelector::select(5)];
-        assert_eq!(
-            RowSelection::from_selectors_and_combine(&c).selectors,
-            c_expect
-        );
+        assert_eq!(RowSelection::from_iter(c).selectors, c_expect);
 
         let d = vec![RowSelector::skip(10), RowSelector::skip(5)];
         let d_expect = vec![RowSelector::skip(15)];
-        assert_eq!(
-            RowSelection::from_selectors_and_combine(&d).selectors,
-            d_expect
-        );
+        assert_eq!(RowSelection::from_iter(d).selectors, d_expect);
     }
 
     #[test]
@@ -859,7 +845,7 @@ mod tests {
 
         let res = intersect_row_selections(&a, &b);
         assert_eq!(
-            RowSelection::from_selectors_and_combine(&res).selectors,
+            res.selectors,
             vec![
                 RowSelector::select(5),
                 RowSelector::skip(4),
@@ -877,7 +863,7 @@ mod tests {
         let b = vec![RowSelector::select(36), RowSelector::skip(36)];
         let res = intersect_row_selections(&a, &b);
         assert_eq!(
-            RowSelection::from_selectors_and_combine(&res).selectors,
+            res.selectors,
             vec![RowSelector::select(3), RowSelector::skip(69)]
         );
 
@@ -892,7 +878,7 @@ mod tests {
         ];
         let res = intersect_row_selections(&a, &b);
         assert_eq!(
-            RowSelection::from_selectors_and_combine(&res).selectors,
+            res.selectors,
             vec![RowSelector::select(2), RowSelector::skip(8)]
         );
 
@@ -906,7 +892,7 @@ mod tests {
         ];
         let res = intersect_row_selections(&a, &b);
         assert_eq!(
-            RowSelection::from_selectors_and_combine(&res).selectors,
+            res.selectors,
             vec![RowSelector::select(2), RowSelector::skip(8)]
         );
     }
@@ -1142,4 +1128,70 @@ mod tests {
         // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
         assert_eq!(ranges, vec![10..20, 20..30, 30..40]);
     }
+
+    #[test]
+    fn test_empty_ranges() {
+        let ranges = [1..3, 4..6, 6..6, 8..8, 9..10];
+        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), 10);
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::skip(1),
+                RowSelector::select(2),
+                RowSelector::skip(1),
+                RowSelector::select(2),
+                RowSelector::skip(3),
+                RowSelector::select(1)
+            ]
+        )
+    }
+
+    #[test]
+    fn test_empty_selector() {
+        let selection = RowSelection::from(vec![
+            RowSelector::skip(0),
+            RowSelector::select(2),
+            RowSelector::skip(0),
+            RowSelector::select(2),
+        ]);
+        assert_eq!(selection.selectors, vec![RowSelector::select(4)]);
+
+        let selection = RowSelection::from(vec![
+            RowSelector::select(0),
+            RowSelector::skip(2),
+            RowSelector::select(0),
+            RowSelector::skip(2),
+        ]);
+        assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
+    }
+
+    #[test]
+    fn test_intersection() {
+        let selection = RowSelection::from(vec![RowSelector::select(1048576)]);
+        let result = selection.intersection(&selection);
+        assert_eq!(result, selection);
+
+        let a = RowSelection::from(vec![
+            RowSelector::skip(10),
+            RowSelector::select(10),
+            RowSelector::skip(10),
+            RowSelector::select(20),
+        ]);
+
+        let b = RowSelection::from(vec![
+            RowSelector::skip(20),
+            RowSelector::select(20),
+            RowSelector::skip(10),
+        ]);
+
+        let result = a.intersection(&b);
+        assert_eq!(
+            result.selectors,
+            vec![
+                RowSelector::skip(30),
+                RowSelector::select(10),
+                RowSelector::skip(10)
+            ]
+        );
+    }
 }