You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/11/07 11:35:52 UTC
(arrow-rs) branch master updated: Fix RowSelection::intersection (#5036) (#5041)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 20f10dcd21 Fix RowSelection::intersection (#5036) (#5041)
20f10dcd21 is described below
commit 20f10dcd2159199e36d128a2143eca48ae7438bb
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Nov 7 11:35:45 2023 +0000
Fix RowSelection::intersection (#5036) (#5041)
* Fix RowSelection::intersection (#5036)
* Review feedback
---
parquet/src/arrow/arrow_reader/selection.rs | 282 ++++++++++++++++------------
1 file changed, 167 insertions(+), 115 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 5063d24afd..cebf3f9d38 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -64,25 +64,30 @@ impl RowSelector {
/// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
///
/// let selectors = vec![
-/// RowSelector { row_count: 5, skip: true },
-/// RowSelector { row_count: 5, skip: false },
-/// RowSelector { row_count: 5, skip: false },
-/// RowSelector { row_count: 5, skip: true },
+/// RowSelector::skip(5),
+/// RowSelector::select(5),
+/// RowSelector::select(5),
+/// RowSelector::skip(5),
/// ];
///
/// // Creating a selection will combine adjacent selectors
/// let selection: RowSelection = selectors.into();
///
/// let expected = vec![
-/// RowSelector { row_count: 5, skip: true },
-/// RowSelector { row_count: 10, skip: false },
-/// RowSelector { row_count: 5, skip: true },
+/// RowSelector::skip(5),
+/// RowSelector::select(10),
+/// RowSelector::skip(5),
/// ];
///
/// let actual: Vec<RowSelector> = selection.into();
/// assert_eq!(actual, expected);
/// ```
///
+/// A [`RowSelection`] maintains the following invariants:
+///
+/// * It contains no [`RowSelector`] of 0 rows
+/// * Consecutive [`RowSelector`]s alternate skipping or selecting rows
+///
/// [`PageIndex`]: crate::file::page_index::index::PageIndex
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct RowSelection {
@@ -118,10 +123,13 @@ impl RowSelection {
let mut last_end = 0;
for range in ranges {
let len = range.end - range.start;
+ if len == 0 {
+ continue;
+ }
match range.start.cmp(&last_end) {
Ordering::Equal => match selectors.last_mut() {
- Some(last) => last.row_count += len,
+ Some(last) => last.row_count = last.row_count.checked_add(len).unwrap(),
None => selectors.push(RowSelector::select(len)),
},
Ordering::Greater => {
@@ -140,38 +148,6 @@ impl RowSelection {
Self { selectors }
}
- /// Creates a [`RowSelection`] from a slice of uncombined `RowSelector`:
- /// Like [skip(5),skip(5),read(10)].
- /// After combine will return [skip(10),read(10)]
- /// # Note
- /// [`RowSelection`] must be combined prior to use within offset_index or else the code will panic.
- fn from_selectors_and_combine(selectors: &[RowSelector]) -> Self {
- if selectors.len() < 2 {
- return Self {
- selectors: Vec::from(selectors),
- };
- }
- let first = selectors.first().unwrap();
- let mut sum_rows = first.row_count;
- let mut skip = first.skip;
- let mut combined_result = vec![];
-
- for s in selectors.iter().skip(1) {
- if s.skip == skip {
- sum_rows += s.row_count
- } else {
- add_selector(skip, sum_rows, &mut combined_result);
- sum_rows = s.row_count;
- skip = s.skip;
- }
- }
- add_selector(skip, sum_rows, &mut combined_result);
-
- Self {
- selectors: combined_result,
- }
- }
-
/// Given an offset index, return the byte ranges for all data pages selected by `self`
///
/// This is useful for determining what byte ranges to fetch from underlying storage
@@ -351,9 +327,7 @@ impl RowSelection {
///
/// returned: NNNNNNNNYYNYN
pub fn intersection(&self, other: &Self) -> Self {
- Self {
- selectors: intersect_row_selections(&self.selectors, &other.selectors),
- }
+ intersect_row_selections(&self.selectors, &other.selectors)
}
/// Returns `true` if this [`RowSelection`] selects any rows
@@ -443,7 +417,37 @@ impl RowSelection {
impl From<Vec<RowSelector>> for RowSelection {
fn from(selectors: Vec<RowSelector>) -> Self {
- Self::from_selectors_and_combine(selectors.as_slice())
+ selectors.into_iter().collect()
+ }
+}
+
+impl FromIterator<RowSelector> for RowSelection {
+ fn from_iter<T: IntoIterator<Item = RowSelector>>(iter: T) -> Self {
+ let iter = iter.into_iter();
+
+ // Capacity before filter
+ let mut selectors = Vec::with_capacity(iter.size_hint().0);
+
+ let mut filtered = iter.filter(|x| x.row_count != 0);
+ if let Some(x) = filtered.next() {
+ selectors.push(x);
+ }
+
+ for s in filtered {
+ if s.row_count == 0 {
+ continue;
+ }
+
+ // Combine consecutive selectors
+ let last = selectors.last_mut().unwrap();
+ if last.skip == s.skip {
+ last.row_count = last.row_count.checked_add(s.row_count).unwrap();
+ } else {
+ selectors.push(s)
+ }
+ }
+
+ Self { selectors }
}
}
@@ -465,64 +469,58 @@ impl From<RowSelection> for VecDeque<RowSelector> {
/// other: NYNNNNNNY
///
/// returned: NNNNNNNNYYNYN
-fn intersect_row_selections(left: &[RowSelector], right: &[RowSelector]) -> Vec<RowSelector> {
- let mut res = Vec::with_capacity(left.len());
+fn intersect_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection {
let mut l_iter = left.iter().copied().peekable();
let mut r_iter = right.iter().copied().peekable();
- while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
- if a.row_count == 0 {
- l_iter.next().unwrap();
- continue;
- }
- if b.row_count == 0 {
- r_iter.next().unwrap();
- continue;
- }
- match (a.skip, b.skip) {
- // Keep both ranges
- (false, false) => {
- if a.row_count < b.row_count {
- res.push(RowSelector::select(a.row_count));
- b.row_count -= a.row_count;
+ let iter = std::iter::from_fn(move || {
+ loop {
+ let l = l_iter.peek_mut();
+ let r = r_iter.peek_mut();
+
+ match (l, r) {
+ (Some(a), _) if a.row_count == 0 => {
l_iter.next().unwrap();
- } else {
- res.push(RowSelector::select(b.row_count));
- a.row_count -= b.row_count;
- r_iter.next().unwrap();
}
- }
- // skip at least one
- _ => {
- if a.row_count < b.row_count {
- res.push(RowSelector::skip(a.row_count));
- b.row_count -= a.row_count;
- l_iter.next().unwrap();
- } else {
- res.push(RowSelector::skip(b.row_count));
- a.row_count -= b.row_count;
+ (_, Some(b)) if b.row_count == 0 => {
r_iter.next().unwrap();
}
+ (Some(l), Some(r)) => {
+ return match (l.skip, r.skip) {
+ // Keep both ranges
+ (false, false) => {
+ if l.row_count < r.row_count {
+ r.row_count -= l.row_count;
+ l_iter.next()
+ } else {
+ l.row_count -= r.row_count;
+ r_iter.next()
+ }
+ }
+ // skip at least one
+ _ => {
+ if l.row_count < r.row_count {
+ let skip = l.row_count;
+ r.row_count -= l.row_count;
+ l_iter.next();
+ Some(RowSelector::skip(skip))
+ } else {
+ let skip = r.row_count;
+ l.row_count -= skip;
+ r_iter.next();
+ Some(RowSelector::skip(skip))
+ }
+ }
+ };
+ }
+ (Some(_), None) => return l_iter.next(),
+ (None, Some(_)) => return r_iter.next(),
+ (None, None) => return None,
}
}
- }
-
- if l_iter.peek().is_some() {
- res.extend(l_iter);
- }
- if r_iter.peek().is_some() {
- res.extend(r_iter);
- }
- res
-}
+ });
-fn add_selector(skip: bool, sum_row: usize, combined_result: &mut Vec<RowSelector>) {
- let selector = if skip {
- RowSelector::skip(sum_row)
- } else {
- RowSelector::select(sum_row)
- };
- combined_result.push(selector);
+ iter.collect()
}
#[cfg(test)]
@@ -770,40 +768,28 @@ mod tests {
RowSelector::skip(4),
]);
- assert_eq!(RowSelection::from_selectors_and_combine(&a), expected);
- assert_eq!(RowSelection::from_selectors_and_combine(&b), expected);
- assert_eq!(RowSelection::from_selectors_and_combine(&c), expected);
+ assert_eq!(RowSelection::from_iter(a), expected);
+ assert_eq!(RowSelection::from_iter(b), expected);
+ assert_eq!(RowSelection::from_iter(c), expected);
}
#[test]
fn test_combine_2elements() {
let a = vec![RowSelector::select(10), RowSelector::select(5)];
let a_expect = vec![RowSelector::select(15)];
- assert_eq!(
- RowSelection::from_selectors_and_combine(&a).selectors,
- a_expect
- );
+ assert_eq!(RowSelection::from_iter(a).selectors, a_expect);
let b = vec![RowSelector::select(10), RowSelector::skip(5)];
let b_expect = vec![RowSelector::select(10), RowSelector::skip(5)];
- assert_eq!(
- RowSelection::from_selectors_and_combine(&b).selectors,
- b_expect
- );
+ assert_eq!(RowSelection::from_iter(b).selectors, b_expect);
let c = vec![RowSelector::skip(10), RowSelector::select(5)];
let c_expect = vec![RowSelector::skip(10), RowSelector::select(5)];
- assert_eq!(
- RowSelection::from_selectors_and_combine(&c).selectors,
- c_expect
- );
+ assert_eq!(RowSelection::from_iter(c).selectors, c_expect);
let d = vec![RowSelector::skip(10), RowSelector::skip(5)];
let d_expect = vec![RowSelector::skip(15)];
- assert_eq!(
- RowSelection::from_selectors_and_combine(&d).selectors,
- d_expect
- );
+ assert_eq!(RowSelection::from_iter(d).selectors, d_expect);
}
#[test]
@@ -859,7 +845,7 @@ mod tests {
let res = intersect_row_selections(&a, &b);
assert_eq!(
- RowSelection::from_selectors_and_combine(&res).selectors,
+ res.selectors,
vec![
RowSelector::select(5),
RowSelector::skip(4),
@@ -877,7 +863,7 @@ mod tests {
let b = vec![RowSelector::select(36), RowSelector::skip(36)];
let res = intersect_row_selections(&a, &b);
assert_eq!(
- RowSelection::from_selectors_and_combine(&res).selectors,
+ res.selectors,
vec![RowSelector::select(3), RowSelector::skip(69)]
);
@@ -892,7 +878,7 @@ mod tests {
];
let res = intersect_row_selections(&a, &b);
assert_eq!(
- RowSelection::from_selectors_and_combine(&res).selectors,
+ res.selectors,
vec![RowSelector::select(2), RowSelector::skip(8)]
);
@@ -906,7 +892,7 @@ mod tests {
];
let res = intersect_row_selections(&a, &b);
assert_eq!(
- RowSelection::from_selectors_and_combine(&res).selectors,
+ res.selectors,
vec![RowSelector::select(2), RowSelector::skip(8)]
);
}
@@ -1142,4 +1128,70 @@ mod tests {
// assert_eq!(mask, vec![false, true, true, false, true, true, true]);
assert_eq!(ranges, vec![10..20, 20..30, 30..40]);
}
+
+ #[test]
+ fn test_empty_ranges() {
+ let ranges = [1..3, 4..6, 6..6, 8..8, 9..10];
+ let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), 10);
+ assert_eq!(
+ selection.selectors,
+ vec![
+ RowSelector::skip(1),
+ RowSelector::select(2),
+ RowSelector::skip(1),
+ RowSelector::select(2),
+ RowSelector::skip(3),
+ RowSelector::select(1)
+ ]
+ )
+ }
+
+ #[test]
+ fn test_empty_selector() {
+ let selection = RowSelection::from(vec![
+ RowSelector::skip(0),
+ RowSelector::select(2),
+ RowSelector::skip(0),
+ RowSelector::select(2),
+ ]);
+ assert_eq!(selection.selectors, vec![RowSelector::select(4)]);
+
+ let selection = RowSelection::from(vec![
+ RowSelector::select(0),
+ RowSelector::skip(2),
+ RowSelector::select(0),
+ RowSelector::skip(2),
+ ]);
+ assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
+ }
+
+ #[test]
+ fn test_intersection() {
+ let selection = RowSelection::from(vec![RowSelector::select(1048576)]);
+ let result = selection.intersection(&selection);
+ assert_eq!(result, selection);
+
+ let a = RowSelection::from(vec![
+ RowSelector::skip(10),
+ RowSelector::select(10),
+ RowSelector::skip(10),
+ RowSelector::select(20),
+ ]);
+
+ let b = RowSelection::from(vec![
+ RowSelector::skip(20),
+ RowSelector::select(20),
+ RowSelector::skip(10),
+ ]);
+
+ let result = a.intersection(&b);
+ assert_eq!(
+ result.selectors,
+ vec![
+ RowSelector::skip(30),
+ RowSelector::select(10),
+ RowSelector::skip(10)
+ ]
+ );
+ }
}