You are viewing a plain-text version of this content. The canonical (HTML) version is available in the original mailing-list archive; its link was not preserved in this extraction.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/01 14:23:44 UTC
[arrow-rs] branch master updated: fix(object_store,aws,gcp): multipart upload enforce size limit of 5 MiB not 5MB (#3234)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 8f5fd9a12 fix(object_store,aws,gcp): multipart upload enforce size limit of 5 MiB not 5MB (#3234)
8f5fd9a12 is described below
commit 8f5fd9a123bfd790baa392f11f16807468229fff
Author: Will Jones <wi...@gmail.com>
AuthorDate: Thu Dec 1 06:23:38 2022 -0800
fix(object_store,aws,gcp): multipart upload enforce size limit of 5 MiB not 5MB (#3234)
* fix: use better minimum part size
* test: don't make the test larger than necessary
* Further tweaks
* Format
Co-authored-by: Raphael Taylor-Davies <r....@googlemail.com>
---
object_store/CONTRIBUTING.md | 6 +++---
object_store/src/lib.rs | 3 ++-
object_store/src/multipart.rs | 15 ++++++++++-----
3 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/object_store/CONTRIBUTING.md b/object_store/CONTRIBUTING.md
index e780ec5c9..4e6b3afe3 100644
--- a/object_store/CONTRIBUTING.md
+++ b/object_store/CONTRIBUTING.md
@@ -46,9 +46,9 @@ Setup environment
```
export TEST_INTEGRATION=1
-export AWS_DEFAULT_REGION=us-east-1
-export AWS_ACCESS_KEY_ID=test
-export AWS_SECRET_ACCESS_KEY=test
+export OBJECT_STORE_AWS_DEFAULT_REGION=us-east-1
+export OBJECT_STORE_AWS_ACCESS_KEY_ID=test
+export OBJECT_STORE_AWS_SECRET_ACCESS_KEY=test
export AWS_ENDPOINT=http://128.0.0.1:4566
export OBJECT_STORE_BUCKET=test-bucket
```
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 6278d827b..a36bb5fb8 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -769,7 +769,8 @@ mod tests {
assert_eq!(bytes_expected, bytes_written);
// Can overwrite some storage
- let data = get_vec_of_bytes(5_000, 5);
+ // Sizes carefully chosen to exactly hit min limit of 5 MiB
+ let data = get_vec_of_bytes(242_880, 22);
let bytes_expected = data.concat();
let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
for chunk in &data {
diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs
index 102d8beda..de8591462 100644
--- a/object_store/src/multipart.rs
+++ b/object_store/src/multipart.rs
@@ -81,7 +81,11 @@ where
current_buffer: Vec::new(),
// TODO: Should self vary by provider?
// TODO: Should we automatically increase then when part index gets large?
- min_part_size: 5_000_000,
+
+ // Minimum size of 5 MiB
+ // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+ // https://cloud.google.com/storage/quotas#requests
+ min_part_size: 5_242_880,
current_part_idx: 0,
completion_task: None,
}
@@ -113,13 +117,14 @@ where
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
- ) -> std::task::Poll<Result<usize, io::Error>> {
+ ) -> Poll<Result<usize, io::Error>> {
// Poll current tasks
self.as_mut().poll_tasks(cx)?;
// If adding buf to pending buffer would trigger send, check
// whether we have capacity for another task.
- let enough_to_send = (buf.len() + self.current_buffer.len()) > self.min_part_size;
+ let enough_to_send =
+ (buf.len() + self.current_buffer.len()) >= self.min_part_size;
if enough_to_send && self.tasks.len() < self.max_concurrency {
// If we do, copy into the buffer and submit the task, and return ready.
self.current_buffer.extend_from_slice(buf);
@@ -149,7 +154,7 @@ where
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), io::Error>> {
+ ) -> Poll<Result<(), io::Error>> {
// Poll current tasks
self.as_mut().poll_tasks(cx)?;
@@ -177,7 +182,7 @@ where
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), io::Error>> {
+ ) -> Poll<Result<(), io::Error>> {
// First, poll flush
match self.as_mut().poll_flush(cx) {
Poll::Pending => return Poll::Pending,