You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/01 14:23:44 UTC

[arrow-rs] branch master updated: fix(object_store,aws,gcp): multipart upload enforce size limit of 5 MiB not 5MB (#3234)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f5fd9a12 fix(object_store,aws,gcp): multipart upload enforce size limit of 5 MiB not 5MB (#3234)
8f5fd9a12 is described below

commit 8f5fd9a123bfd790baa392f11f16807468229fff
Author: Will Jones <wi...@gmail.com>
AuthorDate: Thu Dec 1 06:23:38 2022 -0800

    fix(object_store,aws,gcp): multipart upload enforce size limit of 5 MiB not 5MB (#3234)
    
    * fix: use better minimum part size
    
    * test: don't make the test larger than necessary
    
    * Further tweaks
    
    * Format
    
    Co-authored-by: Raphael Taylor-Davies <r....@googlemail.com>
---
 object_store/CONTRIBUTING.md  |  6 +++---
 object_store/src/lib.rs       |  3 ++-
 object_store/src/multipart.rs | 15 ++++++++++-----
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/object_store/CONTRIBUTING.md b/object_store/CONTRIBUTING.md
index e780ec5c9..4e6b3afe3 100644
--- a/object_store/CONTRIBUTING.md
+++ b/object_store/CONTRIBUTING.md
@@ -46,9 +46,9 @@ Setup environment
 
 ```
 export TEST_INTEGRATION=1
-export AWS_DEFAULT_REGION=us-east-1
-export AWS_ACCESS_KEY_ID=test
-export AWS_SECRET_ACCESS_KEY=test
+export OBJECT_STORE_AWS_DEFAULT_REGION=us-east-1
+export OBJECT_STORE_AWS_ACCESS_KEY_ID=test
+export OBJECT_STORE_AWS_SECRET_ACCESS_KEY=test
 export AWS_ENDPOINT=http://127.0.0.1:4566
 export OBJECT_STORE_BUCKET=test-bucket
 ```
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 6278d827b..a36bb5fb8 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -769,7 +769,8 @@ mod tests {
         assert_eq!(bytes_expected, bytes_written);
 
         // Can overwrite some storage
-        let data = get_vec_of_bytes(5_000, 5);
+        // Sizes carefully chosen to exactly hit min limit of 5 MiB
+        let data = get_vec_of_bytes(242_880, 22);
         let bytes_expected = data.concat();
         let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
         for chunk in &data {
diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs
index 102d8beda..de8591462 100644
--- a/object_store/src/multipart.rs
+++ b/object_store/src/multipart.rs
@@ -81,7 +81,11 @@ where
             current_buffer: Vec::new(),
             // TODO: Should self vary by provider?
             // TODO: Should we automatically increase then when part index gets large?
-            min_part_size: 5_000_000,
+
+            // Minimum size of 5 MiB
+            // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+            // https://cloud.google.com/storage/quotas#requests
+            min_part_size: 5_242_880,
             current_part_idx: 0,
             completion_task: None,
         }
@@ -113,13 +117,14 @@ where
         mut self: Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
         buf: &[u8],
-    ) -> std::task::Poll<Result<usize, io::Error>> {
+    ) -> Poll<Result<usize, io::Error>> {
         // Poll current tasks
         self.as_mut().poll_tasks(cx)?;
 
         // If adding buf to pending buffer would trigger send, check
         // whether we have capacity for another task.
-        let enough_to_send = (buf.len() + self.current_buffer.len()) > self.min_part_size;
+        let enough_to_send =
+            (buf.len() + self.current_buffer.len()) >= self.min_part_size;
         if enough_to_send && self.tasks.len() < self.max_concurrency {
             // If we do, copy into the buffer and submit the task, and return ready.
             self.current_buffer.extend_from_slice(buf);
@@ -149,7 +154,7 @@ where
     fn poll_flush(
         mut self: Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Result<(), io::Error>> {
+    ) -> Poll<Result<(), io::Error>> {
         // Poll current tasks
         self.as_mut().poll_tasks(cx)?;
 
@@ -177,7 +182,7 @@ where
     fn poll_shutdown(
         mut self: Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Result<(), io::Error>> {
+    ) -> Poll<Result<(), io::Error>> {
         // First, poll flush
         match self.as_mut().poll_flush(cx) {
             Poll::Pending => return Poll::Pending,