You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/06/28 21:11:57 UTC

[arrow] branch main updated: GH-36178: [C++]support prefetching for ReadRangeCache lazy mode (#36180)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 72ec35e67b GH-36178: [C++]support prefetching for ReadRangeCache lazy mode (#36180)
72ec35e67b is described below

commit 72ec35e67bce74879bf7f57ea35e35f4ae541e08
Author: Jinpeng <ji...@google.com>
AuthorDate: Wed Jun 28 17:11:51 2023 -0400

    GH-36178: [C++]support prefetching for ReadRangeCache lazy mode (#36180)
    
    ### Rationale for this change
    
    The current [ReadRangeCache](https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/caching.h#L100) supports a lazy mode where each range is read upon an [explicit request](https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/caching.cc#L255), and a non-lazy mode where all ranges are read concurrently [at once](https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/caching.cc#L168). In some scenarios, it would help to support a prefetch mode (which naturally is bound to t [...]
    
    ### What changes are included in this PR?
    
    Add a new cache option `prefetch_limit` to define the maximum number of ranges to be prefetched when reading one range in lazy mode.
    
    ### Are these changes tested?
    
    Yes. A new unit test (`RangeReadCache.LazyWithPrefetching`) is included.
    
    ### Are there any user-facing changes?
    
    No.
    * Closes: #36178
    
    Authored-by: jp0317 <zj...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 cpp/src/arrow/io/caching.cc     | 20 ++++++++++++++---
 cpp/src/arrow/io/caching.h      |  6 +++++-
 cpp/src/arrow/io/memory_test.cc | 48 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 70 insertions(+), 4 deletions(-)

diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc
index 1cbebfd935..307b933d16 100644
--- a/cpp/src/arrow/io/caching.cc
+++ b/cpp/src/arrow/io/caching.cc
@@ -35,13 +35,15 @@ namespace io {
 CacheOptions CacheOptions::Defaults() {
   return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
                       internal::ReadRangeCache::kDefaultRangeSizeLimit,
-                      /*lazy=*/false};
+                      /*lazy=*/false,
+                      /*prefetch_limit=*/0};
 }
 
 CacheOptions CacheOptions::LazyDefaults() {
   return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
                       internal::ReadRangeCache::kDefaultRangeSizeLimit,
-                      /*lazy=*/true};
+                      /*lazy=*/true,
+                      /*prefetch_limit=*/0};
 }
 
 CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_millis,
@@ -125,7 +127,7 @@ CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_mil
                                       (1 - ideal_bandwidth_utilization_frac))));
   DCHECK_GT(range_size_limit, 0) << "Computed range_size_limit must be > 0";
 
