Posted to commits@arrow.apache.org by ap...@apache.org on 2023/06/03 13:59:35 UTC
[arrow] branch main updated: GH-34363: [C++] Use equal size parts in S3 upload for R2 compatibility (#35808)
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 1ecb04f398 GH-34363: [C++] Use equal size parts in S3 upload for R2 compatibility (#35808)
1ecb04f398 is described below
commit 1ecb04f39851a709a0382df908043b453c42e281
Author: Will Jones <wi...@gmail.com>
AuthorDate: Sat Jun 3 06:59:22 2023 -0700
GH-34363: [C++] Use equal size parts in S3 upload for R2 compatibility (#35808)
### Rationale for this change
S3 and Minio allow varied part sizes, but R2 doesn't yet. So for now we'll change to only write equal-sized parts.
### What changes are included in this PR?
Alters the `DoWrite` implementation to use fixed-size parts.
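For orientation before reading the diff, here is a minimal standalone sketch of the fixed-size strategy; the class and method names are hypothetical and not the actual `s3fs.cc` implementation. A write call first tops up any partially buffered part, then uploads full 10 MiB parts straight from the caller's buffer, and finally buffers the remainder for the next call (or the smaller final part).

```cpp
// Sketch only, assuming a fixed part size and a placeholder UploadPart().
#include <algorithm>
#include <cstdint>
#include <vector>

constexpr int64_t kPartSize = 10 * 1024 * 1024;  // every part except the last

class FixedPartWriter {
 public:
  void Write(const uint8_t* data, int64_t nbytes) {
    // 1. Fill the current part buffer first, and flush it once it is full.
    if (!buffer_.empty()) {
      const int64_t to_copy = std::min<int64_t>(
          nbytes, kPartSize - static_cast<int64_t>(buffer_.size()));
      buffer_.insert(buffer_.end(), data, data + to_copy);
      data += to_copy;
      nbytes -= to_copy;
      if (static_cast<int64_t>(buffer_.size()) < kPartSize) return;
      UploadPart(buffer_.data(), static_cast<int64_t>(buffer_.size()));
      buffer_.clear();
    }
    // 2. Upload full-size parts straight from the caller's buffer (no copy).
    while (nbytes >= kPartSize) {
      UploadPart(data, kPartSize);
      data += kPartSize;
      nbytes -= kPartSize;
    }
    // 3. Buffer the remainder; it starts the next part or becomes the
    //    smaller final part when the stream is closed.
    buffer_.assign(data, data + nbytes);
  }

 private:
  // Placeholder for the real multipart UploadPart request.
  void UploadPart(const uint8_t* /*data*/, int64_t nbytes) {
    part_sizes_.push_back(nbytes);
  }

  std::vector<uint8_t> buffer_;
  std::vector<int64_t> part_sizes_;
};
```

Every uploaded part except the last is exactly `kPartSize` bytes, which is the property R2 requires.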
### Are these changes tested?
* [x] Tested manually against R2
* [x] Tested manually against S3
### Are there any user-facing changes?
* Closes: #34363
Authored-by: Will Jones <wi...@gmail.com>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/arrow/filesystem/s3fs.cc | 81 +++++++++++++++++++++-------------------
1 file changed, 43 insertions(+), 38 deletions(-)
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index dc033b9958..c3a6eb0eac 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -1150,11 +1150,14 @@ class ObjectInputFile final : public io::RandomAccessFile {
std::shared_ptr<const KeyValueMetadata> metadata_;
};
-// Minimum size for each part of a multipart upload, except for the last part.
-// AWS doc says "5 MB" but it's not clear whether those are MB or MiB,
-// so I chose the safer value.
-// (see https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html)
-static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024;
+// Upload size per part. While AWS and Minio support different sizes for each
+// part (only requiring a minimum of 5MB), Cloudflare R2 requires that every
+// part be exactly equal (except for the last part). We set this to 10 MB, so
+// that in combination with the maximum number of parts of 10,000, this gives a
+// file limit of 100k MB (or about 98 GB).
+// (see https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html)
+// (for rationale, see: https://github.com/apache/arrow/issues/34363)
+static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024;
// An OutputStream that writes to a S3 object
class ObjectOutputStream final : public io::OutputStream {
@@ -1304,27 +1307,44 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::Invalid("Operation on closed stream");
}
- if (!current_part_ && nbytes >= part_upload_threshold_) {
- // No current part and data large enough, upload it directly
- // (without copying if the buffer is owned)
- RETURN_NOT_OK(UploadPart(data, nbytes, owned_buffer));
- pos_ += nbytes;
- return Status::OK();
+ const int8_t* data_ptr = reinterpret_cast<const int8_t*>(data);
+ auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) {
+ data_ptr += offset;
+ nbytes -= offset;
+ };
+
+ // Handle case where we have some bytes buffered from prior calls.
+ if (current_part_size_ > 0) {
+ // Try to fill current buffer
+ const int64_t to_copy = std::min(nbytes, kPartUploadSize - current_part_size_);
+ RETURN_NOT_OK(current_part_->Write(data_ptr, to_copy));
+ current_part_size_ += to_copy;
+ advance_ptr(to_copy);
+ pos_ += to_copy;
+
+ // If buffer isn't full, return early
+ if (current_part_size_ < kPartUploadSize) {
+ return Status::OK();
+ }
+
+ // Upload current buffer
+ RETURN_NOT_OK(CommitCurrentPart());
}
- // Can't upload data on its own, need to buffer it
- if (!current_part_) {
- ARROW_ASSIGN_OR_RAISE(
- current_part_,
- io::BufferOutputStream::Create(part_upload_threshold_, io_context_.pool()));
- current_part_size_ = 0;
+
+ // We can upload chunks without copying them into a buffer
+ while (nbytes >= kPartUploadSize) {
+ RETURN_NOT_OK(UploadPart(data_ptr, kPartUploadSize));
+ advance_ptr(kPartUploadSize);
+ pos_ += kPartUploadSize;
}
- RETURN_NOT_OK(current_part_->Write(data, nbytes));
- pos_ += nbytes;
- current_part_size_ += nbytes;
- if (current_part_size_ >= part_upload_threshold_) {
- // Current part large enough, upload it
- RETURN_NOT_OK(CommitCurrentPart());
+ // Buffer remaining bytes
+ if (nbytes > 0) {
+ current_part_size_ = nbytes;
+ ARROW_ASSIGN_OR_RAISE(current_part_, io::BufferOutputStream::Create(
+ kPartUploadSize, io_context_.pool()));
+ RETURN_NOT_OK(current_part_->Write(data_ptr, current_part_size_));
+ pos_ += current_part_size_;
}
return Status::OK();
@@ -1407,20 +1427,6 @@ class ObjectOutputStream final : public io::OutputStream {
}
++part_number_;
- // With up to 10000 parts in an upload (S3 limit), a stream writing chunks
- // of exactly 5MB would be limited to 50GB total. To avoid that, we bump
- // the upload threshold every 100 parts. So the pattern is:
- // - part 1 to 99: 5MB threshold
- // - part 100 to 199: 10MB threshold
- // - part 200 to 299: 15MB threshold
- // ...
- // - part 9900 to 9999: 500MB threshold
- // So the total size limit is 2475000MB or ~2.4TB, while keeping manageable
- // chunk sizes and avoiding too much buffering in the common case of a small-ish
- // stream. If the limit's not enough, we can revisit.
- if (part_number_ % 100 == 0) {
- part_upload_threshold_ += kMinimumPartUpload;
- }
return Status::OK();
}
@@ -1482,7 +1488,6 @@ class ObjectOutputStream final : public io::OutputStream {
int32_t part_number_ = 1;
std::shared_ptr<io::BufferOutputStream> current_part_;
int64_t current_part_size_ = 0;
- int64_t part_upload_threshold_ = kMinimumPartUpload;
// This struct is kept alive through background writes to avoid problems
// in the completion handler.
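As a side note on the limit quoted in the new comment: with every part fixed at 10 MiB and the S3 cap of 10,000 parts per upload, the largest object a stream can write is roughly 98 GB, versus the ~2.4 TB ceiling of the removed escalating-threshold scheme. A minimal sketch (not part of the commit) checking that arithmetic:

```cpp
// Sketch only: verifies the size limit implied by the fixed part size.
#include <cstdint>
#include <cstdio>

constexpr int64_t kPartUploadSize = 10 * 1024 * 1024;  // 10 MiB per part
constexpr int64_t kMaxPartsPerUpload = 10000;          // S3 multipart limit

int main() {
  constexpr int64_t max_object_size = kPartUploadSize * kMaxPartsPerUpload;
  std::printf("max multipart object: %lld bytes (~%.1f GiB)\n",
              static_cast<long long>(max_object_size),
              static_cast<double>(max_object_size) /
                  (1024.0 * 1024.0 * 1024.0));
  return 0;
}
```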