You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/07/14 16:52:27 UTC

[arrow-rs] branch master updated: Handle empty S3 payloads (#4514) (#4518)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new edeb7bbd9 Handle empty S3 payloads (#4514) (#4518)
edeb7bbd9 is described below

commit edeb7bbd92b1e8069ce1a031a4e23f7bfbc4f1d6
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Jul 14 12:52:20 2023 -0400

    Handle empty S3 payloads (#4514) (#4518)
---
 object_store/src/aws/client.rs   | 27 ++++++++++++++++-----------
 object_store/src/aws/mod.rs      |  4 ++--
 object_store/src/azure/client.rs |  2 +-
 object_store/src/lib.rs          |  9 +++++++++
 4 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 0c2493651..971d2c608 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -35,7 +35,10 @@ use bytes::{Buf, Bytes};
 use itertools::Itertools;
 use percent_encoding::{utf8_percent_encode, PercentEncode};
 use quick_xml::events::{self as xml_events};
-use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response};
+use reqwest::{
+    header::{CONTENT_LENGTH, CONTENT_TYPE},
+    Client as ReqwestClient, Method, Response,
+};
 use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
 use std::sync::Arc;
@@ -236,7 +239,7 @@ impl S3Client {
     pub async fn put_request<T: Serialize + ?Sized + Sync>(
         &self,
         path: &Path,
-        bytes: Option<Bytes>,
+        bytes: Bytes,
         query: &T,
     ) -> Result<Response> {
         let credential = self.get_credential().await?;
@@ -244,18 +247,20 @@ impl S3Client {
         let mut builder = self.client.request(Method::PUT, url);
         let mut payload_sha256 = None;
 
-        if let Some(bytes) = bytes {
-            if let Some(checksum) = self.config().checksum {
-                let digest = checksum.digest(&bytes);
-                builder = builder
-                    .header(checksum.header_name(), BASE64_STANDARD.encode(&digest));
-                if checksum == Checksum::SHA256 {
-                    payload_sha256 = Some(digest);
-                }
+        if let Some(checksum) = self.config().checksum {
+            let digest = checksum.digest(&bytes);
+            builder =
+                builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest));
+            if checksum == Checksum::SHA256 {
+                payload_sha256 = Some(digest);
             }
-            builder = builder.body(bytes);
         }
 
+        builder = match bytes.is_empty() {
+            true => builder.header(CONTENT_LENGTH, 0), // Handle empty uploads (#4514)
+            false => builder.body(bytes),
+        };
+
         if let Some(value) = self.config().client_options.get_content_type(path) {
             builder = builder.header(CONTENT_TYPE, value);
         }
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 8a486f986..e74e6f2df 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -211,7 +211,7 @@ impl AmazonS3 {
 #[async_trait]
 impl ObjectStore for AmazonS3 {
     async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
-        self.client.put_request(location, Some(bytes), &()).await?;
+        self.client.put_request(location, bytes, &()).await?;
         Ok(())
     }
 
@@ -321,7 +321,7 @@ impl CloudMultiPartUploadImpl for S3MultiPartUpload {
             .client
             .put_request(
                 &self.location,
-                Some(buf.into()),
+                buf.into(),
                 &[("partNumber", &part), ("uploadId", &self.upload_id)],
             )
             .await?;
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 5ed6f2443..e18135c2c 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -387,7 +387,7 @@ fn to_list_result(value: ListResultInternal, prefix: Option<&str>) -> Result<Lis
         .map(ObjectMeta::try_from)
         // Note: workaround for gen2 accounts with hierarchical namespaces. These accounts also
         // return path segments as "directories" and include blobs in list requests with prefix,
-        // if the prefix mateches the blob. When we want directories, its always via
+        // if the prefix matches the blob. When we want directories, it's always via
         // the BlobPrefix mechanics, and during lists we state that prefixes are evaluated on path segment basis.
         .filter_map_ok(|obj| {
             if obj.size > 0 && obj.location.as_ref().len() > prefix.as_ref().len() {
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 4867d485d..94261e7d4 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -1258,6 +1258,15 @@ mod tests {
         }
 
         delete_fixtures(storage).await;
+
+        let path = Path::from("empty");
+        storage.put(&path, Bytes::new()).await.unwrap();
+        let meta = storage.head(&path).await.unwrap();
+        assert_eq!(meta.size, 0);
+        let data = storage.get(&path).await.unwrap().bytes().await.unwrap();
+        assert_eq!(data.len(), 0);
+
+        storage.delete(&path).await.unwrap();
     }
 
     pub(crate) async fn get_opts(storage: &dyn ObjectStore) {