Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/28 08:50:05 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #10145: ARROW-12522: [C++] Add ReadRangeCache::WaitFor

pitrou commented on a change in pull request #10145:
URL: https://github.com/apache/arrow/pull/10145#discussion_r621949530



##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -149,12 +177,117 @@ struct ReadRangeCache::Impl {
     } else {
       entries = std::move(new_entries);
     }
+    // Prefetch immediately, regardless of executor availability, if possible
+    return file->WillNeed(ranges);
+  }
+
+  // Read the given range from the cache, blocking if needed. Cannot read a range
+  // that spans cache entries.
+  virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
+    if (range.length == 0) {
+      static const uint8_t byte = 0;
+      return std::make_shared<Buffer>(&byte, 0);
+    }
+
+    const auto it = std::lower_bound(
+        entries.begin(), entries.end(), range,
+        [](const RangeCacheEntry& entry, const ReadRange& range) {
+          return entry.range.offset + entry.range.length < range.offset + range.length;
+        });
+    if (it != entries.end() && it->range.Contains(range)) {
+      auto fut = MaybeRead(&*it);
+      ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+      return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
+    }
+    return Status::Invalid("ReadRangeCache did not find matching cache entry");
+  }
+
+  virtual Future<> Wait() {
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {
+      futures.emplace_back(MaybeRead(&entry));
+    }
+    return AllComplete(futures);
+  }
+
+  // Return a Future that completes when the given ranges have been read.
+  virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
+    auto end = std::remove_if(ranges.begin(), ranges.end(),
+                              [](const ReadRange& range) { return range.length == 0; });
+    ranges.resize(end - ranges.begin());
+    // Sort in reverse position order
+    std::sort(ranges.begin(), ranges.end(),
+              [](const ReadRange& a, const ReadRange& b) { return a.offset > b.offset; });
+
+    std::vector<Future<>> futures;
+    for (auto& entry : entries) {

Review comment:
       This algorithm looks a bit unexpected to me. Basically, you're iterating over all known entries in the hope that one of them matches a requested range? That gets costly when there are many more entries than requested ranges, since you may end up iterating over every entry.
   
   Why not do the converse? For each requested range, look it up in the existing entries. That is doable using bisection (see `Read` above), and you shouldn't need to sort the requested ranges.
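
   A rough sketch of the per-range lookup, for illustration only. `ReadRange` and `Entry` here are simplified stand-ins rather than the actual Arrow types, and `FindEntry` / `CollectEntries` are hypothetical helper names. It assumes entries are sorted by offset and do not overlap, and it reuses the comparator from `Read`:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Simplified stand-in for arrow::io::ReadRange (assumption, not the real type).
struct ReadRange {
  int64_t offset;
  int64_t length;
  int64_t end() const { return offset + length; }
  bool Contains(const ReadRange& other) const {
    return offset <= other.offset && other.end() <= end();
  }
};

// Simplified stand-in for RangeCacheEntry; the real entry also holds a Future.
struct Entry {
  ReadRange range;
};

// Return the index of the entry containing `range`, or -1 if none does.
// Same bisection as in `Read`: find the first entry whose end is >= range's end.
int64_t FindEntry(const std::vector<Entry>& entries, const ReadRange& range) {
  const auto it = std::lower_bound(
      entries.begin(), entries.end(), range,
      [](const Entry& entry, const ReadRange& r) { return entry.range.end() < r.end(); });
  if (it != entries.end() && it->range.Contains(range)) {
    return it - entries.begin();
  }
  return -1;
}

// For each non-empty requested range, bisect into `entries` and record the
// matching index. Returns false if some range is not covered by one entry.
// The requested ranges are visited in the order given; no sorting is needed.
bool CollectEntries(const std::vector<Entry>& entries,
                    const std::vector<ReadRange>& ranges,
                    std::vector<int64_t>* out) {
  // Debug-only precondition check: entries must be ordered by offset.
  assert(std::is_sorted(entries.begin(), entries.end(),
                        [](const Entry& a, const Entry& b) {
                          return a.range.offset < b.range.offset;
                        }));
  for (const auto& range : ranges) {
    if (range.length == 0) continue;
    const int64_t index = FindEntry(entries, range);
    if (index < 0) return false;
    out->push_back(index);
  }
  return true;
}
```

   With M requested ranges and N entries this costs O(M log N) instead of O(N), and in `WaitFor` each matching index would map to one `MaybeRead(&entries[index])` future.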

##########
File path: cpp/src/arrow/io/memory_test.cc
##########
@@ -692,10 +694,24 @@ TEST(CoalesceReadRanges, Basics) {
         {{110, 21}, {140, 100}, {240, 31}});
 }
 
+class CountingBufferReader : public BufferReader {
+ public:
+  using BufferReader::BufferReader;
+  Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext& context, int64_t position,
+                                            int64_t nbytes) override {
+    read_count_++;
+    return BufferReader::ReadAsync(context, position, nbytes);
+  }
+  int64_t read_count() const { return read_count_; }
+
+ private:
+  int64_t read_count_ = 0;
+};
+
 TEST(RangeReadCache, Basics) {
   std::string data = "abcdefghijklmnopqrstuvwxyz";
 
-  auto file = std::make_shared<BufferReader>(Buffer(data));
+  auto file = std::make_shared<CountingBufferReader>(Buffer(data));

Review comment:
       Should you test both lazy and non-lazy versions here?

##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -139,8 +147,28 @@ struct ReadRangeCache::Impl {
   // Ordered by offset (so as to find a matching region by binary search)
   std::vector<RangeCacheEntry> entries;
 
-  // Add new entries, themselves ordered by offset
-  void AddEntries(std::vector<RangeCacheEntry> new_entries) {
+  virtual ~Impl() = default;
+
+  // Get the future corresponding to a range
+  virtual Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) {
+    return entry->future;
+  }
+
+  // Make a cache entry for a range
+  virtual RangeCacheEntry MakeCacheEntry(const ReadRange& range) {

Review comment:
       You could make this `std::vector<RangeCacheEntry> MakeCacheEntries(const std::vector<ReadRange>&)` instead, so that you issue one virtual call instead of N.
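
   Something along these lines, as a sketch only. The types below are simplified stand-ins for the Arrow ones; the `started` flag, the `LazyImpl` name, and the `virtual_calls` counter are hypothetical and exist purely to make the example observable:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Simplified stand-ins (assumptions); the real RangeCacheEntry holds a Future.
struct ReadRange {
  int64_t offset;
  int64_t length;
};

struct RangeCacheEntry {
  ReadRange range;
  bool started;  // hypothetical flag: has the read been issued yet?
};

struct Impl {
  virtual ~Impl() = default;

  int virtual_calls = 0;  // illustration only: counts virtual dispatches

  // Batch version: one virtual call builds all entries.
  virtual std::vector<RangeCacheEntry> MakeCacheEntries(
      const std::vector<ReadRange>& ranges) {
    ++virtual_calls;
    std::vector<RangeCacheEntry> new_entries;
    new_entries.reserve(ranges.size());
    for (const auto& range : ranges) {
      // Eager variant: the read would be started here.
      new_entries.push_back(RangeCacheEntry{range, /*started=*/true});
    }
    assert(new_entries.size() == ranges.size());
    return new_entries;
  }
};

struct LazyImpl : Impl {
  std::vector<RangeCacheEntry> MakeCacheEntries(
      const std::vector<ReadRange>& ranges) override {
    ++virtual_calls;
    std::vector<RangeCacheEntry> new_entries;
    new_entries.reserve(ranges.size());
    for (const auto& range : ranges) {
      // Lazy variant: the read is deferred until the entry is first needed.
      new_entries.push_back(RangeCacheEntry{range, /*started=*/false});
    }
    return new_entries;
  }
};
```

   The caller then dispatches once per batch of ranges, and each implementation runs its own plain loop over them.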



