You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2020/05/04 19:23:47 UTC
[arrow] branch master updated: ARROW-8692: [C++] Avoid memory copies when downloading from S3
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9a5d010 ARROW-8692: [C++] Avoid memory copies when downloading from S3
9a5d010 is described below
commit 9a5d010556ac6b9e30115e41fff281870f48e830
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon May 4 15:22:47 2020 -0400
ARROW-8692: [C++] Avoid memory copies when downloading from S3
The AWS SDK creates an auto-growing StringStream by default, which entails multiple memory copies (due to buffer resizes) when transferring large data blocks. Instead, write directly into the target data area.
Low-level benchmarks with a local Minio server:
* before:
```
-----------------------------------------------------------------------------------------------------
Benchmark Time CPU Iterations UserCounters...
-----------------------------------------------------------------------------------------------------
MinioFixture/ReadAll500Mib/real_time 434528630 ns 431461370 ns 2 bytes_per_second=1.1237G/s items_per_second=2.30134/s
MinioFixture/ReadChunked500Mib/real_time 419380389 ns 339293384 ns 2 bytes_per_second=1.16429G/s items_per_second=2.38447/s
MinioFixture/ReadCoalesced500Mib/real_time 258812283 ns 470149 ns 3 bytes_per_second=1.88662G/s items_per_second=3.8638/s
```
* after:
```
MinioFixture/ReadAll500Mib/real_time 194620947 ns 161227337 ns 4 bytes_per_second=2.50888G/s items_per_second=5.13819/s
MinioFixture/ReadChunked500Mib/real_time 276437393 ns 183030215 ns 3 bytes_per_second=1.76634G/s items_per_second=3.61746/s
MinioFixture/ReadCoalesced500Mib/real_time 86693750 ns 448568 ns 6 bytes_per_second=5.63225G/s items_per_second=11.5349/s
```
Parquet read benchmarks from a local Minio server show speedups from 1.1x to 1.9x.
Closes #7098 from pitrou/ARROW-8692-s3-avoid-copies
Authored-by: Antoine Pitrou <an...@python.org>
Signed-off-by: David Li <li...@gmail.com>
---
cpp/src/arrow/filesystem/s3_internal.h | 21 +++++++++----
cpp/src/arrow/filesystem/s3fs.cc | 49 ++++++++++++++++++------------
cpp/src/arrow/filesystem/s3fs_benchmark.cc | 19 +++++++-----
3 files changed, 56 insertions(+), 33 deletions(-)
diff --git a/cpp/src/arrow/filesystem/s3_internal.h b/cpp/src/arrow/filesystem/s3_internal.h
index 709fa20..aad1902 100644
--- a/cpp/src/arrow/filesystem/s3_internal.h
+++ b/cpp/src/arrow/filesystem/s3_internal.h
@@ -102,9 +102,9 @@ Status ErrorToStatus(const Aws::Client::AWSError<ErrorType>& error) {
return ErrorToStatus(std::string(), error);
}
-template <typename Result, typename Error>
+template <typename AwsResult, typename Error>
Status OutcomeToStatus(const std::string& prefix,
- const Aws::Utils::Outcome<Result, Error>& outcome) {
+ const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
if (outcome.IsSuccess()) {
return Status::OK();
} else {
@@ -112,9 +112,9 @@ Status OutcomeToStatus(const std::string& prefix,
}
}
-template <typename Result, typename Error, typename... Args>
+template <typename AwsResult, typename Error, typename... Args>
Status OutcomeToStatus(const std::tuple<Args&...>& prefix,
- const Aws::Utils::Outcome<Result, Error>& outcome) {
+ const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
if (outcome.IsSuccess()) {
return Status::OK();
} else {
@@ -122,11 +122,20 @@ Status OutcomeToStatus(const std::tuple<Args&...>& prefix,
}
}
-template <typename Result, typename Error>
-Status OutcomeToStatus(const Aws::Utils::Outcome<Result, Error>& outcome) {
+template <typename AwsResult, typename Error>
+Status OutcomeToStatus(const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
return OutcomeToStatus(std::string(), outcome);
}
+template <typename AwsResult, typename Error>
+Result<AwsResult> OutcomeToResult(Aws::Utils::Outcome<AwsResult, Error> outcome) {
+ if (outcome.IsSuccess()) {
+ return std::move(outcome).GetResultWithOwnership();
+ } else {
+ return ErrorToStatus(outcome.GetError());
+ }
+}
+
inline Aws::String ToAwsString(const std::string& s) {
// Direct construction of Aws::String from std::string doesn't work because
// it uses a specific Allocator class.
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 14559cb..70c87f4 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -89,6 +89,7 @@ using ::arrow::fs::internal::FromAwsDatetime;
using ::arrow::fs::internal::FromAwsString;
using ::arrow::fs::internal::IsAlreadyExists;
using ::arrow::fs::internal::IsNotFound;
+using ::arrow::fs::internal::OutcomeToResult;
using ::arrow::fs::internal::OutcomeToStatus;
using ::arrow::fs::internal::ToAwsString;
using ::arrow::fs::internal::ToURLEncodedAwsString;
@@ -359,14 +360,35 @@ std::string FormatRange(int64_t start, int64_t length) {
return ss.str();
}
-Status GetObjectRange(Aws::S3::S3Client* client, const S3Path& path, int64_t start,
- int64_t length, S3Model::GetObjectResult* out) {
+// A non-copying iostream.
+// See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
+// https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
+class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream {
+ public:
+ StringViewStream(const void* data, int64_t nbytes)
+ : Aws::Utils::Stream::PreallocatedStreamBuf(
+ reinterpret_cast<unsigned char*>(const_cast<void*>(data)),
+ static_cast<size_t>(nbytes)),
+ std::iostream(this) {}
+};
+
+// By default, the AWS SDK reads object data into an auto-growing StringStream.
+// To avoid copies, read directly into our preallocated buffer instead.
+// See https://github.com/aws/aws-sdk-cpp/issues/64 for an alternative but
+// functionally similar recipe.
+Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
+ return [=]() { return new StringViewStream(data, nbytes); };
+}
+
+Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
+ const S3Path& path, int64_t start,
+ int64_t length, void* out) {
S3Model::GetObjectRequest req;
req.SetBucket(ToAwsString(path.bucket));
req.SetKey(ToAwsString(path.key));
req.SetRange(ToAwsString(FormatRange(start, length)));
- ARROW_AWS_ASSIGN_OR_RAISE(*out, client->GetObject(req));
- return Status::OK();
+ req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length));
+ return OutcomeToResult(client->GetObject(req));
}
// A RandomAccessFile that reads from a S3 object
@@ -452,11 +474,11 @@ class ObjectInputFile : public io::RandomAccessFile {
}
// Read the desired range of bytes
- S3Model::GetObjectResult result;
- RETURN_NOT_OK(GetObjectRange(client_, path_, position, nbytes, &result));
+ ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
+ GetObjectRange(client_, path_, position, nbytes, out));
auto& stream = result.GetBody();
- stream.read(reinterpret_cast<char*>(out), nbytes);
+ stream.ignore(nbytes);
// NOTE: the stream is a stringstream by default, there is no actual error
// to check for. However, stream.fail() may return true if EOF is reached.
return stream.gcount();
@@ -499,19 +521,6 @@ class ObjectInputFile : public io::RandomAccessFile {
int64_t content_length_ = -1;
};
-// A non-copying istream.
-// See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
-// https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
-
-class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream {
- public:
- StringViewStream(const void* data, int64_t nbytes)
- : Aws::Utils::Stream::PreallocatedStreamBuf(
- reinterpret_cast<unsigned char*>(const_cast<void*>(data)),
- static_cast<size_t>(nbytes)),
- std::iostream(this) {}
-};
-
// Minimum size for each part of a multipart upload, except for the last part.
// AWS doc says "5 MB" but it's not clear whether those are MB or MiB,
// so I chose the safer value.
diff --git a/cpp/src/arrow/filesystem/s3fs_benchmark.cc b/cpp/src/arrow/filesystem/s3fs_benchmark.cc
index cf579f2..46bbbba 100644
--- a/cpp/src/arrow/filesystem/s3fs_benchmark.cc
+++ b/cpp/src/arrow/filesystem/s3fs_benchmark.cc
@@ -58,7 +58,8 @@ static const char* kEnvAwsRegion = "ARROW_TEST_S3_REGION";
class MinioFixture : public benchmark::Fixture {
public:
void SetUp(const ::benchmark::State& state) override {
- ASSERT_OK(minio_.Start());
+ minio_.reset(new MinioTestServer());
+ ASSERT_OK(minio_->Start());
const char* region_str = std::getenv(kEnvAwsRegion);
if (region_str) {
@@ -77,13 +78,13 @@ class MinioFixture : public benchmark::Fixture {
std::cerr << "Using default bucket: " << bucket_ << std::endl;
}
- client_config_.endpointOverride = ToAwsString(minio_.connect_string());
+ client_config_.endpointOverride = ToAwsString(minio_->connect_string());
client_config_.scheme = Aws::Http::Scheme::HTTP;
if (!region_.empty()) {
client_config_.region = ToAwsString(region_);
}
client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
- credentials_ = {ToAwsString(minio_.access_key()), ToAwsString(minio_.secret_key())};
+ credentials_ = {ToAwsString(minio_->access_key()), ToAwsString(minio_->secret_key())};
bool use_virtual_addressing = false;
client_.reset(
new Aws::S3::S3Client(credentials_, client_config_,
@@ -106,12 +107,12 @@ class MinioFixture : public benchmark::Fixture {
}
void MakeFileSystem() {
- options_.ConfigureAccessKey(minio_.access_key(), minio_.secret_key());
+ options_.ConfigureAccessKey(minio_->access_key(), minio_->secret_key());
options_.scheme = "http";
if (!region_.empty()) {
options_.region = region_;
}
- options_.endpoint_override = minio_.connect_string();
+ options_.endpoint_override = minio_->connect_string();
ASSERT_OK_AND_ASSIGN(fs_, S3FileSystem::Make(options_));
}
@@ -180,10 +181,14 @@ class MinioFixture : public benchmark::Fixture {
return Status::OK();
}
- void TearDown(const ::benchmark::State& state) override { ASSERT_OK(minio_.Stop()); }
+ void TearDown(const ::benchmark::State& state) override {
+ ASSERT_OK(minio_->Stop());
+ // Delete temporary directory, freeing up disk space
+ minio_.reset();
+ }
protected:
- MinioTestServer minio_;
+ std::unique_ptr<MinioTestServer> minio_;
std::string region_;
std::string bucket_;
Aws::Client::ClientConfiguration client_config_;