You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/25 11:27:50 UTC
[arrow-rs] branch master updated: Improve coalesce_ranges (#2561) (#2562) (#2563)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 9822d627e Improve coalesce_ranges (#2561) (#2562) (#2563)
9822d627e is described below
commit 9822d627ed2699f0b51294bd58e02f65668aa819
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Aug 25 12:27:45 2022 +0100
Improve coalesce_ranges (#2561) (#2562) (#2563)
* Improve coalesce_ranges (#2561) (#2562)
* Review feedback
* Tweak fuzz test
---
object_store/Cargo.toml | 1 +
object_store/src/util.rs | 160 +++++++++++++++++++++++++++++++++++------------
2 files changed, 121 insertions(+), 40 deletions(-)
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index 966c423a7..b5c5ef6ed 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -62,3 +62,4 @@ aws = ["cloud"]
dotenv = "0.15.0"
tempfile = "3.1.0"
futures-test = "0.3"
+rand = "0.8"
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
index 1c9521495..2814ca244 100644
--- a/object_store/src/util.rs
+++ b/object_store/src/util.rs
@@ -18,7 +18,7 @@
//! Common logic for interacting with remote object stores
use super::Result;
use bytes::Bytes;
-use futures::{stream::StreamExt, Stream};
+use futures::{stream::StreamExt, Stream, TryStreamExt};
/// Returns the prefix to be passed to an object store
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
@@ -85,21 +85,60 @@ where
/// will be coalesced into a single request by [`coalesce_ranges`]
pub const OBJECT_STORE_COALESCE_DEFAULT: usize = 1024 * 1024;
-/// Takes a function to fetch ranges and coalesces adjacent ranges if they are
-/// less than `coalesce` bytes apart. Out of order `ranges` are not coalesced
+/// Up to this number of range requests will be performed in parallel by [`coalesce_ranges`]
+pub const OBJECT_STORE_COALESCE_PARALLEL: usize = 10;
+
+/// Takes a function `fetch` that can fetch a range of bytes and uses this to
+/// fetch the provided byte `ranges`
+///
+/// To improve performance it will:
+///
+/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch`
+/// * Make multiple `fetch` requests in parallel (up to a maximum of 10)
+///
pub async fn coalesce_ranges<F, Fut>(
ranges: &[std::ops::Range<usize>],
- mut fetch: F,
+ fetch: F,
coalesce: usize,
) -> Result<Vec<Bytes>>
where
F: Send + FnMut(std::ops::Range<usize>) -> Fut,
Fut: std::future::Future<Output = Result<Bytes>> + Send,
{
+ let fetch_ranges = merge_ranges(ranges, coalesce);
+
+ let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned())
+ .map(fetch)
+ .buffered(OBJECT_STORE_COALESCE_PARALLEL)
+ .try_collect()
+ .await?;
+
+ Ok(ranges
+ .iter()
+ .map(|range| {
+ let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
+ let fetch_range = &fetch_ranges[idx];
+ let fetch_bytes = &fetched[idx];
+
+ let start = range.start - fetch_range.start;
+ let end = range.end - fetch_range.start;
+ fetch_bytes.slice(start..end)
+ })
+ .collect())
+}
+
+/// Returns a sorted list of ranges that cover `ranges`
+fn merge_ranges(
+ ranges: &[std::ops::Range<usize>],
+ coalesce: usize,
+) -> Vec<std::ops::Range<usize>> {
if ranges.is_empty() {
- return Ok(vec![]);
+ return vec![];
}
+ let mut ranges = ranges.to_vec();
+ ranges.sort_unstable_by_key(|range| range.start);
+
let mut ret = Vec::with_capacity(ranges.len());
let mut start_idx = 0;
let mut end_idx = 1;
@@ -110,57 +149,59 @@ where
while end_idx != ranges.len()
&& ranges[end_idx]
.start
- .checked_sub(ranges[start_idx].end)
+ .checked_sub(range_end)
.map(|delta| delta <= coalesce)
- .unwrap_or(false)
+ .unwrap_or(true)
{
- if ranges[end_idx].end > range_end {
- range_end = ranges[end_idx].end;
- }
+ range_end = range_end.max(ranges[end_idx].end);
end_idx += 1;
}
let start = ranges[start_idx].start;
- let bytes = fetch(start..range_end).await?;
- for range in ranges.iter().take(end_idx).skip(start_idx) {
- ret.push(bytes.slice(range.start - start..range.end - start))
- }
+ let end = range_end;
+ ret.push(start..end);
+
start_idx = end_idx;
end_idx += 1;
}
- Ok(ret)
+
+ ret
}
#[cfg(test)]
mod tests {
use super::*;
+ use rand::{thread_rng, Rng};
use std::ops::Range;
+ /// Calls coalesce_ranges and validates the returned data is correct
+ ///
+ /// Returns the fetched ranges
+ async fn do_fetch(ranges: Vec<Range<usize>>, coalesce: usize) -> Vec<Range<usize>> {
+ let max = ranges.iter().map(|x| x.end).max().unwrap_or(0);
+ let src: Vec<_> = (0..max).map(|x| x as u8).collect();
+
+ let mut fetches = vec![];
+ let coalesced = coalesce_ranges(
+ &ranges,
+ |range| {
+ fetches.push(range.clone());
+ futures::future::ready(Ok(Bytes::from(src[range].to_vec())))
+ },
+ coalesce,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(ranges.len(), coalesced.len());
+ for (range, bytes) in ranges.iter().zip(coalesced) {
+ assert_eq!(bytes.as_ref(), &src[range.clone()]);
+ }
+ fetches
+ }
+
#[tokio::test]
async fn test_coalesce_ranges() {
- let do_fetch = |ranges: Vec<Range<usize>>, coalesce: usize| async move {
- let max = ranges.iter().map(|x| x.end).max().unwrap_or(0);
- let src: Vec<_> = (0..max).map(|x| x as u8).collect();
-
- let mut fetches = vec![];
- let coalesced = coalesce_ranges(
- &ranges,
- |range| {
- fetches.push(range.clone());
- futures::future::ready(Ok(Bytes::from(src[range].to_vec())))
- },
- coalesce,
- )
- .await
- .unwrap();
-
- assert_eq!(ranges.len(), coalesced.len());
- for (range, bytes) in ranges.iter().zip(coalesced) {
- assert_eq!(bytes.as_ref(), &src[range.clone()]);
- }
- fetches
- };
-
let fetches = do_fetch(vec![], 0).await;
assert_eq!(fetches, vec![]);
@@ -180,12 +221,51 @@ mod tests {
assert_eq!(fetches, vec![0..1, 56..75]);
let fetches = do_fetch(vec![0..1, 5..6, 7..9, 2..3, 4..6], 1).await;
- assert_eq!(fetches, vec![0..1, 5..9, 2..6]);
+ assert_eq!(fetches, vec![0..9]);
let fetches = do_fetch(vec![0..1, 5..6, 7..9, 2..3, 4..6], 1).await;
- assert_eq!(fetches, vec![0..1, 5..9, 2..6]);
+ assert_eq!(fetches, vec![0..9]);
let fetches = do_fetch(vec![0..1, 6..7, 8..9, 10..14, 9..10], 4).await;
assert_eq!(fetches, vec![0..1, 6..14]);
}
+
+ #[tokio::test]
+ async fn test_coalesce_fuzz() {
+ let mut rand = thread_rng();
+ for _ in 0..100 {
+ let object_len = rand.gen_range(10..250);
+ let range_count = rand.gen_range(0..10);
+ let ranges: Vec<_> = (0..range_count)
+ .map(|_| {
+ let start = rand.gen_range(0..object_len);
+ let max_len = 20.min(object_len - start);
+ let len = rand.gen_range(0..max_len);
+ start..start + len
+ })
+ .collect();
+
+ let coalesce = rand.gen_range(1..5);
+ let fetches = do_fetch(ranges.clone(), coalesce).await;
+
+ for fetch in fetches.windows(2) {
+ assert!(
+ fetch[0].start <= fetch[1].start,
+ "fetches should be sorted, {:?} vs {:?}",
+ fetch[0],
+ fetch[1]
+ );
+
+ let delta = fetch[1].end - fetch[0].end;
+ assert!(
+ delta > coalesce,
+ "fetches should not overlap by {}, {:?} vs {:?} for {:?}",
+ coalesce,
+ fetch[0],
+ fetch[1],
+ ranges
+ );
+ }
+ }
+ }
}