You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/25 11:27:50 UTC

[arrow-rs] branch master updated: Improve coalesce_ranges (#2561) (#2562) (#2563)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 9822d627e Improve coalesce_ranges (#2561) (#2562) (#2563)
9822d627e is described below

commit 9822d627ed2699f0b51294bd58e02f65668aa819
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Aug 25 12:27:45 2022 +0100

    Improve coalesce_ranges (#2561) (#2562) (#2563)
    
    * Improve coalesce_ranges (#2561) (#2562)
    
    * Review feedback
    
    * Tweak fuzz test
---
 object_store/Cargo.toml  |   1 +
 object_store/src/util.rs | 160 +++++++++++++++++++++++++++++++++++------------
 2 files changed, 121 insertions(+), 40 deletions(-)

diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index 966c423a7..b5c5ef6ed 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -62,3 +62,4 @@ aws = ["cloud"]
 dotenv = "0.15.0"
 tempfile = "3.1.0"
 futures-test = "0.3"
+rand = "0.8"
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
index 1c9521495..2814ca244 100644
--- a/object_store/src/util.rs
+++ b/object_store/src/util.rs
@@ -18,7 +18,7 @@
 //! Common logic for interacting with remote object stores
 use super::Result;
 use bytes::Bytes;
-use futures::{stream::StreamExt, Stream};
+use futures::{stream::StreamExt, Stream, TryStreamExt};
 
 /// Returns the prefix to be passed to an object store
 #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
@@ -85,21 +85,60 @@ where
 /// will be coalesced into a single request by [`coalesce_ranges`]
 pub const OBJECT_STORE_COALESCE_DEFAULT: usize = 1024 * 1024;
 
-/// Takes a function to fetch ranges and coalesces adjacent ranges if they are
-/// less than `coalesce` bytes apart. Out of order `ranges` are not coalesced
+/// Up to this number of range requests will be performed in parallel by [`coalesce_ranges`]
+pub const OBJECT_STORE_COALESCE_PARALLEL: usize = 10;
+
+/// Takes a function `fetch` that can fetch a range of bytes and uses this to
+/// fetch the provided byte `ranges`
+///
+/// To improve performance it will:
+///
+/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch`
+/// * Make multiple `fetch` requests in parallel (up to a maximum of 10)
+///
 pub async fn coalesce_ranges<F, Fut>(
     ranges: &[std::ops::Range<usize>],
-    mut fetch: F,
+    fetch: F,
     coalesce: usize,
 ) -> Result<Vec<Bytes>>
 where
     F: Send + FnMut(std::ops::Range<usize>) -> Fut,
     Fut: std::future::Future<Output = Result<Bytes>> + Send,
 {
+    let fetch_ranges = merge_ranges(ranges, coalesce);
+
+    let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned())
+        .map(fetch)
+        .buffered(OBJECT_STORE_COALESCE_PARALLEL)
+        .try_collect()
+        .await?;
+
+    Ok(ranges
+        .iter()
+        .map(|range| {
+            let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
+            let fetch_range = &fetch_ranges[idx];
+            let fetch_bytes = &fetched[idx];
+
+            let start = range.start - fetch_range.start;
+            let end = range.end - fetch_range.start;
+            fetch_bytes.slice(start..end)
+        })
+        .collect())
+}
+
+/// Returns a sorted list of ranges that cover `ranges`
+fn merge_ranges(
+    ranges: &[std::ops::Range<usize>],
+    coalesce: usize,
+) -> Vec<std::ops::Range<usize>> {
     if ranges.is_empty() {
-        return Ok(vec![]);
+        return vec![];
     }
 
+    let mut ranges = ranges.to_vec();
+    ranges.sort_unstable_by_key(|range| range.start);
+
     let mut ret = Vec::with_capacity(ranges.len());
     let mut start_idx = 0;
     let mut end_idx = 1;
@@ -110,57 +149,59 @@ where
         while end_idx != ranges.len()
             && ranges[end_idx]
                 .start
-                .checked_sub(ranges[start_idx].end)
+                .checked_sub(range_end)
                 .map(|delta| delta <= coalesce)
-                .unwrap_or(false)
+                .unwrap_or(true)
         {
-            if ranges[end_idx].end > range_end {
-                range_end = ranges[end_idx].end;
-            }
+            range_end = range_end.max(ranges[end_idx].end);
             end_idx += 1;
         }
 
         let start = ranges[start_idx].start;
-        let bytes = fetch(start..range_end).await?;
-        for range in ranges.iter().take(end_idx).skip(start_idx) {
-            ret.push(bytes.slice(range.start - start..range.end - start))
-        }
+        let end = range_end;
+        ret.push(start..end);
+
         start_idx = end_idx;
         end_idx += 1;
     }
-    Ok(ret)
+
+    ret
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use rand::{thread_rng, Rng};
     use std::ops::Range;
 
+    /// Calls coalesce_ranges and validates the returned data is correct
+    ///
+    /// Returns the fetched ranges
+    async fn do_fetch(ranges: Vec<Range<usize>>, coalesce: usize) -> Vec<Range<usize>> {
+        let max = ranges.iter().map(|x| x.end).max().unwrap_or(0);
+        let src: Vec<_> = (0..max).map(|x| x as u8).collect();
+
+        let mut fetches = vec![];
+        let coalesced = coalesce_ranges(
+            &ranges,
+            |range| {
+                fetches.push(range.clone());
+                futures::future::ready(Ok(Bytes::from(src[range].to_vec())))
+            },
+            coalesce,
+        )
+        .await
+        .unwrap();
+
+        assert_eq!(ranges.len(), coalesced.len());
+        for (range, bytes) in ranges.iter().zip(coalesced) {
+            assert_eq!(bytes.as_ref(), &src[range.clone()]);
+        }
+        fetches
+    }
+
     #[tokio::test]
     async fn test_coalesce_ranges() {
-        let do_fetch = |ranges: Vec<Range<usize>>, coalesce: usize| async move {
-            let max = ranges.iter().map(|x| x.end).max().unwrap_or(0);
-            let src: Vec<_> = (0..max).map(|x| x as u8).collect();
-
-            let mut fetches = vec![];
-            let coalesced = coalesce_ranges(
-                &ranges,
-                |range| {
-                    fetches.push(range.clone());
-                    futures::future::ready(Ok(Bytes::from(src[range].to_vec())))
-                },
-                coalesce,
-            )
-            .await
-            .unwrap();
-
-            assert_eq!(ranges.len(), coalesced.len());
-            for (range, bytes) in ranges.iter().zip(coalesced) {
-                assert_eq!(bytes.as_ref(), &src[range.clone()]);
-            }
-            fetches
-        };
-
         let fetches = do_fetch(vec![], 0).await;
         assert_eq!(fetches, vec![]);
 
@@ -180,12 +221,51 @@ mod tests {
         assert_eq!(fetches, vec![0..1, 56..75]);
 
         let fetches = do_fetch(vec![0..1, 5..6, 7..9, 2..3, 4..6], 1).await;
-        assert_eq!(fetches, vec![0..1, 5..9, 2..6]);
+        assert_eq!(fetches, vec![0..9]);
 
         let fetches = do_fetch(vec![0..1, 5..6, 7..9, 2..3, 4..6], 1).await;
-        assert_eq!(fetches, vec![0..1, 5..9, 2..6]);
+        assert_eq!(fetches, vec![0..9]);
 
         let fetches = do_fetch(vec![0..1, 6..7, 8..9, 10..14, 9..10], 4).await;
         assert_eq!(fetches, vec![0..1, 6..14]);
     }
+
+    #[tokio::test]
+    async fn test_coalesce_fuzz() {
+        let mut rand = thread_rng();
+        for _ in 0..100 {
+            let object_len = rand.gen_range(10..250);
+            let range_count = rand.gen_range(0..10);
+            let ranges: Vec<_> = (0..range_count)
+                .map(|_| {
+                    let start = rand.gen_range(0..object_len);
+                    let max_len = 20.min(object_len - start);
+                    let len = rand.gen_range(0..max_len);
+                    start..start + len
+                })
+                .collect();
+
+            let coalesce = rand.gen_range(1..5);
+            let fetches = do_fetch(ranges.clone(), coalesce).await;
+
+            for fetch in fetches.windows(2) {
+                assert!(
+                    fetch[0].start <= fetch[1].start,
+                    "fetches should be sorted, {:?} vs {:?}",
+                    fetch[0],
+                    fetch[1]
+                );
+
+                let delta = fetch[1].end - fetch[0].end;
+                assert!(
+                    delta > coalesce,
+                    "fetches should not overlap by {}, {:?} vs {:?} for {:?}",
+                    coalesce,
+                    fetch[0],
+                    fetch[1],
+                    ranges
+                );
+            }
+        }
+    }
 }