You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/04/03 12:56:54 UTC

[arrow-rs] branch master updated: feat: add etag for objectMeta (#3937)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new a9b1120eb feat: add etag for objectMeta (#3937)
a9b1120eb is described below

commit a9b1120eb81f9414aa84cfc97f5133c8bfbae9ca
Author: Alex Huang <hu...@gmail.com>
AuthorDate: Mon Apr 3 14:56:47 2023 +0200

    feat: add etag for objectMeta (#3937)
    
    * feat: add etag for objectMeta
    
    * replace the manual etag in response
    
    * fix typo
    
    * use option for e_tag
    
    * remove useless packages
---
 object_store/src/aws/client.rs   |  3 +++
 object_store/src/aws/mod.rs      |  7 ++++++-
 object_store/src/azure/client.rs |  4 +++-
 object_store/src/azure/mod.rs    | 12 +++++++++++-
 object_store/src/gcp/mod.rs      |  5 ++++-
 object_store/src/http/client.rs  |  7 ++++++-
 object_store/src/lib.rs          |  2 ++
 object_store/src/local.rs        |  4 +++-
 object_store/src/memory.rs       |  4 ++++
 object_store/src/prefix.rs       |  3 +++
 10 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 7ac4b705b..9634c740d 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -164,6 +164,8 @@ pub struct ListContents {
     pub key: String,
     pub size: usize,
     pub last_modified: DateTime<Utc>,
+    #[serde(rename = "ETag")]
+    pub e_tag: Option<String>,
 }
 
 impl TryFrom<ListContents> for ObjectMeta {
@@ -174,6 +176,7 @@ impl TryFrom<ListContents> for ObjectMeta {
             location: Path::parse(value.key)?,
             last_modified: value.last_modified,
             size: value.size,
+            e_tag: value.e_tag,
         })
     }
 }
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 1e302e688..f88960b4b 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -232,7 +232,7 @@ impl ObjectStore for AmazonS3 {
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
-        use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED};
+        use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
 
         // Extract meta from headers
         // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
@@ -256,10 +256,15 @@ impl ObjectStore for AmazonS3 {
         let content_length = content_length
             .parse()
             .context(InvalidContentLengthSnafu { content_length })?;
+
+        let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?;
+        let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;
+
         Ok(ObjectMeta {
             location: location.clone(),
             last_modified,
             size: content_length,
+            e_tag: Some(e_tag.to_string()),
         })
     }
 
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 494303dff..87432f62b 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -489,6 +489,7 @@ impl TryFrom<Blob> for ObjectMeta {
             location: Path::parse(value.name)?,
             last_modified: value.properties.last_modified,
             size: value.properties.content_length as usize,
+            e_tag: value.properties.e_tag,
         })
     }
 }
