You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/06/28 21:11:57 UTC
[arrow] branch main updated: GH-36178: [C++]support prefetching for ReadRangeCache lazy mode (#36180)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 72ec35e67b GH-36178: [C++]support prefetching for ReadRangeCache lazy mode (#36180)
72ec35e67b is described below
commit 72ec35e67bce74879bf7f57ea35e35f4ae541e08
Author: Jinpeng <ji...@google.com>
AuthorDate: Wed Jun 28 17:11:51 2023 -0400
GH-36178: [C++]support prefetching for ReadRangeCache lazy mode (#36180)
### Rationale for this change
The current [ReadRangeCache](https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/caching.h#L100) supports a lazy mode where each range is read upon an [explicit request](https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/caching.cc#L255), and a non-lazy mode where all ranges are read concurrently [at once](https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/caching.cc#L168). In some scenarios, it would help to support a prefetch mode (which naturally is bound to t [...]
### What changes are included in this PR?
Add a new cache option, `prefetch_limit`, that defines the maximum number of ranges to prefetch after reading a range in lazy mode (the default, 0, disables prefetching).
### Are these changes tested?
Yes. A new unit test, `RangeReadCache.LazyWithPrefetching`, is included.
### Are there any user-facing changes?
No.
* Closes: #36178
Authored-by: jp0317 <zj...@gmail.com>
Signed-off-by: David Li <li...@gmail.com>
---
cpp/src/arrow/io/caching.cc | 20 ++++++++++++++---
cpp/src/arrow/io/caching.h | 6 +++++-
cpp/src/arrow/io/memory_test.cc | 48 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 70 insertions(+), 4 deletions(-)
diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc
index 1cbebfd935..307b933d16 100644
--- a/cpp/src/arrow/io/caching.cc
+++ b/cpp/src/arrow/io/caching.cc
@@ -35,13 +35,15 @@ namespace io {
CacheOptions CacheOptions::Defaults() {
return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
internal::ReadRangeCache::kDefaultRangeSizeLimit,
- /*lazy=*/false};
+ /*lazy=*/false,  // eager cache: all ranges are fetched when Cache() is called
+ /*prefetch_limit=*/0};  // prefetching only ever applies to lazy caches
}
CacheOptions CacheOptions::LazyDefaults() {
return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
internal::ReadRangeCache::kDefaultRangeSizeLimit,
- /*lazy=*/true};
+ /*lazy=*/true,  // ranges are fetched only when first requested via Read()
+ /*prefetch_limit=*/0};  // no prefetching by default; set > 0 to enable it
}
CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_millis,
@@ -125,7 +127,7 @@ CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_mil
(1 - ideal_bandwidth_utilization_frac))));
DCHECK_GT(range_size_limit, 0) << "Computed range_size_limit must be > 0";
- return {hole_size_limit, range_size_limit, false};
+ return {hole_size_limit, range_size_limit, /*lazy=*/false, /*prefetch_limit=*/0};
}
namespace internal {
@@ -204,6 +206,18 @@ struct ReadRangeCache::Impl {
if (it != entries.end() && it->range.Contains(range)) {
auto fut = MaybeRead(&*it);
ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+ if (options.lazy && options.prefetch_limit > 0) {
+ int64_t num_prefetched = 0;
+ for (auto next_it = it + 1;
+ next_it != entries.end() && num_prefetched < options.prefetch_limit;
+ ++next_it) {
+ if (!next_it->future.is_valid()) {
+ next_it->future =
+ file->ReadAsync(ctx, next_it->range.offset, next_it->range.length);
+ }
+ ++num_prefetched;
+ }
+ }
return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
}
return Status::Invalid("ReadRangeCache did not find matching cache entry");
diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h
index 9f047fd62f..93f8873d63 100644
--- a/cpp/src/arrow/io/caching.h
+++ b/cpp/src/arrow/io/caching.h
@@ -43,10 +43,14 @@ struct ARROW_EXPORT CacheOptions {
int64_t range_size_limit;
/// \brief A lazy cache does not perform any I/O until requested.
bool lazy;
+ /// \brief Lazy mode only: after a requested range has been read, the next `prefetch_limit`
+ /// cache entries are visited (fetched ones included) and unfetched ones read async; 0 = off.
+ int64_t prefetch_limit = 0;
bool operator==(const CacheOptions& other) const {
return hole_size_limit == other.hole_size_limit &&
- range_size_limit == other.range_size_limit && lazy == other.lazy;
+ range_size_limit == other.range_size_limit && lazy == other.lazy &&
+ prefetch_limit == other.prefetch_limit;  // the new option takes part in equality too
}
/// \brief Construct CacheOptions from network storage metrics (e.g. S3).
diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc
index cdcbe240f8..216d75f65e 100644
--- a/cpp/src/arrow/io/memory_test.cc
+++ b/cpp/src/arrow/io/memory_test.cc
@@ -845,6 +845,54 @@ TEST(RangeReadCache, Lazy) {
ASSERT_EQ(3, file->read_count());
}
+TEST(RangeReadCache, LazyWithPrefetching) {
+ std::string data = "abcdefghijklmnopqrstuvwxyz";
+ // Lazy cache that, after serving a read, prefetches up to 2 following cache entries.
+ auto file = std::make_shared<CountingBufferReader>(Buffer(data));
+ CacheOptions options = CacheOptions::LazyDefaults();
+ options.hole_size_limit = 1;
+ options.range_size_limit = 3;
+ options.prefetch_limit = 2;  // NOTE(review): the read counts below appear to hold for any limit >= 1
+ internal::ReadRangeCache cache(file, {}, options);
+ // {1, 1} and {3, 1} are coalesced into a single cache entry {1, 3}.
+ ASSERT_OK(cache.Cache({{1, 1}, {3, 1}, {5, 2}, {8, 2}, {20, 2}, {25, 0}}));
+
+ // Lazy cache doesn't fetch ranges until requested
+ ASSERT_EQ(0, file->read_count());
+
+ ASSERT_OK_AND_ASSIGN(auto buf, cache.Read({8, 2}));
+ AssertBufferEqual(*buf, "ij");
+ // Read {8, 2} and prefetch {20, 2}; the empty {25, 0} evidently adds no read
+ ASSERT_EQ(2, file->read_count());
+
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({20, 2}));
+ AssertBufferEqual(*buf, "uv");
+ // Read count remains 2 as the range {20, 2} has already been prefetched
+ ASSERT_EQ(2, file->read_count());
+
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({1, 1}));
+ AssertBufferEqual(*buf, "b");
+ // Read {1, 3} and prefetch {5, 2}; {8, 2} is already fetched but counts toward the limit
+ ASSERT_EQ(4, file->read_count());
+
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({3, 1}));
+ AssertBufferEqual(*buf, "d");
+ // Served from the {1, 3} entry read above; no new read
+ ASSERT_EQ(4, file->read_count());
+
+ // Requested ranges are still cached
+ ASSERT_OK_AND_ASSIGN(buf, cache.Read({5, 1}));
+ AssertBufferEqual(*buf, "f");
+ // Served by slicing the {5, 2} entry prefetched above; no new read
+ ASSERT_EQ(4, file->read_count());
+
+ // Ranges not fully contained in a single cache entry are rejected
+ ASSERT_RAISES(Invalid, cache.Read({20, 3}));
+ ASSERT_RAISES(Invalid, cache.Read({19, 3}));
+ ASSERT_RAISES(Invalid, cache.Read({0, 3}));
+ ASSERT_RAISES(Invalid, cache.Read({25, 2}));
+}
+
TEST(CacheOptions, Basics) {
auto check = [](const CacheOptions actual, const double expected_hole_size_limit_MiB,
const double expected_range_size_limit_MiB) -> void {