-  return {hole_size_limit, range_size_limit, false};
+  return {hole_size_limit, range_size_limit, /*lazy=*/false, /*prefetch_limit=*/0};
 }
 
 namespace internal {
@@ -204,6 +206,18 @@ struct ReadRangeCache::Impl {
     if (it != entries.end() && it->range.Contains(range)) {
       auto fut = MaybeRead(&*it);
       ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+      if (options.lazy && options.prefetch_limit > 0) {
+        int64_t num_prefetched = 0;
+        for (auto next_it = it + 1;
+             next_it != entries.end() && num_prefetched < options.prefetch_limit;
+             ++next_it) {
+          if (!next_it->future.is_valid()) {
+            next_it->future =
+                file->ReadAsync(ctx, next_it->range.offset, next_it->range.length);
+          }
+          ++num_prefetched;
+        }
+      }
       return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
     }
     return Status::Invalid("ReadRangeCache did not find matching cache entry");
diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h
index 9f047fd62f..93f8873d63 100644
--- a/cpp/src/arrow/io/caching.h
+++ b/cpp/src/arrow/io/caching.h
@@ -43,10 +43,14 @@ struct ARROW_EXPORT CacheOptions {
   int64_t range_size_limit;
   /// \brief A lazy cache does not perform any I/O until requested.
   bool lazy;
+  /// \brief The maximum number of ranges to prefetch. Only used by a lazy cache, which
+  ///   asynchronously reads up to this many following ranges after the requested one.
+  int64_t prefetch_limit = 0;
 
   bool operator==(const CacheOptions& other) const {
     return hole_size_limit == other.hole_size_limit &&
-           range_size_limit == other.range_size_limit && lazy == other.lazy;
+           range_size_limit == other.range_size_limit && lazy == other.lazy &&
+           prefetch_limit == other.prefetch_limit;
   }
 
   /// \brief Construct CacheOptions from network storage metrics (e.g. S3).
diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc
index cdcbe240f8..216d75f65e 100644
--- a/cpp/src/arrow/io/memory_test.cc
+++ b/cpp/src/arrow/io/memory_test.cc
@@ -845,6 +845,54 @@ TEST(RangeReadCache, Lazy) {
   ASSERT_EQ(3, file->read_count());
 }
 
+TEST(RangeReadCache, LazyWithPrefetching) {
+  std::string data = "abcdefghijklmnopqrstuvwxyz";
+
+  auto file = std::make_shared<CountingBufferReader>(Buffer(data));
+  CacheOptions options = CacheOptions::LazyDefaults();
+  options.hole_size_limit = 1;
+  options.range_size_limit = 3;
+  options.prefetch_limit = 2;  // prefetch at most 2 further ranges per read
+  internal::ReadRangeCache cache(file, {}, options);
+
+  ASSERT_OK(cache.Cache({{1, 1}, {3, 1}, {5, 2}, {8, 2}, {20, 2}, {25, 0}}));
+
+  // A lazy cache performs no I/O when ranges are registered, only when they are read
+  ASSERT_EQ(0, file->read_count());
+
+  ASSERT_OK_AND_ASSIGN(auto buf, cache.Read({8, 2}));
+  AssertBufferEqual(*buf, "ij");
+  // Read {8, 2} and prefetch {20, 2}, the only range left after it
+  ASSERT_EQ(2, file->read_count());
+
+  ASSERT_OK_AND_ASSIGN(buf, cache.Read({20, 2}));
+  AssertBufferEqual(*buf, "uv");
+  // Read count remains 2 as the range {20, 2} has already been prefetched
+  ASSERT_EQ(2, file->read_count());
+
+  ASSERT_OK_AND_ASSIGN(buf, cache.Read({1, 1}));
+  AssertBufferEqual(*buf, "b");
+  // Read {1, 3} and prefetch {5, 2}; {8, 2} was already read, so it is not re-read
+  ASSERT_EQ(4, file->read_count());
+
+  ASSERT_OK_AND_ASSIGN(buf, cache.Read({3, 1}));
+  AssertBufferEqual(*buf, "d");
+  // No new I/O: {3, 1} lies inside {1, 3}, which was read by the previous request
+  ASSERT_EQ(4, file->read_count());
+
+  // Prefetched ranges are served from the cache
+  ASSERT_OK_AND_ASSIGN(buf, cache.Read({5, 1}));
+  AssertBufferEqual(*buf, "f");
+  // No new I/O: {5, 2} was prefetched while reading {1, 1}
+  ASSERT_EQ(4, file->read_count());
+
+  // Ranges not fully contained in a single cached entry are rejected
+  ASSERT_RAISES(Invalid, cache.Read({20, 3}));
+  ASSERT_RAISES(Invalid, cache.Read({19, 3}));
+  ASSERT_RAISES(Invalid, cache.Read({0, 3}));
+  ASSERT_RAISES(Invalid, cache.Read({25, 2}));
+}
+
 TEST(CacheOptions, Basics) {
   auto check = [](const CacheOptions actual, const double expected_hole_size_limit_MiB,
                   const double expected_range_size_limit_MiB) -> void {