You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2020/05/04 19:23:47 UTC

[arrow] branch master updated: ARROW-8692: [C++] Avoid memory copies when downloading from S3

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a5d010  ARROW-8692: [C++] Avoid memory copies when downloading from S3
9a5d010 is described below

commit 9a5d010556ac6b9e30115e41fff281870f48e830
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon May 4 15:22:47 2020 -0400

    ARROW-8692: [C++] Avoid memory copies when downloading from S3
    
    The AWS SDK creates an auto-growing StringStream by default, entailing multiple memory copies (because of resizes) when transferring large data blocks.  Instead, write directly into the target data area.
    
    Low-level benchmarks with a local Minio server:
    * before:
    ```
    -----------------------------------------------------------------------------------------------------
    Benchmark                                           Time             CPU   Iterations UserCounters...
    -----------------------------------------------------------------------------------------------------
    MinioFixture/ReadAll500Mib/real_time        434528630 ns    431461370 ns            2 bytes_per_second=1.1237G/s items_per_second=2.30134/s
    MinioFixture/ReadChunked500Mib/real_time    419380389 ns    339293384 ns            2 bytes_per_second=1.16429G/s items_per_second=2.38447/s
    MinioFixture/ReadCoalesced500Mib/real_time  258812283 ns       470149 ns            3 bytes_per_second=1.88662G/s items_per_second=3.8638/s
    ```
    * after:
    ```
    MinioFixture/ReadAll500Mib/real_time        194620947 ns    161227337 ns            4 bytes_per_second=2.50888G/s items_per_second=5.13819/s
    MinioFixture/ReadChunked500Mib/real_time    276437393 ns    183030215 ns            3 bytes_per_second=1.76634G/s items_per_second=3.61746/s
    MinioFixture/ReadCoalesced500Mib/real_time   86693750 ns       448568 ns            6 bytes_per_second=5.63225G/s items_per_second=11.5349/s
    ```
    
    Parquet read benchmarks from a local Minio server show speedups from 1.1x to 1.9x.
    
    Closes #7098 from pitrou/ARROW-8692-s3-avoid-copies
    
    Authored-by: Antoine Pitrou <an...@python.org>
    Signed-off-by: David Li <li...@gmail.com>
---
 cpp/src/arrow/filesystem/s3_internal.h     | 21 +++++++++----
 cpp/src/arrow/filesystem/s3fs.cc           | 49 ++++++++++++++++++------------
 cpp/src/arrow/filesystem/s3fs_benchmark.cc | 19 +++++++-----
 3 files changed, 56 insertions(+), 33 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3_internal.h b/cpp/src/arrow/filesystem/s3_internal.h
index 709fa20..aad1902 100644
--- a/cpp/src/arrow/filesystem/s3_internal.h
+++ b/cpp/src/arrow/filesystem/s3_internal.h
@@ -102,9 +102,9 @@ Status ErrorToStatus(const Aws::Client::AWSError<ErrorType>& error) {
   return ErrorToStatus(std::string(), error);
 }
 
-template <typename Result, typename Error>
+template <typename AwsResult, typename Error>
 Status OutcomeToStatus(const std::string& prefix,
-                       const Aws::Utils::Outcome<Result, Error>& outcome) {
+                       const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
   if (outcome.IsSuccess()) {
     return Status::OK();
   } else {
@@ -112,9 +112,9 @@ Status OutcomeToStatus(const std::string& prefix,
   }
 }
 
-template <typename Result, typename Error, typename... Args>
+template <typename AwsResult, typename Error, typename... Args>
 Status OutcomeToStatus(const std::tuple<Args&...>& prefix,
-                       const Aws::Utils::Outcome<Result, Error>& outcome) {
+                       const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
   if (outcome.IsSuccess()) {
     return Status::OK();
   } else {
@@ -122,11 +122,20 @@ Status OutcomeToStatus(const std::tuple<Args&...>& prefix,
   }
 }
 
-template <typename Result, typename Error>
-Status OutcomeToStatus(const Aws::Utils::Outcome<Result, Error>& outcome) {
+template <typename AwsResult, typename Error>
+Status OutcomeToStatus(const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
   return OutcomeToStatus(std::string(), outcome);
 }
 
+template <typename AwsResult, typename Error>
+Result<AwsResult> OutcomeToResult(Aws::Utils::Outcome<AwsResult, Error> outcome) {
+  if (outcome.IsSuccess()) {
+    return std::move(outcome).GetResultWithOwnership();
+  } else {
+    return ErrorToStatus(outcome.GetError());
+  }
+}
+
 inline Aws::String ToAwsString(const std::string& s) {
   // Direct construction of Aws::String from std::string doesn't work because
   // it uses a specific Allocator class.
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 14559cb..70c87f4 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -89,6 +89,7 @@ using ::arrow::fs::internal::FromAwsDatetime;
 using ::arrow::fs::internal::FromAwsString;
 using ::arrow::fs::internal::IsAlreadyExists;
 using ::arrow::fs::internal::IsNotFound;
+using ::arrow::fs::internal::OutcomeToResult;
 using ::arrow::fs::internal::OutcomeToStatus;
 using ::arrow::fs::internal::ToAwsString;
 using ::arrow::fs::internal::ToURLEncodedAwsString;
@@ -359,14 +360,35 @@ std::string FormatRange(int64_t start, int64_t length) {
   return ss.str();
 }
 
-Status GetObjectRange(Aws::S3::S3Client* client, const S3Path& path, int64_t start,
-                      int64_t length, S3Model::GetObjectResult* out) {
+// A non-copying iostream.
+// See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
+// https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
+class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream {
+ public:
+  StringViewStream(const void* data, int64_t nbytes)
+      : Aws::Utils::Stream::PreallocatedStreamBuf(
+            reinterpret_cast<unsigned char*>(const_cast<void*>(data)),
+            static_cast<size_t>(nbytes)),
+        std::iostream(this) {}
+};
+
+// By default, the AWS SDK reads object data into an auto-growing StringStream.
+// To avoid copies, read directly into our preallocated buffer instead.
+// See https://github.com/aws/aws-sdk-cpp/issues/64 for an alternative but
+// functionally similar recipe.
+Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
+  return [=]() { return new StringViewStream(data, nbytes); };
+}
+
+Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
+                                                const S3Path& path, int64_t start,
+                                                int64_t length, void* out) {
   S3Model::GetObjectRequest req;
   req.SetBucket(ToAwsString(path.bucket));
   req.SetKey(ToAwsString(path.key));
   req.SetRange(ToAwsString(FormatRange(start, length)));
-  ARROW_AWS_ASSIGN_OR_RAISE(*out, client->GetObject(req));
-  return Status::OK();
+  req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length));
+  return OutcomeToResult(client->GetObject(req));
 }
 
 // A RandomAccessFile that reads from a S3 object
@@ -452,11 +474,11 @@ class ObjectInputFile : public io::RandomAccessFile {
     }
 
     // Read the desired range of bytes
-    S3Model::GetObjectResult result;
-    RETURN_NOT_OK(GetObjectRange(client_, path_, position, nbytes, &result));
+    ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
+                          GetObjectRange(client_, path_, position, nbytes, out));
 
     auto& stream = result.GetBody();
