Posted to commits@arrow.apache.org by ap...@apache.org on 2023/06/03 13:59:35 UTC

[arrow] branch main updated: GH-34363: [C++] Use equal size parts in S3 upload for R2 compatibility (#35808)

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ecb04f398 GH-34363: [C++] Use equal size parts in S3 upload for R2 compatibility (#35808)
1ecb04f398 is described below

commit 1ecb04f39851a709a0382df908043b453c42e281
Author: Will Jones <wi...@gmail.com>
AuthorDate: Sat Jun 3 06:59:22 2023 -0700

    GH-34363: [C++] Use equal size parts in S3 upload for R2 compatibility (#35808)
    
    ### Rationale for this change
    
    S3 and Minio allow varied part sizes, but R2 doesn't yet. So for now we'll change to only write equal-sized parts.
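    
    For context, a minimal usage sketch (not part of this commit) of hitting this write path against R2. The bucket name, credentials, and endpoint below are placeholders; R2 is reached purely through `S3Options::endpoint_override`:
    
    ```cpp
    #include <cstdint>
    #include <vector>
    
    #include "arrow/filesystem/s3fs.h"
    #include "arrow/result.h"
    #include "arrow/status.h"
    
    // Hypothetical helper: write one large object to an R2 bucket.
    arrow::Status WriteLargeObjectToR2() {
      ARROW_RETURN_NOT_OK(arrow::fs::EnsureS3Initialized());
    
      // Placeholder credentials and endpoint; R2 is S3-compatible and is
      // selected only through endpoint_override.
      auto options = arrow::fs::S3Options::FromAccessKey("ACCESS_KEY_ID",
                                                         "SECRET_ACCESS_KEY");
      options.endpoint_override = "https://<account-id>.r2.cloudflarestorage.com";
      options.region = "auto";
    
      ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::S3FileSystem::Make(options));
      ARROW_ASSIGN_OR_RAISE(auto stream,
                            fs->OpenOutputStream("my-bucket/large-file.bin"));
    
      // Anything larger than one part triggers a multipart upload; with this
      // change every part except the last has the same fixed size.
      std::vector<uint8_t> chunk(64 * 1024 * 1024, 0);
      ARROW_RETURN_NOT_OK(stream->Write(chunk.data(), chunk.size()));
      return stream->Close();
    }
    ```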
    
    ### What changes are included in this PR?
    
    Alters the `DoWrite` implementation to use fixed-size parts.
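    
    The part-splitting logic amounts to the following standalone sketch. This is a simplified illustration rather than the actual `ObjectOutputStream` code: `FixedPartWriter` and the printing `UploadPart` stub are hypothetical stand-ins, while `kPartUploadSize` and the three-step structure mirror the diff below (10 MiB parts × the 10,000-part S3 limit gives the ~98 GiB object ceiling mentioned in the new comment):
    
    ```cpp
    #include <algorithm>
    #include <cstdint>
    #include <cstdio>
    #include <vector>
    
    constexpr int64_t kPartUploadSize = 10 * 1024 * 1024;  // fixed 10 MiB parts
    
    // Stand-in for the real UploadPart(), which starts an S3 UploadPart request.
    void UploadPart(const uint8_t* /*data*/, int64_t nbytes) {
      std::printf("uploading part of %lld bytes\n", static_cast<long long>(nbytes));
    }
    
    // Hypothetical simplified writer illustrating the fixed-size splitting.
    struct FixedPartWriter {
      std::vector<uint8_t> current_part;  // bytes buffered for the next part
    
      void Write(const uint8_t* data, int64_t nbytes) {
        // 1. Top up a partially filled buffer left over from previous writes.
        if (!current_part.empty()) {
          const int64_t to_copy = std::min<int64_t>(
              nbytes, kPartUploadSize - static_cast<int64_t>(current_part.size()));
          current_part.insert(current_part.end(), data, data + to_copy);
          data += to_copy;
          nbytes -= to_copy;
          if (static_cast<int64_t>(current_part.size()) < kPartUploadSize) return;
          UploadPart(current_part.data(), current_part.size());  // buffer now full
          current_part.clear();
        }
        // 2. Upload full-size parts straight from the caller's data (no copy).
        while (nbytes >= kPartUploadSize) {
          UploadPart(data, kPartUploadSize);
          data += kPartUploadSize;
          nbytes -= kPartUploadSize;
        }
        // 3. Buffer the remainder; it becomes the (smaller) final part on close.
        if (nbytes > 0) {
          current_part.insert(current_part.end(), data, data + nbytes);
        }
      }
    };
    ```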
    
    ### Are these changes tested?
    
    * [x] Tested manually against R2
    * [x] Tested manually against S3
    
    ### Are there any user-facing changes?
    
    * Closes: #34363
    
    Authored-by: Will Jones <wi...@gmail.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/filesystem/s3fs.cc | 81 +++++++++++++++++++++-------------------
 1 file changed, 43 insertions(+), 38 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index dc033b9958..c3a6eb0eac 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -1150,11 +1150,14 @@ class ObjectInputFile final : public io::RandomAccessFile {
   std::shared_ptr<const KeyValueMetadata> metadata_;
 };
 
-// Minimum size for each part of a multipart upload, except for the last part.
-// AWS doc says "5 MB" but it's not clear whether those are MB or MiB,
-// so I chose the safer value.
-// (see https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html)
-static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024;
+// Upload size per part. While AWS and Minio support different sizes for each
+// part (only requiring a minimum of 5 MB), Cloudflare R2 requires that every
+// part be exactly equal (except for the last part). We set this to 10 MiB, so
+// that in combination with the maximum number of parts of 10,000, this gives a
+// file size limit of 100,000 MiB (about 98 GiB).
+// (see https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html)
+// (for the rationale, see: https://github.com/apache/arrow/issues/34363)
+static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024;
 
 // An OutputStream that writes to a S3 object
 class ObjectOutputStream final : public io::OutputStream {
@@ -1304,27 +1307,44 @@ class ObjectOutputStream final : public io::OutputStream {
       return Status::Invalid("Operation on closed stream");
     }
 
-    if (!current_part_ && nbytes >= part_upload_threshold_) {
-      // No current part and data large enough, upload it directly
-      // (without copying if the buffer is owned)
-      RETURN_NOT_OK(UploadPart(data, nbytes, owned_buffer));
-      pos_ += nbytes;
-      return Status::OK();
+    const int8_t* data_ptr = reinterpret_cast<const int8_t*>(data);
+    auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) {
+      data_ptr += offset;
+      nbytes -= offset;
+    };
+
+    // Handle case where we have some bytes buffered from prior calls.
+    if (current_part_size_ > 0) {
+      // Try to fill current buffer
+      const int64_t to_copy = std::min(nbytes, kPartUploadSize - current_part_size_);
+      RETURN_NOT_OK(current_part_->Write(data_ptr, to_copy));
+      current_part_size_ += to_copy;
+      advance_ptr(to_copy);
+      pos_ += to_copy;
+
+      // If buffer isn't full, return and wait for more data
+      if (current_part_size_ < kPartUploadSize) {
+        return Status::OK();
+      }
+
+      // Upload current buffer
+      RETURN_NOT_OK(CommitCurrentPart());
     }
-    // Can't upload data on its own, need to buffer it
-    if (!current_part_) {
-      ARROW_ASSIGN_OR_RAISE(
-          current_part_,
-          io::BufferOutputStream::Create(part_upload_threshold_, io_context_.pool()));
-      current_part_size_ = 0;
+
+    // We can upload chunks without copying them into a buffer
+    while (nbytes >= kPartUploadSize) {
+      RETURN_NOT_OK(UploadPart(data_ptr, kPartUploadSize));
+      advance_ptr(kPartUploadSize);
+      pos_ += kPartUploadSize;
     }
-    RETURN_NOT_OK(current_part_->Write(data, nbytes));
-    pos_ += nbytes;
-    current_part_size_ += nbytes;
 
-    if (current_part_size_ >= part_upload_threshold_) {
-      // Current part large enough, upload it
-      RETURN_NOT_OK(CommitCurrentPart());
+    // Buffer remaining bytes
+    if (nbytes > 0) {
+      current_part_size_ = nbytes;
+      ARROW_ASSIGN_OR_RAISE(current_part_, io::BufferOutputStream::Create(
+                                               kPartUploadSize, io_context_.pool()));
+      RETURN_NOT_OK(current_part_->Write(data_ptr, current_part_size_));
+      pos_ += current_part_size_;
     }
 
     return Status::OK();
@@ -1407,20 +1427,6 @@ class ObjectOutputStream final : public io::OutputStream {
     }
 
     ++part_number_;
-    // With up to 10000 parts in an upload (S3 limit), a stream writing chunks
-    // of exactly 5MB would be limited to 50GB total.  To avoid that, we bump
-    // the upload threshold every 100 parts.  So the pattern is:
-    // - part 1 to 99: 5MB threshold
-    // - part 100 to 199: 10MB threshold
-    // - part 200 to 299: 15MB threshold
-    // ...
-    // - part 9900 to 9999: 500MB threshold
-    // So the total size limit is 2475000MB or ~2.4TB, while keeping manageable
-    // chunk sizes and avoiding too much buffering in the common case of a small-ish
-    // stream.  If the limit's not enough, we can revisit.
-    if (part_number_ % 100 == 0) {
-      part_upload_threshold_ += kMinimumPartUpload;
-    }
 
     return Status::OK();
   }
@@ -1482,7 +1488,6 @@ class ObjectOutputStream final : public io::OutputStream {
   int32_t part_number_ = 1;
   std::shared_ptr<io::BufferOutputStream> current_part_;
   int64_t current_part_size_ = 0;
-  int64_t part_upload_threshold_ = kMinimumPartUpload;
 
   // This struct is kept alive through background writes to avoid problems
   // in the completion handler.