@@ -501,7 +502,6 @@ impl TryFrom<Blob> for ObjectMeta {
 struct BlobProperties {
     #[serde(deserialize_with = "deserialize_rfc1123", rename = "Last-Modified")]
     pub last_modified: DateTime<Utc>,
-    pub etag: String,
     #[serde(rename = "Content-Length")]
     pub content_length: u64,
     #[serde(rename = "Content-Type")]
@@ -510,6 +510,8 @@ struct BlobProperties {
     pub content_encoding: Option<String>,
     #[serde(rename = "Content-Language")]
     pub content_language: Option<String>,
+    #[serde(rename = "Etag")]
+    pub e_tag: Option<String>,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq)]
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index e5f1465ad..c2e72f214 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -140,6 +140,9 @@ enum Error {
 
     #[snafu(display("Configuration key: '{}' is not known.", key))]
     UnknownConfigurationKey { key: String },
+
+    #[snafu(display("ETag Header missing from response"))]
+    MissingEtag,
 }
 
 impl From<Error> for super::Error {
@@ -232,7 +235,7 @@ impl ObjectStore for MicrosoftAzure {
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
-        use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED};
+        use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
 
         // Extract meta from headers
         // https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
@@ -257,10 +260,17 @@ impl ObjectStore for MicrosoftAzure {
             .parse()
             .context(InvalidContentLengthSnafu { content_length })?;
 
+        let e_tag = headers
+            .get(ETAG)
+            .ok_or(Error::MissingEtag)?
+            .to_str()
+            .context(BadHeaderSnafu)?;
+
         Ok(ObjectMeta {
             location: location.clone(),
             last_modified,
             size: content_length,
+            e_tag: Some(e_tag.to_string()),
         })
     }
 
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index fe79a6e07..5247693e6 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -196,6 +196,8 @@ struct Object {
     name: String,
     size: String,
     updated: DateTime<Utc>,
+    #[serde(rename = "etag")]
+    e_tag: Option<String>,
 }
 
 #[derive(serde::Deserialize, Debug)]
@@ -209,7 +211,6 @@ struct InitiateMultipartUploadResult {
 struct MultipartPart {
     #[serde(rename = "PartNumber")]
     part_number: usize,
-    #[serde(rename = "ETag")]
     e_tag: String,
 }
 
@@ -1170,11 +1171,13 @@ fn convert_object_meta(object: &Object) -> Result<ObjectMeta> {
     let location = Path::parse(&object.name)?;
     let last_modified = object.updated;
     let size = object.size.parse().context(InvalidSizeSnafu)?;
+    let e_tag = object.e_tag.clone();
 
     Ok(ObjectMeta {
         location,
         last_modified,
         size,
+        e_tag,
     })
 }
 
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index 799c5be0c..5ef272180 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -335,10 +335,12 @@ impl MultiStatusResponse {
 
     /// Returns this objects metadata as [`ObjectMeta`]
     pub fn object_meta(&self, base_url: &Url) -> Result<ObjectMeta> {
+        let last_modified = self.prop_stat.prop.last_modified;
         Ok(ObjectMeta {
             location: self.path(base_url)?,
-            last_modified: self.prop_stat.prop.last_modified,
+            last_modified,
             size: self.size()?,
+            e_tag: self.prop_stat.prop.e_tag.clone(),
         })
     }
 
@@ -364,6 +366,9 @@ pub struct Prop {
 
     #[serde(rename = "resourcetype")]
     resource_type: ResourceType,
+
+    #[serde(rename = "getetag")]
+    e_tag: Option<String>,
 }
 
 #[derive(Deserialize)]
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 573707128..c31027c07 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -548,6 +548,8 @@ pub struct ObjectMeta {
     pub last_modified: DateTime<Utc>,
     /// The size in bytes of the object
     pub size: usize,
+    /// The unique identifier for the object
+    pub e_tag: Option<String>,
 }
 
 /// Result for a get request
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 9e710c28c..d2553d46f 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -23,6 +23,7 @@ use crate::{
 };
 use async_trait::async_trait;
 use bytes::Bytes;
+use chrono::{DateTime, Utc};
 use futures::future::BoxFuture;
 use futures::FutureExt;
 use futures::{stream::BoxStream, StreamExt};
@@ -887,7 +888,7 @@ fn convert_entry(entry: DirEntry, location: Path) -> Result<ObjectMeta> {
 }
 
 fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> Result<ObjectMeta> {
-    let last_modified = metadata
+    let last_modified: DateTime<Utc> = metadata
         .modified()
         .expect("Modified file time should be supported on this platform")
         .into();
@@ -900,6 +901,7 @@ fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> Result<Objec
         location,
         last_modified,
         size,
+        e_tag: None,
     })
 }
 
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 1433701e8..057a260f7 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -151,10 +151,12 @@ impl ObjectStore for InMemory {
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
         let entry = self.entry(location).await?;
+
         Ok(ObjectMeta {
             location: location.clone(),
             last_modified: entry.1,
             size: entry.0.len(),
+            e_tag: None,
         })
     }
 
@@ -185,6 +187,7 @@ impl ObjectStore for InMemory {
                     location: key.clone(),
                     last_modified: value.1,
                     size: value.0.len(),
+                    e_tag: None,
                 })
             })
             .collect();
@@ -228,6 +231,7 @@ impl ObjectStore for InMemory {
                     location: k.clone(),
                     last_modified: v.1,
                     size: v.0.len(),
+                    e_tag: None,
                 };
                 objects.push(object);
             }
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 7e7e7167b..eba379553 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -108,6 +108,7 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
             last_modified: meta.last_modified,
             size: meta.size,
             location: self.strip_prefix(&meta.location).unwrap_or(meta.location),
+            e_tag: meta.e_tag,
         })
     }
 
@@ -128,6 +129,7 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
                 last_modified: meta.last_modified,
                 size: meta.size,
                 location: self.strip_prefix(&meta.location).unwrap_or(meta.location),
+                e_tag: meta.e_tag,
             })
             .boxed())
     }
@@ -155,6 +157,7 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
                             last_modified: meta.last_modified,
                             size: meta.size,
                             location: self.strip_prefix(&meta.location)?,
+                            e_tag: meta.e_tag.clone(),
                         })
                     })
                     .collect(),