-    stream.read(reinterpret_cast<char*>(out), nbytes);
+    stream.ignore(nbytes);
     // NOTE: the stream is a stringstream by default, there is no actual error
     // to check for.  However, stream.fail() may return true if EOF is reached.
     return stream.gcount();
@@ -499,19 +521,6 @@ class ObjectInputFile : public io::RandomAccessFile {
   int64_t content_length_ = -1;
 };
 
-// A non-copying istream.
-// See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
-// https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
-
-class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream {
- public:
-  StringViewStream(const void* data, int64_t nbytes)
-      : Aws::Utils::Stream::PreallocatedStreamBuf(
-            reinterpret_cast<unsigned char*>(const_cast<void*>(data)),
-            static_cast<size_t>(nbytes)),
-        std::iostream(this) {}
-};
-
 // Minimum size for each part of a multipart upload, except for the last part.
 // AWS doc says "5 MB" but it's not clear whether those are MB or MiB,
 // so I chose the safer value.
diff --git a/cpp/src/arrow/filesystem/s3fs_benchmark.cc b/cpp/src/arrow/filesystem/s3fs_benchmark.cc
index cf579f2..46bbbba 100644
--- a/cpp/src/arrow/filesystem/s3fs_benchmark.cc
+++ b/cpp/src/arrow/filesystem/s3fs_benchmark.cc
@@ -58,7 +58,8 @@ static const char* kEnvAwsRegion = "ARROW_TEST_S3_REGION";
 class MinioFixture : public benchmark::Fixture {
  public:
   void SetUp(const ::benchmark::State& state) override {
-    ASSERT_OK(minio_.Start());
+    minio_.reset(new MinioTestServer());
+    ASSERT_OK(minio_->Start());
 
     const char* region_str = std::getenv(kEnvAwsRegion);
     if (region_str) {
@@ -77,13 +78,13 @@ class MinioFixture : public benchmark::Fixture {
       std::cerr << "Using default bucket: " << bucket_ << std::endl;
     }
 
-    client_config_.endpointOverride = ToAwsString(minio_.connect_string());
+    client_config_.endpointOverride = ToAwsString(minio_->connect_string());
     client_config_.scheme = Aws::Http::Scheme::HTTP;
     if (!region_.empty()) {
       client_config_.region = ToAwsString(region_);
     }
     client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
-    credentials_ = {ToAwsString(minio_.access_key()), ToAwsString(minio_.secret_key())};
+    credentials_ = {ToAwsString(minio_->access_key()), ToAwsString(minio_->secret_key())};
     bool use_virtual_addressing = false;
     client_.reset(
         new Aws::S3::S3Client(credentials_, client_config_,
@@ -106,12 +107,12 @@ class MinioFixture : public benchmark::Fixture {
   }
 
   void MakeFileSystem() {
-    options_.ConfigureAccessKey(minio_.access_key(), minio_.secret_key());
+    options_.ConfigureAccessKey(minio_->access_key(), minio_->secret_key());
     options_.scheme = "http";
     if (!region_.empty()) {
       options_.region = region_;
     }
-    options_.endpoint_override = minio_.connect_string();
+    options_.endpoint_override = minio_->connect_string();
     ASSERT_OK_AND_ASSIGN(fs_, S3FileSystem::Make(options_));
   }
 
@@ -180,10 +181,14 @@ class MinioFixture : public benchmark::Fixture {
     return Status::OK();
   }
 
-  void TearDown(const ::benchmark::State& state) override { ASSERT_OK(minio_.Stop()); }
+  void TearDown(const ::benchmark::State& state) override {
+    ASSERT_OK(minio_->Stop());
+    // Delete temporary directory, freeing up disk space
+    minio_.reset();
+  }
 
  protected:
-  MinioTestServer minio_;
+  std::unique_ptr<MinioTestServer> minio_;
   std::string region_;
   std::string bucket_;
   Aws::Client::ClientConfiguration client_config_;