You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/12/15 19:56:52 UTC

[arrow-rs] branch master updated: Avoid allocating vector of indices in lexicographical_partition_ranges (#998)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new fc343e7  Avoid allocating vector of indices in lexicographical_partition_ranges (#998)
fc343e7 is described below

commit fc343e7c02a05cd7d2a1d401f4e5feeb8d6de179
Author: Jörn Horstmann <gi...@jhorstmann.net>
AuthorDate: Wed Dec 15 20:56:45 2021 +0100

    Avoid allocating vector of indices in lexicographical_partition_ranges (#998)
    
    * Avoid allocating vector of indices in lexicographical_partition_ranges
    
    * Adjust comments
    
    * Improve comments and remove one unneeded parameter
---
 arrow/src/compute/kernels/partition.rs | 133 ++++++++++++++++++++++++++-------
 1 file changed, 108 insertions(+), 25 deletions(-)

diff --git a/arrow/src/compute/kernels/partition.rs b/arrow/src/compute/kernels/partition.rs
index 8a16942..991aa7b 100644
--- a/arrow/src/compute/kernels/partition.rs
+++ b/arrow/src/compute/kernels/partition.rs
@@ -44,7 +44,6 @@ struct LexicographicalPartitionIterator<'a> {
     num_rows: usize,
     previous_partition_point: usize,
     partition_point: usize,
-    value_indices: Vec<usize>,
 }
 
 impl<'a> LexicographicalPartitionIterator<'a> {
@@ -62,39 +61,78 @@ impl<'a> LexicographicalPartitionIterator<'a> {
         };
 
         let comparator = LexicographicalComparator::try_new(columns)?;
-        let value_indices = (0..num_rows).collect::<Vec<usize>>();
         Ok(LexicographicalPartitionIterator {
             comparator,
             num_rows,
             previous_partition_point: 0,
             partition_point: 0,
-            value_indices,
         })
     }
 }
 
-/// Exponential search is to remedy for the case when array size and cardinality are both large
+/// Returns the next partition point of the range `start..end` according to the given comparator.
+/// The return value is the index of the first element of the second partition,
+/// and is guaranteed to be between `start..=end` (inclusive).
+///
+/// The values corresponding to those indices are assumed to be partitioned according to the given comparator.
+///
+/// Exponential search is used to remedy the case when array size and cardinality are both large.
+/// In these cases the partition point would be near the beginning of the range and
+/// plain binary search would be doing some unnecessary iterations on each call.
+///
 /// see <https://en.wikipedia.org/wiki/Exponential_search>
 #[inline]
-fn exponential_search(
-    indices: &[usize],
-    target: &usize,
+fn exponential_search_next_partition_point(
+    start: usize,
+    end: usize,
     comparator: &LexicographicalComparator<'_>,
 ) -> usize {
+    let target = start;
     let mut bound = 1;
-    while bound < indices.len()
-        && comparator.compare(&indices[bound], target) != Ordering::Greater
+    while bound + start < end
+        && comparator.compare(&(bound + start), &target) != Ordering::Greater
     {
         bound *= 2;
     }
+
     // invariant after while loop:
-    // indices[bound / 2] <= target < indices[min(indices.len(), bound + 1)]
+    // (start + bound / 2) <= target < min(end, start + bound + 1)
     // where <= and < are defined by the comparator;
-    // note here we have right = min(indices.len(), bound + 1) because indices[bound] might
+    // note here we have right = min(end, start + bound + 1) because (start + bound) might
     // actually be considered and must be included.
-    (bound / 2)
-        + indices[(bound / 2)..indices.len().min(bound + 1)]
-            .partition_point(|idx| comparator.compare(idx, target) != Ordering::Greater)
+    partition_point(start + bound / 2, end.min(start + bound + 1), |idx| {
+        comparator.compare(&idx, &target) != Ordering::Greater
+    })
+}
+
+/// Returns the partition point of the range `start..end` according to the given predicate.
+/// The return value is the index of the first element of the second partition,
+/// and is guaranteed to be between `start..=end` (inclusive).
+///
+/// The algorithm is similar to a binary search.
+///
+/// The values corresponding to those indices are assumed to be partitioned according to the given predicate.
+///
+/// See [`slice::partition_point`]
+#[inline]
+fn partition_point<P: Fn(usize) -> bool>(start: usize, end: usize, pred: P) -> usize {
+    let mut left = start;
+    let mut right = end;
+    let mut size = right - left;
+    while left < right {
+        let mid = left + size / 2;
+
+        let less = pred(mid);
+
+        if less {
+            left = mid + 1;
+        } else {
+            right = mid;
+        }
+
+        size = right - left;
+    }
+    left
 }
 
 impl<'a> Iterator for LexicographicalPartitionIterator<'a> {
@@ -103,17 +141,12 @@ impl<'a> Iterator for LexicographicalPartitionIterator<'a> {
     fn next(&mut self) -> Option<Self::Item> {
         if self.partition_point < self.num_rows {
             // invariant:
-            // value_indices[0..previous_partition_point] all are values <= value_indices[previous_partition_point]
-            // so in order to save time we can do binary search on the value_indices[previous_partition_point..]
-            // and find when any value is greater than value_indices[previous_partition_point]; because we are using
-            // new indices, the new offset is _added_ to the previous_partition_point.
-            //
-            // be careful that idx is of type &usize which points to the actual value within value_indices, which itself
-            // contains usize (0..row_count), providing access to lexicographical_comparator as pointers into the
-            // original columnar data.
-            self.partition_point += exponential_search(
-                &self.value_indices[self.partition_point..],
-                &self.partition_point,
+            // in the range [0..previous_partition_point] all values are <= the value at [previous_partition_point]
+            // so in order to save time we can do binary search on the range [previous_partition_point..num_rows]
+            // and find the first index whose value is greater than the value at [previous_partition_point]
+            self.partition_point = exponential_search_next_partition_point(
+                self.partition_point,
+                self.num_rows,
                 &self.comparator,
             );
             let start = self.previous_partition_point;
@@ -135,6 +168,56 @@ mod tests {
     use std::sync::Arc;
 
     #[test]
+    fn test_partition_point() {
+        let input = &[1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 4];
+        {
+            let median = input[input.len() / 2];
+            assert_eq!(
+                9,
+                partition_point(
+                    0,
+                    input.len(),
+                    &(|i: usize| input[i].cmp(&median) != Ordering::Greater)
+                )
+            );
+        }
+        {
+            let search = input[9];
+            assert_eq!(
+                12,
+                partition_point(
+                    9,
+                    input.len(),
+                    &(|i: usize| input[i].cmp(&search) != Ordering::Greater)
+                )
+            );
+        }
+        {
+            let search = input[0];
+            assert_eq!(
+                3,
+                partition_point(
+                    0,
+                    9,
+                    &(|i: usize| input[i].cmp(&search) != Ordering::Greater)
+                )
+            );
+        }
+        let input = &[1, 2, 2, 2, 2, 2, 2, 2, 9];
+        {
+            let search = input[5];
+            assert_eq!(
+                8,
+                partition_point(
+                    5,
+                    9,
+                    &(|i: usize| input[i].cmp(&search) != Ordering::Greater)
+                )
+            );
+        }
+    }
+
+    #[test]
     fn test_lexicographical_partition_ranges_empty() {
         let input = vec![];
         assert!(