You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@opendal.apache.org by xu...@apache.org on 2023/03/30 11:49:44 UTC

[incubator-opendal] branch main updated: feat: add cache_control to OpWrite (#1806)

This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new db36bfda feat: add cache_control to OpWrite (#1806)
db36bfda is described below

commit db36bfda9f81acaae9feba14fe499c01bed9caba
Author: Bas Zalmstra <ba...@prefix.dev>
AuthorDate: Thu Mar 30 13:49:39 2023 +0200

    feat: add cache_control to OpWrite (#1806)
---
 core/src/services/s3/backend.rs | 38 +++++++++++++++++++++++++++++++++-----
 core/src/services/s3/writer.rs  |  1 +
 core/src/types/ops.rs           | 13 +++++++++++++
 3 files changed, 47 insertions(+), 5 deletions(-)

diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index dcb8819e..66032608 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -26,10 +26,10 @@ use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::Buf;
 use bytes::Bytes;
-use http::header::HeaderName;
 use http::header::CONTENT_DISPOSITION;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
+use http::header::{HeaderName, CACHE_CONTROL};
 use http::HeaderValue;
 use http::Request;
 use http::Response;
@@ -1135,7 +1135,8 @@ impl Accessor for S3Backend {
     }
 
     async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
-        let mut req = self.s3_put_object_request(path, Some(0), None, None, AsyncBody::Empty)?;
+        let mut req =
+            self.s3_put_object_request(path, Some(0), None, None, None, AsyncBody::Empty)?;
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
@@ -1168,7 +1169,14 @@ impl Accessor for S3Backend {
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let upload_id = if args.append() {
-            let resp = self.s3_initiate_multipart_upload(path).await?;
+            let resp = self
+                .s3_initiate_multipart_upload(
+                    path,
+                    args.content_type(),
+                    args.content_disposition(),
+                    args.cache_control(),
+                )
+                .await?;
 
             let status = resp.status();
 
@@ -1249,7 +1257,7 @@ impl Accessor for S3Backend {
                 v.override_cache_control(),
             )?,
             PresignOperation::Write(_) => {
-                self.s3_put_object_request(path, None, None, None, AsyncBody::Empty)?
+                self.s3_put_object_request(path, None, None, None, None, AsyncBody::Empty)?
             }
         };
 
@@ -1400,6 +1408,7 @@ impl S3Backend {
         size: Option<usize>,
         content_type: Option<&str>,
         content_disposition: Option<&str>,
+        cache_control: Option<&str>,
         body: AsyncBody,
     ) -> Result<Request<AsyncBody>> {
         let p = build_abs_path(&self.root, path);
@@ -1420,6 +1429,10 @@ impl S3Backend {
             req = req.header(CONTENT_DISPOSITION, pos)
         }
 
+        if let Some(cache_control) = cache_control {
+            req = req.header(CACHE_CONTROL, cache_control)
+        }
+
         // Set SSE headers.
         req = self.insert_sse_headers(req, true);
 
@@ -1506,12 +1519,27 @@ impl S3Backend {
     async fn s3_initiate_multipart_upload(
         &self,
         path: &str,
+        content_type: Option<&str>,
+        content_disposition: Option<&str>,
+        cache_control: Option<&str>,
     ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
 
         let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
 
-        let req = Request::post(&url);
+        let mut req = Request::post(&url);
+
+        if let Some(mime) = content_type {
+            req = req.header(CONTENT_TYPE, mime)
+        }
+
+        if let Some(content_disposition) = content_disposition {
+            req = req.header(CONTENT_DISPOSITION, content_disposition)
+        }
+
+        if let Some(cache_control) = cache_control {
+            req = req.header(CACHE_CONTROL, cache_control)
+        }
 
         // Set SSE headers.
         let req = self.insert_sse_headers(req, true);
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index 5b749cd5..bbdb9045 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -61,6 +61,7 @@ impl oio::Write for S3Writer {
             Some(bs.len()),
             self.op.content_type(),
             self.op.content_disposition(),
+            self.op.cache_control(),
             AsyncBody::Bytes(bs),
         )?;
 
diff --git a/core/src/types/ops.rs b/core/src/types/ops.rs
index 4e698a36..f66655a6 100644
--- a/core/src/types/ops.rs
+++ b/core/src/types/ops.rs
@@ -293,6 +293,7 @@ pub struct OpWrite {
 
     content_type: Option<String>,
     content_disposition: Option<String>,
+    cache_control: Option<String>,
 }
 
 impl OpWrite {
@@ -305,6 +306,7 @@ impl OpWrite {
 
             content_type: None,
             content_disposition: None,
+            cache_control: None,
         }
     }
 
@@ -338,4 +340,15 @@ impl OpWrite {
         self.content_disposition = Some(content_disposition.to_string());
         self
     }
+
+    /// Get the cache control from option
+    pub fn cache_control(&self) -> Option<&str> {
+        self.cache_control.as_deref()
+    }
+
+    /// Set the cache control of option
+    pub fn with_cache_control(mut self, cache_control: &str) -> Self {
+        self.cache_control = Some(cache_control.to_string());
+        self
+    }
 }