You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/11/28 22:57:44 UTC

[arrow] branch master updated: ARROW-18113: [C++] Add RandomAccessFile::ReadManyAsync (#14723)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 15a3b054f9 ARROW-18113: [C++] Add RandomAccessFile::ReadManyAsync (#14723)
15a3b054f9 is described below

commit 15a3b054f92377cdaafb0e516b8ac263c711d79e
Author: Percy Camilo Triveño Aucahuasi <au...@users.noreply.github.com>
AuthorDate: Mon Nov 28 17:57:37 2022 -0500

    ARROW-18113: [C++] Add RandomAccessFile::ReadManyAsync (#14723)
    
    This PR adds the new method `RandomAccessFile::ReadManyAsync`, which performs multiple reads asynchronously at once.
    The idea for this PR is to provide the new method with its default implementation (which simply iterates over all the ranges).
    Follow-up PRs will provide specific implementations using coalescing (`internal::CoalesceReadRanges`) for some concrete `io::RandomAccessFile` classes (e.g. the cloud ones and perhaps the CUDA buffer reader as well).
    
    Related Jira ticket:
    https://issues.apache.org/jira/browse/ARROW-18113
    
    Authored-by: Percy Camilo Triveño Aucahuasi <pe...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 cpp/src/arrow/io/file_test.cc  | 16 ++++++++++++++++
 cpp/src/arrow/io/interfaces.cc | 14 ++++++++++++++
 cpp/src/arrow/io/interfaces.h  | 21 +++++++++++++++++++++
 3 files changed, 51 insertions(+)

diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc
index 38ea08e8ae..ba8ecd54b5 100644
--- a/cpp/src/arrow/io/file_test.cc
+++ b/cpp/src/arrow/io/file_test.cc
@@ -387,6 +387,22 @@ TEST_F(TestReadableFile, ReadAsync) {
   AssertBufferEqual(*buf2, "test");
 }
 
+// Checks that ReadManyAsync() returns one future per requested range and that
+// each future resolves to the bytes of its own range.
+TEST_F(TestReadableFile, ReadManyAsync) {
+  MakeTestFile();
+  OpenFile();
+
+  // Deliberately overlapping ranges.  ReadManyAsync() takes the vector by const
+  // reference, so it is passed directly; std::move() would not move anything.
+  const std::vector<ReadRange> ranges = {{1, 3}, {2, 5}, {4, 2}};
+  auto futs = file_->ReadManyAsync(ranges);
+
+  // ASSERT (not EXPECT) on the size: the indexing below relies on it.
+  ASSERT_EQ(futs.size(), 3);
+  ASSERT_OK_AND_ASSIGN(auto buf1, futs[0].result());
+  ASSERT_OK_AND_ASSIGN(auto buf2, futs[1].result());
+  ASSERT_OK_AND_ASSIGN(auto buf3, futs[2].result());
+  // Results must come back in the same order as the requested ranges.
+  AssertBufferEqual(*buf1, "est");
+  AssertBufferEqual(*buf2, "stdat");
+  AssertBufferEqual(*buf3, "da");
+}
+
 TEST_F(TestReadableFile, SeekingRequired) {
   MakeTestFile();
   OpenFile();
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index a78bc1b55c..e7819e139f 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -175,6 +175,20 @@ Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
   return ReadAsync(io_context(), position, nbytes);
 }
 
+// Default ReadManyAsync() implementation: one independent ReadAsync() call per
+// range, with no coalescing or splitting.  The i-th returned future corresponds
+// to ranges[i]; an empty `ranges` yields an empty vector.
+std::vector<Future<std::shared_ptr<Buffer>>> RandomAccessFile::ReadManyAsync(
+    const IOContext& ctx, const std::vector<ReadRange>& ranges) {
+  std::vector<Future<std::shared_ptr<Buffer>>> ret;
+  // The final size is known up front: allocate once instead of regrowing.
+  ret.reserve(ranges.size());
+  for (const auto& r : ranges) {
+    ret.push_back(this->ReadAsync(ctx, r.offset, r.length));
+  }
+  return ret;
+}
+
+// Convenience overload: forwards to the IOContext-taking ReadManyAsync() using
+// this file's own io_context().
+std::vector<Future<std::shared_ptr<Buffer>>> RandomAccessFile::ReadManyAsync(
+    const std::vector<ReadRange>& ranges) {
+  const auto& default_ctx = this->io_context();
+  return this->ReadManyAsync(default_ctx, ranges);
+}
+
 // Default WillNeed() implementation: no-op
 Status RandomAccessFile::WillNeed(const std::vector<ReadRange>& ranges) {
   return Status::OK();
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 86e9ad2d52..c5355c9422 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -305,6 +305,27 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
   /// EXPERIMENTAL: Read data asynchronously, using the file's IOContext.
   Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes);
 
+  /// EXPERIMENTAL: Explicit multi-read.
+  /// \brief Request multiple reads at once
+  ///
+  /// The underlying filesystem may optimize these reads by coalescing small reads into
+  /// large reads or by breaking up large reads into multiple parallel smaller reads.  The
+  /// reads should be issued in parallel if it makes sense for the filesystem.
+  ///
+  /// One future will be returned for each input read range.  Multiple returned futures
+  /// may correspond to a single read.  Or, a single returned future may be a combined
+  /// result of several individual reads.
+  ///
+  /// The default implementation simply issues one ReadAsync() call per range, in order,
+  /// passing along the IOContext given as the first (unnamed) argument.
+  ///
+  /// \param[in] ranges The ranges to read
+  /// \return One future per entry of `ranges`, in the same order; each future completes
+  /// with the data of its range once that data is available
+  virtual std::vector<Future<std::shared_ptr<Buffer>>> ReadManyAsync(
+      const IOContext&, const std::vector<ReadRange>& ranges);
+
+  /// EXPERIMENTAL: Explicit multi-read, using the file's IOContext.
+  ///
+  /// Forwards to the IOContext-taking ReadManyAsync() overload with io_context().
+  ///
+  /// \param[in] ranges The ranges to read
+  /// \return One future per entry of `ranges`
+  std::vector<Future<std::shared_ptr<Buffer>>> ReadManyAsync(
+      const std::vector<ReadRange>& ranges);
+
   /// EXPERIMENTAL: Inform that the given ranges may be read soon.
   ///
   /// Some implementations might arrange to prefetch some of the data.