You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/18 08:18:44 UTC
[arrow-rs] branch master updated: Extract Common Listing and Retrieval Functionality (#4220)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new fe1b574f7 Extract Common Listing and Retrieval Functionality (#4220)
fe1b574f7 is described below
commit fe1b574f7bef356691b1ee22f10f20b1b06d1502
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu May 18 09:18:38 2023 +0100
Extract Common Listing and Retrieval Functionality (#4220)
* Factor out common cloud storage client functionality
* Remove format_prefix
* Review feedback
---
object_store/src/aws/client.rs | 224 ++++++++++-----------
object_store/src/aws/mod.rs | 63 +-----
object_store/src/azure/client.rs | 104 +++++-----
object_store/src/azure/mod.rs | 55 +----
object_store/src/client/get.rs | 70 +++++++
object_store/src/client/list.rs | 162 ++++++++++-----
.../src/client/{list.rs => list_response.rs} | 0
object_store/src/client/mod.rs | 18 +-
object_store/src/gcp/mod.rs | 148 +++++---------
object_store/src/util.rs | 8 -
10 files changed, 412 insertions(+), 440 deletions(-)
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 2c45050fa..cfce35254 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -18,17 +18,17 @@
use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt};
use crate::aws::{AwsCredentialProvider, STORE, STRICT_PATH_ENCODE_SET};
-use crate::client::list::ListResponse;
-use crate::client::pagination::stream_paginated;
+use crate::client::get::GetClient;
+use crate::client::list::ListClient;
+use crate::client::list_response::ListResponse;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::UploadPart;
use crate::path::DELIMITER;
-use crate::util::format_prefix;
use crate::{
- BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, Path, Result,
- RetryConfig, StreamExt,
+ ClientOptions, GetOptions, ListResult, MultipartId, Path, Result, RetryConfig,
};
+use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
@@ -169,40 +169,6 @@ impl S3Client {
self.config.credentials.get_credential().await
}
- /// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
- pub async fn get_request(
- &self,
- path: &Path,
- options: GetOptions,
- head: bool,
- ) -> Result<Response> {
- let credential = self.get_credential().await?;
- let url = self.config.path_url(path);
- let method = match head {
- true => Method::HEAD,
- false => Method::GET,
- };
-
- let builder = self.client.request(method, url);
-
- let response = builder
- .with_get_options(options)
- .with_aws_sigv4(
- credential.as_ref(),
- &self.config.region,
- "s3",
- self.config.sign_payload,
- None,
- )
- .send_retry(&self.config.retry_config)
- .await
- .context(GetRequestSnafu {
- path: path.as_ref(),
- })?;
-
- Ok(response)
- }
-
/// Make an S3 PUT request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
pub async fn put_request<T: Serialize + ?Sized + Sync>(
&self,
@@ -302,88 +268,6 @@ impl S3Client {
Ok(())
}
- /// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
- async fn list_request(
- &self,
- prefix: Option<&str>,
- delimiter: bool,
- token: Option<&str>,
- offset: Option<&str>,
- ) -> Result<(ListResult, Option<String>)> {
- let credential = self.get_credential().await?;
- let url = self.config.bucket_endpoint.clone();
-
- let mut query = Vec::with_capacity(4);
-
- if let Some(token) = token {
- query.push(("continuation-token", token))
- }
-
- if delimiter {
- query.push(("delimiter", DELIMITER))
- }
-
- query.push(("list-type", "2"));
-
- if let Some(prefix) = prefix {
- query.push(("prefix", prefix))
- }
-
- if let Some(offset) = offset {
- query.push(("start-after", offset))
- }
-
- let response = self
- .client
- .request(Method::GET, &url)
- .query(&query)
- .with_aws_sigv4(
- credential.as_ref(),
- &self.config.region,
- "s3",
- self.config.sign_payload,
- None,
- )
- .send_retry(&self.config.retry_config)
- .await
- .context(ListRequestSnafu)?
- .bytes()
- .await
- .context(ListResponseBodySnafu)?;
-
- let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
- .context(InvalidListResponseSnafu)?;
- let token = response.next_continuation_token.take();
-
- Ok((response.try_into()?, token))
- }
-
- /// Perform a list operation automatically handling pagination
- pub fn list_paginated(
- &self,
- prefix: Option<&Path>,
- delimiter: bool,
- offset: Option<&Path>,
- ) -> BoxStream<'_, Result<ListResult>> {
- let offset = offset.map(|x| x.to_string());
- let prefix = format_prefix(prefix);
- stream_paginated(
- (prefix, offset),
- move |(prefix, offset), token| async move {
- let (r, next_token) = self
- .list_request(
- prefix.as_deref(),
- delimiter,
- token.as_deref(),
- offset.as_deref(),
- )
- .await?;
- Ok((r, (prefix, offset), next_token))
- },
- )
- .boxed()
- }
-
pub async fn create_multipart(&self, location: &Path) -> Result<MultipartId> {
let credential = self.get_credential().await?;
let url = format!("{}?uploads=", self.config.path_url(location),);
@@ -451,6 +335,104 @@ impl S3Client {
}
}
+#[async_trait]
+impl GetClient for S3Client {
+ const STORE: &'static str = STORE;
+
+ /// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
+ async fn get_request(
+ &self,
+ path: &Path,
+ options: GetOptions,
+ head: bool,
+ ) -> Result<Response> {
+ let credential = self.get_credential().await?;
+ let url = self.config.path_url(path);
+ let method = match head {
+ true => Method::HEAD,
+ false => Method::GET,
+ };
+
+ let builder = self.client.request(method, url);
+
+ let response = builder
+ .with_get_options(options)
+ .with_aws_sigv4(
+ credential.as_ref(),
+ &self.config.region,
+ "s3",
+ self.config.sign_payload,
+ None,
+ )
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(GetRequestSnafu {
+ path: path.as_ref(),
+ })?;
+
+ Ok(response)
+ }
+}
+
+#[async_trait]
+impl ListClient for S3Client {
+ /// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
+ async fn list_request(
+ &self,
+ prefix: Option<&str>,
+ delimiter: bool,
+ token: Option<&str>,
+ offset: Option<&str>,
+ ) -> Result<(ListResult, Option<String>)> {
+ let credential = self.get_credential().await?;
+ let url = self.config.bucket_endpoint.clone();
+
+ let mut query = Vec::with_capacity(4);
+
+ if let Some(token) = token {
+ query.push(("continuation-token", token))
+ }
+
+ if delimiter {
+ query.push(("delimiter", DELIMITER))
+ }
+
+ query.push(("list-type", "2"));
+
+ if let Some(prefix) = prefix {
+ query.push(("prefix", prefix))
+ }
+
+ if let Some(offset) = offset {
+ query.push(("start-after", offset))
+ }
+
+ let response = self
+ .client
+ .request(Method::GET, &url)
+ .query(&query)
+ .with_aws_sigv4(
+ credential.as_ref(),
+ &self.config.region,
+ "s3",
+ self.config.sign_payload,
+ None,
+ )
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(ListRequestSnafu)?
+ .bytes()
+ .await
+ .context(ListResponseBodySnafu)?;
+
+ let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
+ .context(InvalidListResponseSnafu)?;
+ let token = response.next_continuation_token.take();
+
+ Ok((response.try_into()?, token))
+ }
+}
+
fn encode_path(path: &Path) -> PercentEncode<'_> {
utf8_percent_encode(path.as_ref(), &STRICT_PATH_ENCODE_SET)
}
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index e71124fba..4c6d34660 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -34,11 +34,9 @@
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
-use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use std::collections::BTreeSet;
use std::str::FromStr;
use std::sync::Arc;
use tokio::io::AsyncWrite;
@@ -48,7 +46,8 @@ use url::Url;
pub use crate::aws::checksum::Checksum;
use crate::aws::client::{S3Client, S3Config};
use crate::aws::credential::{InstanceCredentialProvider, WebIdentityProvider};
-use crate::client::header::header_meta;
+use crate::client::get::GetClientExt;
+use crate::client::list::ListClientExt;
use crate::client::{
ClientConfigKey, CredentialProvider, StaticCredentialProvider,
TokenCredentialProvider,
@@ -57,7 +56,7 @@ use crate::config::ConfigValue;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
use crate::{
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
- ObjectStore, Path, Result, RetryConfig, StreamExt,
+ ObjectStore, Path, Result, RetryConfig,
};
mod checksum;
@@ -138,11 +137,6 @@ enum Error {
#[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
RegionParse { bucket: String },
-
- #[snafu(display("Failed to parse headers: {}", source))]
- Header {
- source: crate::client::header::Error,
- },
}
impl From<Error> for super::Error {
@@ -244,24 +238,11 @@ impl ObjectStore for AmazonS3 {
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
- let response = self.client.get_request(location, options, false).await?;
- let stream = response
- .bytes_stream()
- .map_err(|source| crate::Error::Generic {
- store: STORE,
- source: Box::new(source),
- })
- .boxed();
-
- Ok(GetResult::Stream(stream))
+ self.client.get_opts(location, options).await
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
- let options = GetOptions::default();
- // Extract meta from headers
- // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
- let response = self.client.get_request(location, options, true).await?;
- Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
+ self.client.head(location).await
}
async fn delete(&self, location: &Path) -> Result<()> {
@@ -272,14 +253,7 @@ impl ObjectStore for AmazonS3 {
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- let stream = self
- .client
- .list_paginated(prefix, false, None)
- .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
- .try_flatten()
- .boxed();
-
- Ok(stream)
+ self.client.list(prefix).await
}
async fn list_with_offset(
@@ -287,32 +261,11 @@ impl ObjectStore for AmazonS3 {
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- let stream = self
- .client
- .list_paginated(prefix, false, Some(offset))
- .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
- .try_flatten()
- .boxed();
-
- Ok(stream)
+ self.client.list_with_offset(prefix, offset).await
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
- let mut stream = self.client.list_paginated(prefix, true, None);
-
- let mut common_prefixes = BTreeSet::new();
- let mut objects = Vec::new();
-
- while let Some(result) = stream.next().await {
- let response = result?;
- common_prefixes.extend(response.common_prefixes.into_iter());
- objects.extend(response.objects.into_iter());
- }
-
- Ok(ListResult {
- common_prefixes: common_prefixes.into_iter().collect(),
- objects,
- })
+ self.client.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 5f165c007..868a803e9 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -18,15 +18,16 @@
use super::credential::AzureCredential;
use crate::azure::credential::*;
use crate::azure::{AzureCredentialProvider, STORE};
-use crate::client::pagination::stream_paginated;
+use crate::client::get::GetClient;
+use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::path::DELIMITER;
-use crate::util::{deserialize_rfc1123, format_prefix};
+use crate::util::deserialize_rfc1123;
use crate::{
- BoxStream, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result,
- RetryConfig, StreamExt,
+ ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result, RetryConfig,
};
+use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
@@ -187,40 +188,6 @@ impl AzureClient {
path: path.as_ref(),
})?;
- Ok(response)
- }
-
- /// Make an Azure GET request
- /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
- /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
- pub async fn get_request(
- &self,
- path: &Path,
- options: GetOptions,
- head: bool,
- ) -> Result<Response> {
- let credential = self.get_credential().await?;
- let url = self.config.path_url(path);
- let method = match head {
- true => Method::HEAD,
- false => Method::GET,
- };
-
- let builder = self
- .client
- .request(method, url)
- .header(CONTENT_LENGTH, HeaderValue::from_static("0"))
- .body(Bytes::new());
-
- let response = builder
- .with_get_options(options)
- .with_azure_authorization(&credential, &self.config.account)
- .send_retry(&self.config.retry_config)
- .await
- .context(GetRequestSnafu {
- path: path.as_ref(),
- })?;
-
match response.headers().get("x-ms-resource-type") {
Some(resource) if resource.as_ref() != b"file" => {
Err(crate::Error::NotFound {
@@ -300,14 +267,59 @@ impl AzureClient {
Ok(())
}
+}
+#[async_trait]
+impl GetClient for AzureClient {
+ const STORE: &'static str = STORE;
+
+ /// Make an Azure GET request
+ /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
+ /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
+ async fn get_request(
+ &self,
+ path: &Path,
+ options: GetOptions,
+ head: bool,
+ ) -> Result<Response> {
+ let credential = self.get_credential().await?;
+ let url = self.config.path_url(path);
+ let method = match head {
+ true => Method::HEAD,
+ false => Method::GET,
+ };
+
+ let builder = self
+ .client
+ .request(method, url)
+ .header(CONTENT_LENGTH, HeaderValue::from_static("0"))
+ .body(Bytes::new());
+
+ let response = builder
+ .with_get_options(options)
+ .with_azure_authorization(&credential, &self.config.account)
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(GetRequestSnafu {
+ path: path.as_ref(),
+ })?;
+
+ Ok(response)
+ }
+}
+
+#[async_trait]
+impl ListClient for AzureClient {
/// Make an Azure List request <https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs>
async fn list_request(
&self,
prefix: Option<&str>,
delimiter: bool,
token: Option<&str>,
+ offset: Option<&str>,
) -> Result<(ListResult, Option<String>)> {
+ assert!(offset.is_none()); // Not yet supported
+
let credential = self.get_credential().await?;
let url = self.config.path_url(&Path::default());
@@ -346,22 +358,6 @@ impl AzureClient {
Ok((to_list_result(response, prefix)?, token))
}
-
- /// Perform a list operation automatically handling pagination
- pub fn list_paginated(
- &self,
- prefix: Option<&Path>,
- delimiter: bool,
- ) -> BoxStream<'_, Result<ListResult>> {
- let prefix = format_prefix(prefix);
- stream_paginated(prefix, move |prefix, token| async move {
- let (r, next_token) = self
- .list_request(prefix.as_deref(), delimiter, token.as_deref())
- .await?;
- Ok((r, prefix, next_token))
- })
- .boxed()
- }
}
/// Raw / internal response from list requests
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 069b033d1..d27350383 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -37,18 +37,19 @@ use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
-use futures::{stream::BoxStream, StreamExt, TryStreamExt};
+use futures::stream::BoxStream;
use percent_encoding::percent_decode_str;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::fmt::{Debug, Formatter};
use std::io;
+use std::str::FromStr;
use std::sync::Arc;
-use std::{collections::BTreeSet, str::FromStr};
use tokio::io::AsyncWrite;
use url::Url;
-use crate::client::header::header_meta;
+use crate::client::get::GetClientExt;
+use crate::client::list::ListClientExt;
use crate::client::{
ClientConfigKey, CredentialProvider, StaticCredentialProvider,
TokenCredentialProvider,
@@ -128,11 +129,6 @@ enum Error {
#[snafu(display("ETag Header missing from response"))]
MissingEtag,
-
- #[snafu(display("Failed to parse headers: {}", source))]
- Header {
- source: crate::client::header::Error,
- },
}
impl From<Error> for super::Error {
@@ -204,25 +200,11 @@ impl ObjectStore for MicrosoftAzure {
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
- let response = self.client.get_request(location, options, false).await?;
- let stream = response
- .bytes_stream()
- .map_err(|source| crate::Error::Generic {
- store: STORE,
- source: Box::new(source),
- })
- .boxed();
-
- Ok(GetResult::Stream(stream))
+ self.client.get_opts(location, options).await
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
- let options = GetOptions::default();
-
- // Extract meta from headers
- // https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
- let response = self.client.get_request(location, options, true).await?;
- Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
+ self.client.head(location).await
}
async fn delete(&self, location: &Path) -> Result<()> {
@@ -233,32 +215,11 @@ impl ObjectStore for MicrosoftAzure {
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- let stream = self
- .client
- .list_paginated(prefix, false)
- .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
- .try_flatten()
- .boxed();
-
- Ok(stream)
+ self.client.list(prefix).await
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
- let mut stream = self.client.list_paginated(prefix, true);
-
- let mut common_prefixes = BTreeSet::new();
- let mut objects = Vec::new();
-
- while let Some(result) = stream.next().await {
- let response = result?;
- common_prefixes.extend(response.common_prefixes.into_iter());
- objects.extend(response.objects.into_iter());
- }
-
- Ok(ListResult {
- common_prefixes: common_prefixes.into_iter().collect(),
- objects,
- })
+ self.client.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
new file mode 100644
index 000000000..3c66a72d8
--- /dev/null
+++ b/object_store/src/client/get.rs
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::header::header_meta;
+use crate::path::Path;
+use crate::Result;
+use crate::{Error, GetOptions, GetResult, ObjectMeta};
+use async_trait::async_trait;
+use futures::{StreamExt, TryStreamExt};
+use reqwest::Response;
+
+/// A client that can perform a get request
+#[async_trait]
+pub trait GetClient: Send + Sync + 'static {
+ const STORE: &'static str;
+
+ async fn get_request(
+ &self,
+ path: &Path,
+ options: GetOptions,
+ head: bool,
+ ) -> Result<Response>;
+}
+
+/// Extension trait for [`GetClient`] that adds common retrieval functionality
+#[async_trait]
+pub trait GetClientExt {
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
+
+ async fn head(&self, location: &Path) -> Result<ObjectMeta>;
+}
+
+#[async_trait]
+impl<T: GetClient> GetClientExt for T {
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+ let response = self.get_request(location, options, false).await?;
+ let stream = response
+ .bytes_stream()
+ .map_err(|source| Error::Generic {
+ store: T::STORE,
+ source: Box::new(source),
+ })
+ .boxed();
+
+ Ok(GetResult::Stream(stream))
+ }
+
+ async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+ let options = GetOptions::default();
+ let response = self.get_request(location, options, true).await?;
+ header_meta(location, response.headers()).map_err(|e| Error::Generic {
+ store: T::STORE,
+ source: Box::new(e),
+ })
+ }
+}
diff --git a/object_store/src/client/list.rs b/object_store/src/client/list.rs
index 6a3889e3b..b2dbee27f 100644
--- a/object_store/src/client/list.rs
+++ b/object_store/src/client/list.rs
@@ -1,3 +1,4 @@
+// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
@@ -14,72 +15,123 @@
// specific language governing permissions and limitations
// under the License.
-//! The list response format used by GCP and AWS
-
+use crate::client::pagination::stream_paginated;
use crate::path::Path;
-use crate::{ListResult, ObjectMeta, Result};
-use chrono::{DateTime, Utc};
-use serde::Deserialize;
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct ListResponse {
- #[serde(default)]
- pub contents: Vec<ListContents>,
- #[serde(default)]
- pub common_prefixes: Vec<ListPrefix>,
- #[serde(default)]
- pub next_continuation_token: Option<String>,
+use crate::Result;
+use crate::{ListResult, ObjectMeta};
+use async_trait::async_trait;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use std::collections::BTreeSet;
+
+/// A client that can perform paginated list requests
+#[async_trait]
+pub trait ListClient: Send + Sync + 'static {
+ async fn list_request(
+ &self,
+ prefix: Option<&str>,
+ delimiter: bool,
+ token: Option<&str>,
+ offset: Option<&str>,
+ ) -> Result<(ListResult, Option<String>)>;
}
-impl TryFrom<ListResponse> for ListResult {
- type Error = crate::Error;
+/// Extension trait for [`ListClient`] that adds common listing functionality
+#[async_trait]
+pub trait ListClientExt {
+ fn list_paginated(
+ &self,
+ prefix: Option<&Path>,
+ delimiter: bool,
+ offset: Option<&Path>,
+ ) -> BoxStream<'_, Result<ListResult>>;
- fn try_from(value: ListResponse) -> Result<Self> {
- let common_prefixes = value
- .common_prefixes
- .into_iter()
- .map(|x| Ok(Path::parse(x.prefix)?))
- .collect::<Result<_>>()?;
+ async fn list(
+ &self,
+ prefix: Option<&Path>,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
- let objects = value
- .contents
- .into_iter()
- .map(TryFrom::try_from)
- .collect::<Result<_>>()?;
+ async fn list_with_offset(
+ &self,
+ prefix: Option<&Path>,
+ offset: &Path,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
- Ok(Self {
- common_prefixes,
- objects,
- })
- }
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
}
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct ListPrefix {
- pub prefix: String,
-}
+#[async_trait]
+impl<T: ListClient> ListClientExt for T {
+ fn list_paginated(
+ &self,
+ prefix: Option<&Path>,
+ delimiter: bool,
+ offset: Option<&Path>,
+ ) -> BoxStream<'_, Result<ListResult>> {
+ let offset = offset.map(|x| x.to_string());
+ let prefix = prefix
+ .filter(|x| !x.as_ref().is_empty())
+ .map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER));
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct ListContents {
- pub key: String,
- pub size: usize,
- pub last_modified: DateTime<Utc>,
- #[serde(rename = "ETag")]
- pub e_tag: Option<String>,
-}
+ stream_paginated(
+ (prefix, offset),
+ move |(prefix, offset), token| async move {
+ let (r, next_token) = self
+ .list_request(
+ prefix.as_deref(),
+ delimiter,
+ token.as_deref(),
+ offset.as_deref(),
+ )
+ .await?;
+ Ok((r, (prefix, offset), next_token))
+ },
+ )
+ .boxed()
+ }
-impl TryFrom<ListContents> for ObjectMeta {
- type Error = crate::Error;
+ async fn list(
+ &self,
+ prefix: Option<&Path>,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let stream = self
+ .list_paginated(prefix, false, None)
+ .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
+ .try_flatten()
+ .boxed();
- fn try_from(value: ListContents) -> Result<Self> {
- Ok(Self {
- location: Path::parse(value.key)?,
- last_modified: value.last_modified,
- size: value.size,
- e_tag: value.e_tag,
+ Ok(stream)
+ }
+
+ async fn list_with_offset(
+ &self,
+ prefix: Option<&Path>,
+ offset: &Path,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let stream = self
+ .list_paginated(prefix, false, Some(offset))
+ .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
+ .try_flatten()
+ .boxed();
+
+ Ok(stream)
+ }
+
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+ let mut stream = self.list_paginated(prefix, true, None);
+
+ let mut common_prefixes = BTreeSet::new();
+ let mut objects = Vec::new();
+
+ while let Some(result) = stream.next().await {
+ let response = result?;
+ common_prefixes.extend(response.common_prefixes.into_iter());
+ objects.extend(response.objects.into_iter());
+ }
+
+ Ok(ListResult {
+ common_prefixes: common_prefixes.into_iter().collect(),
+ objects,
})
}
}
diff --git a/object_store/src/client/list.rs b/object_store/src/client/list_response.rs
similarity index 100%
copy from object_store/src/client/list.rs
copy to object_store/src/client/list_response.rs
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 8c2357699..5f3a042be 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -20,9 +20,18 @@
pub mod backoff;
#[cfg(test)]
pub mod mock_server;
+
+pub mod retry;
+
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod pagination;
-pub mod retry;
+
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+pub mod get;
+
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+pub mod list;
+
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod token;
@@ -30,7 +39,7 @@ pub mod token;
pub mod header;
#[cfg(any(feature = "aws", feature = "gcp"))]
-pub mod list;
+pub mod list_response;
use async_trait::async_trait;
use std::collections::HashMap;
@@ -42,10 +51,9 @@ use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{Client, ClientBuilder, Proxy, RequestBuilder};
use serde::{Deserialize, Serialize};
-use crate::client::token::{TemporaryToken, TokenCache};
use crate::config::{fmt_duration, ConfigValue};
use crate::path::Path;
-use crate::{GetOptions, Result, RetryConfig};
+use crate::{GetOptions, Result};
fn map_client_error(e: reqwest::Error) -> super::Error {
super::Error::Generic {
@@ -545,6 +553,8 @@ where
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
mod cloud {
use super::*;
+ use crate::client::token::{TemporaryToken, TokenCache};
+ use crate::RetryConfig;
/// A [`CredentialProvider`] that uses [`Client`] to fetch temporary tokens
#[derive(Debug)]
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 21ba1588f..7b1127354 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -29,14 +29,13 @@
//! to abort the upload and drop those unneeded parts. In addition, you may wish to
//! consider implementing automatic clean up of unused parts that are older than one
//! week.
-use std::collections::BTreeSet;
use std::io;
use std::str::FromStr;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
-use futures::{stream::BoxStream, StreamExt, TryStreamExt};
+use futures::stream::BoxStream;
use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
use reqwest::{header, Client, Method, Response, StatusCode};
use serde::{Deserialize, Serialize};
@@ -44,9 +43,9 @@ use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;
-use crate::client::header::header_meta;
-use crate::client::list::ListResponse;
-use crate::client::pagination::stream_paginated;
+use crate::client::get::{GetClient, GetClientExt};
+use crate::client::list::{ListClient, ListClientExt};
+use crate::client::list_response::ListResponse;
use crate::client::retry::RetryExt;
use crate::client::{
ClientConfigKey, CredentialProvider, GetOptionsExt, StaticCredentialProvider,
@@ -55,7 +54,6 @@ use crate::client::{
use crate::{
multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
path::{Path, DELIMITER},
- util::format_prefix,
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result, RetryConfig,
};
@@ -150,11 +148,6 @@ enum Error {
#[snafu(display("Configuration key: '{}' is not known.", key))]
UnknownConfigurationKey { key: String },
-
- #[snafu(display("Failed to parse headers: {}", source))]
- Header {
- source: crate::client::header::Error,
- },
}
impl From<Error> for super::Error {
@@ -241,35 +234,6 @@ impl GoogleCloudStorageClient {
format!("{}/{}/{}", self.base_url, self.bucket_name_encoded, encoded)
}
- /// Perform a get request <https://cloud.google.com/storage/docs/xml-api/get-object-download>
- async fn get_request(
- &self,
- path: &Path,
- options: GetOptions,
- head: bool,
- ) -> Result<Response> {
- let credential = self.get_credential().await?;
- let url = self.object_url(path);
-
- let method = match head {
- true => Method::HEAD,
- false => Method::GET,
- };
-
- let response = self
- .client
- .request(method, url)
- .bearer_auth(&credential.bearer)
- .with_get_options(options)
- .send_retry(&self.retry_config)
- .await
- .context(GetRequestSnafu {
- path: path.as_ref(),
- })?;
-
- Ok(response)
- }
-
/// Perform a put request <https://cloud.google.com/storage/docs/xml-api/put-object-upload>
async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> {
let credential = self.get_credential().await?;
@@ -409,14 +373,54 @@ impl GoogleCloudStorageClient {
Ok(())
}
+}
+#[async_trait]
+impl GetClient for GoogleCloudStorageClient {
+ const STORE: &'static str = STORE;
+
+ /// Perform a get request <https://cloud.google.com/storage/docs/xml-api/get-object-download>
+ async fn get_request(
+ &self,
+ path: &Path,
+ options: GetOptions,
+ head: bool,
+ ) -> Result<Response> {
+ let credential = self.get_credential().await?;
+ let url = self.object_url(path);
+
+ let method = match head {
+ true => Method::HEAD,
+ false => Method::GET,
+ };
+
+ let response = self
+ .client
+ .request(method, url)
+ .bearer_auth(&credential.bearer)
+ .with_get_options(options)
+ .send_retry(&self.retry_config)
+ .await
+ .context(GetRequestSnafu {
+ path: path.as_ref(),
+ })?;
+
+ Ok(response)
+ }
+}
+
+#[async_trait]
+impl ListClient for GoogleCloudStorageClient {
/// Perform a list request <https://cloud.google.com/storage/docs/xml-api/get-bucket-list>
async fn list_request(
&self,
prefix: Option<&str>,
delimiter: bool,
page_token: Option<&str>,
- ) -> Result<ListResponse> {
+ offset: Option<&str>,
+ ) -> Result<(ListResult, Option<String>)> {
+ assert!(offset.is_none()); // Not yet supported
+
let credential = self.get_credential().await?;
let url = format!("{}/{}", self.base_url, self.bucket_name_encoded);
@@ -450,27 +454,11 @@ impl GoogleCloudStorageClient {
.await
.context(ListResponseBodySnafu)?;
- let response: ListResponse = quick_xml::de::from_reader(response.reader())
+ let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
.context(InvalidListResponseSnafu)?;
- Ok(response)
- }
-
- /// Perform a list operation automatically handling pagination
- fn list_paginated(
- &self,
- prefix: Option<&Path>,
- delimiter: bool,
- ) -> BoxStream<'_, Result<ListResult>> {
- let prefix = format_prefix(prefix);
- stream_paginated(prefix, move |prefix, token| async move {
- let mut r = self
- .list_request(prefix.as_deref(), delimiter, token.as_deref())
- .await?;
- let next_token = r.next_continuation_token.take();
- Ok((r.try_into()?, prefix, next_token))
- })
- .boxed()
+ let token = response.next_continuation_token.take();
+ Ok((response.try_into()?, token))
}
}
@@ -613,22 +601,11 @@ impl ObjectStore for GoogleCloudStorage {
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
- let response = self.client.get_request(location, options, false).await?;
- let stream = response
- .bytes_stream()
- .map_err(|source| crate::Error::Generic {
- store: STORE,
- source: Box::new(source),
- })
- .boxed();
-
- Ok(GetResult::Stream(stream))
+ self.client.get_opts(location, options).await
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
- let options = GetOptions::default();
- let response = self.client.get_request(location, options, true).await?;
- Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
+ self.client.head(location).await
}
async fn delete(&self, location: &Path) -> Result<()> {
@@ -639,32 +616,11 @@ impl ObjectStore for GoogleCloudStorage {
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
- let stream = self
- .client
- .list_paginated(prefix, false)
- .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
- .try_flatten()
- .boxed();
-
- Ok(stream)
+ self.client.list(prefix).await
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
- let mut stream = self.client.list_paginated(prefix, true);
-
- let mut common_prefixes = BTreeSet::new();
- let mut objects = Vec::new();
-
- while let Some(result) = stream.next().await {
- let response = result?;
- common_prefixes.extend(response.common_prefixes.into_iter());
- objects.extend(response.objects.into_iter());
- }
-
- Ok(ListResult {
- common_prefixes: common_prefixes.into_iter().collect(),
- objects,
- })
+ self.client.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
index ba4c68345..79ca4bb7a 100644
--- a/object_store/src/util.rs
+++ b/object_store/src/util.rs
@@ -36,14 +36,6 @@ where
.map_err(serde::de::Error::custom)
}
-/// Returns the prefix to be passed to an object store
-#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
-pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option<String> {
- prefix
- .filter(|x| !x.as_ref().is_empty())
- .map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER))
-}
-
#[cfg(any(feature = "aws", feature = "azure"))]
pub(crate) fn hmac_sha256(
secret: impl AsRef<[u8]>,