You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/18 08:18:44 UTC

[arrow-rs] branch master updated: Extract Common Listing and Retrieval Functionality (#4220)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new fe1b574f7 Extract Common Listing and Retrieval Functionality (#4220)
fe1b574f7 is described below

commit fe1b574f7bef356691b1ee22f10f20b1b06d1502
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu May 18 09:18:38 2023 +0100

    Extract Common Listing and Retrieval Functionality (#4220)
    
    * Factor out common cloud storage client functionality
    
    * Remove format_prefix
    
    * Review feedback
---
 object_store/src/aws/client.rs                     | 224 ++++++++++-----------
 object_store/src/aws/mod.rs                        |  63 +-----
 object_store/src/azure/client.rs                   | 104 +++++-----
 object_store/src/azure/mod.rs                      |  55 +----
 object_store/src/client/get.rs                     |  70 +++++++
 object_store/src/client/list.rs                    | 162 ++++++++++-----
 .../src/client/{list.rs => list_response.rs}       |   0
 object_store/src/client/mod.rs                     |  18 +-
 object_store/src/gcp/mod.rs                        | 148 +++++---------
 object_store/src/util.rs                           |   8 -
 10 files changed, 412 insertions(+), 440 deletions(-)

diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 2c45050fa..cfce35254 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -18,17 +18,17 @@
 use crate::aws::checksum::Checksum;
 use crate::aws::credential::{AwsCredential, CredentialExt};
 use crate::aws::{AwsCredentialProvider, STORE, STRICT_PATH_ENCODE_SET};
-use crate::client::list::ListResponse;
-use crate::client::pagination::stream_paginated;
+use crate::client::get::GetClient;
+use crate::client::list::ListClient;
+use crate::client::list_response::ListResponse;
 use crate::client::retry::RetryExt;
 use crate::client::GetOptionsExt;
 use crate::multipart::UploadPart;
 use crate::path::DELIMITER;
-use crate::util::format_prefix;
 use crate::{
-    BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, Path, Result,
-    RetryConfig, StreamExt,
+    ClientOptions, GetOptions, ListResult, MultipartId, Path, Result, RetryConfig,
 };
+use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::{Buf, Bytes};
@@ -169,40 +169,6 @@ impl S3Client {
         self.config.credentials.get_credential().await
     }
 
-    /// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
-    pub async fn get_request(
-        &self,
-        path: &Path,
-        options: GetOptions,
-        head: bool,
-    ) -> Result<Response> {
-        let credential = self.get_credential().await?;
-        let url = self.config.path_url(path);
-        let method = match head {
-            true => Method::HEAD,
-            false => Method::GET,
-        };
-
-        let builder = self.client.request(method, url);
-
-        let response = builder
-            .with_get_options(options)
-            .with_aws_sigv4(
-                credential.as_ref(),
-                &self.config.region,
-                "s3",
-                self.config.sign_payload,
-                None,
-            )
-            .send_retry(&self.config.retry_config)
-            .await
-            .context(GetRequestSnafu {
-                path: path.as_ref(),
-            })?;
-
-        Ok(response)
-    }
-
     /// Make an S3 PUT request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
     pub async fn put_request<T: Serialize + ?Sized + Sync>(
         &self,
@@ -302,88 +268,6 @@ impl S3Client {
         Ok(())
     }
 
-    /// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
-    async fn list_request(
-        &self,
-        prefix: Option<&str>,
-        delimiter: bool,
-        token: Option<&str>,
-        offset: Option<&str>,
-    ) -> Result<(ListResult, Option<String>)> {
-        let credential = self.get_credential().await?;
-        let url = self.config.bucket_endpoint.clone();
-
-        let mut query = Vec::with_capacity(4);
-
-        if let Some(token) = token {
-            query.push(("continuation-token", token))
-        }
-
-        if delimiter {
-            query.push(("delimiter", DELIMITER))
-        }
-
-        query.push(("list-type", "2"));
-
-        if let Some(prefix) = prefix {
-            query.push(("prefix", prefix))
-        }
-
-        if let Some(offset) = offset {
-            query.push(("start-after", offset))
-        }
-
-        let response = self
-            .client
-            .request(Method::GET, &url)
-            .query(&query)
-            .with_aws_sigv4(
-                credential.as_ref(),
-                &self.config.region,
-                "s3",
-                self.config.sign_payload,
-                None,
-            )
-            .send_retry(&self.config.retry_config)
-            .await
-            .context(ListRequestSnafu)?
-            .bytes()
-            .await
-            .context(ListResponseBodySnafu)?;
-
-        let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
-            .context(InvalidListResponseSnafu)?;
-        let token = response.next_continuation_token.take();
-
-        Ok((response.try_into()?, token))
-    }
-
-    /// Perform a list operation automatically handling pagination
-    pub fn list_paginated(
-        &self,
-        prefix: Option<&Path>,
-        delimiter: bool,
-        offset: Option<&Path>,
-    ) -> BoxStream<'_, Result<ListResult>> {
-        let offset = offset.map(|x| x.to_string());
-        let prefix = format_prefix(prefix);
-        stream_paginated(
-            (prefix, offset),
-            move |(prefix, offset), token| async move {
-                let (r, next_token) = self
-                    .list_request(
-                        prefix.as_deref(),
-                        delimiter,
-                        token.as_deref(),
-                        offset.as_deref(),
-                    )
-                    .await?;
-                Ok((r, (prefix, offset), next_token))
-            },
-        )
-        .boxed()
-    }
-
     pub async fn create_multipart(&self, location: &Path) -> Result<MultipartId> {
         let credential = self.get_credential().await?;
         let url = format!("{}?uploads=", self.config.path_url(location),);
@@ -451,6 +335,104 @@ impl S3Client {
     }
 }
 
+#[async_trait]
+impl GetClient for S3Client {
+    const STORE: &'static str = STORE;
+
+    /// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
+    async fn get_request(
+        &self,
+        path: &Path,
+        options: GetOptions,
+        head: bool,
+    ) -> Result<Response> {
+        let credential = self.get_credential().await?;
+        let url = self.config.path_url(path);
+        let method = match head {
+            true => Method::HEAD,
+            false => Method::GET,
+        };
+
+        let builder = self.client.request(method, url);
+
+        let response = builder
+            .with_get_options(options)
+            .with_aws_sigv4(
+                credential.as_ref(),
+                &self.config.region,
+                "s3",
+                self.config.sign_payload,
+                None,
+            )
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(GetRequestSnafu {
+                path: path.as_ref(),
+            })?;
+
+        Ok(response)
+    }
+}
+
+#[async_trait]
+impl ListClient for S3Client {
+    /// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
+    async fn list_request(
+        &self,
+        prefix: Option<&str>,
+        delimiter: bool,
+        token: Option<&str>,
+        offset: Option<&str>,
+    ) -> Result<(ListResult, Option<String>)> {
+        let credential = self.get_credential().await?;
+        let url = self.config.bucket_endpoint.clone();
+
+        let mut query = Vec::with_capacity(4);
+
+        if let Some(token) = token {
+            query.push(("continuation-token", token))
+        }
+
+        if delimiter {
+            query.push(("delimiter", DELIMITER))
+        }
+
+        query.push(("list-type", "2"));
+
+        if let Some(prefix) = prefix {
+            query.push(("prefix", prefix))
+        }
+
+        if let Some(offset) = offset {
+            query.push(("start-after", offset))
+        }
+
+        let response = self
+            .client
+            .request(Method::GET, &url)
+            .query(&query)
+            .with_aws_sigv4(
+                credential.as_ref(),
+                &self.config.region,
+                "s3",
+                self.config.sign_payload,
+                None,
+            )
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(ListRequestSnafu)?
+            .bytes()
+            .await
+            .context(ListResponseBodySnafu)?;
+
+        let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
+            .context(InvalidListResponseSnafu)?;
+        let token = response.next_continuation_token.take();
+
+        Ok((response.try_into()?, token))
+    }
+}
+
 fn encode_path(path: &Path) -> PercentEncode<'_> {
     utf8_percent_encode(path.as_ref(), &STRICT_PATH_ENCODE_SET)
 }
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index e71124fba..4c6d34660 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -34,11 +34,9 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::stream::BoxStream;
-use futures::TryStreamExt;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use std::collections::BTreeSet;
 use std::str::FromStr;
 use std::sync::Arc;
 use tokio::io::AsyncWrite;
@@ -48,7 +46,8 @@ use url::Url;
 pub use crate::aws::checksum::Checksum;
 use crate::aws::client::{S3Client, S3Config};
 use crate::aws::credential::{InstanceCredentialProvider, WebIdentityProvider};
-use crate::client::header::header_meta;
+use crate::client::get::GetClientExt;
+use crate::client::list::ListClientExt;
 use crate::client::{
     ClientConfigKey, CredentialProvider, StaticCredentialProvider,
     TokenCredentialProvider,
@@ -57,7 +56,7 @@ use crate::config::ConfigValue;
 use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
 use crate::{
     ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
-    ObjectStore, Path, Result, RetryConfig, StreamExt,
+    ObjectStore, Path, Result, RetryConfig,
 };
 
 mod checksum;
@@ -138,11 +137,6 @@ enum Error {
 
     #[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
     RegionParse { bucket: String },
-
-    #[snafu(display("Failed to parse headers: {}", source))]
-    Header {
-        source: crate::client::header::Error,
-    },
 }
 
 impl From<Error> for super::Error {
@@ -244,24 +238,11 @@ impl ObjectStore for AmazonS3 {
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
-        let response = self.client.get_request(location, options, false).await?;
-        let stream = response
-            .bytes_stream()
-            .map_err(|source| crate::Error::Generic {
-                store: STORE,
-                source: Box::new(source),
-            })
-            .boxed();
-
-        Ok(GetResult::Stream(stream))
+        self.client.get_opts(location, options).await
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
-        let options = GetOptions::default();
-        // Extract meta from headers
-        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
-        let response = self.client.get_request(location, options, true).await?;
-        Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
+        self.client.head(location).await
     }
 
     async fn delete(&self, location: &Path) -> Result<()> {
@@ -272,14 +253,7 @@ impl ObjectStore for AmazonS3 {
         &self,
         prefix: Option<&Path>,
     ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        let stream = self
-            .client
-            .list_paginated(prefix, false, None)
-            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
-            .try_flatten()
-            .boxed();
-
-        Ok(stream)
+        self.client.list(prefix).await
     }
 
     async fn list_with_offset(
@@ -287,32 +261,11 @@ impl ObjectStore for AmazonS3 {
         prefix: Option<&Path>,
         offset: &Path,
     ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        let stream = self
-            .client
-            .list_paginated(prefix, false, Some(offset))
-            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
-            .try_flatten()
-            .boxed();
-
-        Ok(stream)
+        self.client.list_with_offset(prefix, offset).await
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
-        let mut stream = self.client.list_paginated(prefix, true, None);
-
-        let mut common_prefixes = BTreeSet::new();
-        let mut objects = Vec::new();
-
-        while let Some(result) = stream.next().await {
-            let response = result?;
-            common_prefixes.extend(response.common_prefixes.into_iter());
-            objects.extend(response.objects.into_iter());
-        }
-
-        Ok(ListResult {
-            common_prefixes: common_prefixes.into_iter().collect(),
-            objects,
-        })
+        self.client.list_with_delimiter(prefix).await
     }
 
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 5f165c007..868a803e9 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -18,15 +18,16 @@
 use super::credential::AzureCredential;
 use crate::azure::credential::*;
 use crate::azure::{AzureCredentialProvider, STORE};
-use crate::client::pagination::stream_paginated;
+use crate::client::get::GetClient;
+use crate::client::list::ListClient;
 use crate::client::retry::RetryExt;
 use crate::client::GetOptionsExt;
 use crate::path::DELIMITER;
-use crate::util::{deserialize_rfc1123, format_prefix};
+use crate::util::deserialize_rfc1123;
 use crate::{
-    BoxStream, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result,
-    RetryConfig, StreamExt,
+    ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result, RetryConfig,
 };
+use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::{Buf, Bytes};
@@ -187,40 +188,6 @@ impl AzureClient {
                 path: path.as_ref(),
             })?;
 
-        Ok(response)
-    }
-
-    /// Make an Azure GET request
-    /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
-    /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
-    pub async fn get_request(
-        &self,
-        path: &Path,
-        options: GetOptions,
-        head: bool,
-    ) -> Result<Response> {
-        let credential = self.get_credential().await?;
-        let url = self.config.path_url(path);
-        let method = match head {
-            true => Method::HEAD,
-            false => Method::GET,
-        };
-
-        let builder = self
-            .client
-            .request(method, url)
-            .header(CONTENT_LENGTH, HeaderValue::from_static("0"))
-            .body(Bytes::new());
-
-        let response = builder
-            .with_get_options(options)
-            .with_azure_authorization(&credential, &self.config.account)
-            .send_retry(&self.config.retry_config)
-            .await
-            .context(GetRequestSnafu {
-                path: path.as_ref(),
-            })?;
-
         match response.headers().get("x-ms-resource-type") {
             Some(resource) if resource.as_ref() != b"file" => {
                 Err(crate::Error::NotFound {
@@ -300,14 +267,59 @@ impl AzureClient {
 
         Ok(())
     }
+}
 
+#[async_trait]
+impl GetClient for AzureClient {
+    const STORE: &'static str = STORE;
+
+    /// Make an Azure GET request
+    /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
+    /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
+    async fn get_request(
+        &self,
+        path: &Path,
+        options: GetOptions,
+        head: bool,
+    ) -> Result<Response> {
+        let credential = self.get_credential().await?;
+        let url = self.config.path_url(path);
+        let method = match head {
+            true => Method::HEAD,
+            false => Method::GET,
+        };
+
+        let builder = self
+            .client
+            .request(method, url)
+            .header(CONTENT_LENGTH, HeaderValue::from_static("0"))
+            .body(Bytes::new());
+
+        let response = builder
+            .with_get_options(options)
+            .with_azure_authorization(&credential, &self.config.account)
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(GetRequestSnafu {
+                path: path.as_ref(),
+            })?;
+
+        Ok(response)
+    }
+}
+
+#[async_trait]
+impl ListClient for AzureClient {
     /// Make an Azure List request <https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs>
     async fn list_request(
         &self,
         prefix: Option<&str>,
         delimiter: bool,
         token: Option<&str>,
+        offset: Option<&str>,
     ) -> Result<(ListResult, Option<String>)> {
+        assert!(offset.is_none()); // Not yet supported
+
         let credential = self.get_credential().await?;
         let url = self.config.path_url(&Path::default());
 
@@ -346,22 +358,6 @@ impl AzureClient {
 
         Ok((to_list_result(response, prefix)?, token))
     }
-
-    /// Perform a list operation automatically handling pagination
-    pub fn list_paginated(
-        &self,
-        prefix: Option<&Path>,
-        delimiter: bool,
-    ) -> BoxStream<'_, Result<ListResult>> {
-        let prefix = format_prefix(prefix);
-        stream_paginated(prefix, move |prefix, token| async move {
-            let (r, next_token) = self
-                .list_request(prefix.as_deref(), delimiter, token.as_deref())
-                .await?;
-            Ok((r, prefix, next_token))
-        })
-        .boxed()
-    }
 }
 
 /// Raw / internal response from list requests
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 069b033d1..d27350383 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -37,18 +37,19 @@ use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::Bytes;
-use futures::{stream::BoxStream, StreamExt, TryStreamExt};
+use futures::stream::BoxStream;
 use percent_encoding::percent_decode_str;
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::fmt::{Debug, Formatter};
 use std::io;
+use std::str::FromStr;
 use std::sync::Arc;
-use std::{collections::BTreeSet, str::FromStr};
 use tokio::io::AsyncWrite;
 use url::Url;
 
-use crate::client::header::header_meta;
+use crate::client::get::GetClientExt;
+use crate::client::list::ListClientExt;
 use crate::client::{
     ClientConfigKey, CredentialProvider, StaticCredentialProvider,
     TokenCredentialProvider,
@@ -128,11 +129,6 @@ enum Error {
 
     #[snafu(display("ETag Header missing from response"))]
     MissingEtag,
-
-    #[snafu(display("Failed to parse headers: {}", source))]
-    Header {
-        source: crate::client::header::Error,
-    },
 }
 
 impl From<Error> for super::Error {
@@ -204,25 +200,11 @@ impl ObjectStore for MicrosoftAzure {
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
-        let response = self.client.get_request(location, options, false).await?;
-        let stream = response
-            .bytes_stream()
-            .map_err(|source| crate::Error::Generic {
-                store: STORE,
-                source: Box::new(source),
-            })
-            .boxed();
-
-        Ok(GetResult::Stream(stream))
+        self.client.get_opts(location, options).await
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
-        let options = GetOptions::default();
-
-        // Extract meta from headers
-        // https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
-        let response = self.client.get_request(location, options, true).await?;
-        Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
+        self.client.head(location).await
     }
 
     async fn delete(&self, location: &Path) -> Result<()> {
@@ -233,32 +215,11 @@ impl ObjectStore for MicrosoftAzure {
         &self,
         prefix: Option<&Path>,
     ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        let stream = self
-            .client
-            .list_paginated(prefix, false)
-            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
-            .try_flatten()
-            .boxed();
-
-        Ok(stream)
+        self.client.list(prefix).await
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
-        let mut stream = self.client.list_paginated(prefix, true);
-
-        let mut common_prefixes = BTreeSet::new();
-        let mut objects = Vec::new();
-
-        while let Some(result) = stream.next().await {
-            let response = result?;
-            common_prefixes.extend(response.common_prefixes.into_iter());
-            objects.extend(response.objects.into_iter());
-        }
-
-        Ok(ListResult {
-            common_prefixes: common_prefixes.into_iter().collect(),
-            objects,
-        })
+        self.client.list_with_delimiter(prefix).await
     }
 
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
new file mode 100644
index 000000000..3c66a72d8
--- /dev/null
+++ b/object_store/src/client/get.rs
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::header::header_meta;
+use crate::path::Path;
+use crate::Result;
+use crate::{Error, GetOptions, GetResult, ObjectMeta};
+use async_trait::async_trait;
+use futures::{StreamExt, TryStreamExt};
+use reqwest::Response;
+
+/// A client that can perform a get request
+#[async_trait]
+pub trait GetClient: Send + Sync + 'static {
+    const STORE: &'static str;
+
+    async fn get_request(
+        &self,
+        path: &Path,
+        options: GetOptions,
+        head: bool,
+    ) -> Result<Response>;
+}
+
+/// Extension trait for [`GetClient`] that adds common retrieval functionality
+#[async_trait]
+pub trait GetClientExt {
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta>;
+}
+
+#[async_trait]
+impl<T: GetClient> GetClientExt for T {
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        let response = self.get_request(location, options, false).await?;
+        let stream = response
+            .bytes_stream()
+            .map_err(|source| Error::Generic {
+                store: T::STORE,
+                source: Box::new(source),
+            })
+            .boxed();
+
+        Ok(GetResult::Stream(stream))
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        let options = GetOptions::default();
+        let response = self.get_request(location, options, true).await?;
+        header_meta(location, response.headers()).map_err(|e| Error::Generic {
+            store: T::STORE,
+            source: Box::new(e),
+        })
+    }
+}
diff --git a/object_store/src/client/list.rs b/object_store/src/client/list.rs
index 6a3889e3b..b2dbee27f 100644
--- a/object_store/src/client/list.rs
+++ b/object_store/src/client/list.rs
@@ -1,3 +1,4 @@
+// Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership.  The ASF licenses this file
@@ -14,72 +15,123 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! The list response format used by GCP and AWS
-
+use crate::client::pagination::stream_paginated;
 use crate::path::Path;
-use crate::{ListResult, ObjectMeta, Result};
-use chrono::{DateTime, Utc};
-use serde::Deserialize;
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct ListResponse {
-    #[serde(default)]
-    pub contents: Vec<ListContents>,
-    #[serde(default)]
-    pub common_prefixes: Vec<ListPrefix>,
-    #[serde(default)]
-    pub next_continuation_token: Option<String>,
+use crate::Result;
+use crate::{ListResult, ObjectMeta};
+use async_trait::async_trait;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use std::collections::BTreeSet;
+
+/// A client that can perform paginated list requests
+#[async_trait]
+pub trait ListClient: Send + Sync + 'static {
+    async fn list_request(
+        &self,
+        prefix: Option<&str>,
+        delimiter: bool,
+        token: Option<&str>,
+        offset: Option<&str>,
+    ) -> Result<(ListResult, Option<String>)>;
 }
 
-impl TryFrom<ListResponse> for ListResult {
-    type Error = crate::Error;
+/// Extension trait for [`ListClient`] that adds common listing functionality
+#[async_trait]
+pub trait ListClientExt {
+    fn list_paginated(
+        &self,
+        prefix: Option<&Path>,
+        delimiter: bool,
+        offset: Option<&Path>,
+    ) -> BoxStream<'_, Result<ListResult>>;
 
-    fn try_from(value: ListResponse) -> Result<Self> {
-        let common_prefixes = value
-            .common_prefixes
-            .into_iter()
-            .map(|x| Ok(Path::parse(x.prefix)?))
-            .collect::<Result<_>>()?;
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
 
-        let objects = value
-            .contents
-            .into_iter()
-            .map(TryFrom::try_from)
-            .collect::<Result<_>>()?;
+    async fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
 
-        Ok(Self {
-            common_prefixes,
-            objects,
-        })
-    }
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
 }
 
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct ListPrefix {
-    pub prefix: String,
-}
+#[async_trait]
+impl<T: ListClient> ListClientExt for T {
+    fn list_paginated(
+        &self,
+        prefix: Option<&Path>,
+        delimiter: bool,
+        offset: Option<&Path>,
+    ) -> BoxStream<'_, Result<ListResult>> {
+        let offset = offset.map(|x| x.to_string());
+        let prefix = prefix
+            .filter(|x| !x.as_ref().is_empty())
+            .map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER));
 
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct ListContents {
-    pub key: String,
-    pub size: usize,
-    pub last_modified: DateTime<Utc>,
-    #[serde(rename = "ETag")]
-    pub e_tag: Option<String>,
-}
+        stream_paginated(
+            (prefix, offset),
+            move |(prefix, offset), token| async move {
+                let (r, next_token) = self
+                    .list_request(
+                        prefix.as_deref(),
+                        delimiter,
+                        token.as_deref(),
+                        offset.as_deref(),
+                    )
+                    .await?;
+                Ok((r, (prefix, offset), next_token))
+            },
+        )
+        .boxed()
+    }
 
-impl TryFrom<ListContents> for ObjectMeta {
-    type Error = crate::Error;
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let stream = self
+            .list_paginated(prefix, false, None)
+            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
+            .try_flatten()
+            .boxed();
 
-    fn try_from(value: ListContents) -> Result<Self> {
-        Ok(Self {
-            location: Path::parse(value.key)?,
-            last_modified: value.last_modified,
-            size: value.size,
-            e_tag: value.e_tag,
+        Ok(stream)
+    }
+
+    async fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let stream = self
+            .list_paginated(prefix, false, Some(offset))
+            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
+            .try_flatten()
+            .boxed();
+
+        Ok(stream)
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        let mut stream = self.list_paginated(prefix, true, None);
+
+        let mut common_prefixes = BTreeSet::new();
+        let mut objects = Vec::new();
+
+        while let Some(result) = stream.next().await {
+            let response = result?;
+            common_prefixes.extend(response.common_prefixes.into_iter());
+            objects.extend(response.objects.into_iter());
+        }
+
+        Ok(ListResult {
+            common_prefixes: common_prefixes.into_iter().collect(),
+            objects,
         })
     }
 }
diff --git a/object_store/src/client/list.rs b/object_store/src/client/list_response.rs
similarity index 100%
copy from object_store/src/client/list.rs
copy to object_store/src/client/list_response.rs
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 8c2357699..5f3a042be 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -20,9 +20,18 @@
 pub mod backoff;
 #[cfg(test)]
 pub mod mock_server;
+
+pub mod retry;
+
 #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
 pub mod pagination;
-pub mod retry;
+
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+pub mod get;
+
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+pub mod list;
+
 #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
 pub mod token;
 
@@ -30,7 +39,7 @@ pub mod token;
 pub mod header;
 
 #[cfg(any(feature = "aws", feature = "gcp"))]
-pub mod list;
+pub mod list_response;
 
 use async_trait::async_trait;
 use std::collections::HashMap;
@@ -42,10 +51,9 @@ use reqwest::header::{HeaderMap, HeaderValue};
 use reqwest::{Client, ClientBuilder, Proxy, RequestBuilder};
 use serde::{Deserialize, Serialize};
 
-use crate::client::token::{TemporaryToken, TokenCache};
 use crate::config::{fmt_duration, ConfigValue};
 use crate::path::Path;
-use crate::{GetOptions, Result, RetryConfig};
+use crate::{GetOptions, Result};
 
 fn map_client_error(e: reqwest::Error) -> super::Error {
     super::Error::Generic {
@@ -545,6 +553,8 @@ where
 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
 mod cloud {
     use super::*;
+    use crate::client::token::{TemporaryToken, TokenCache};
+    use crate::RetryConfig;
 
     /// A [`CredentialProvider`] that uses [`Client`] to fetch temporary tokens
     #[derive(Debug)]
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 21ba1588f..7b1127354 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -29,14 +29,13 @@
 //! to abort the upload and drop those unneeded parts. In addition, you may wish to
 //! consider implementing automatic clean up of unused parts that are older than one
 //! week.
-use std::collections::BTreeSet;
 use std::io;
 use std::str::FromStr;
 use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::{Buf, Bytes};
-use futures::{stream::BoxStream, StreamExt, TryStreamExt};
+use futures::stream::BoxStream;
 use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
 use reqwest::{header, Client, Method, Response, StatusCode};
 use serde::{Deserialize, Serialize};
@@ -44,9 +43,9 @@ use snafu::{OptionExt, ResultExt, Snafu};
 use tokio::io::AsyncWrite;
 use url::Url;
 
-use crate::client::header::header_meta;
-use crate::client::list::ListResponse;
-use crate::client::pagination::stream_paginated;
+use crate::client::get::{GetClient, GetClientExt};
+use crate::client::list::{ListClient, ListClientExt};
+use crate::client::list_response::ListResponse;
 use crate::client::retry::RetryExt;
 use crate::client::{
     ClientConfigKey, CredentialProvider, GetOptionsExt, StaticCredentialProvider,
@@ -55,7 +54,6 @@ use crate::client::{
 use crate::{
     multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
     path::{Path, DELIMITER},
-    util::format_prefix,
     ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
     ObjectStore, Result, RetryConfig,
 };
@@ -150,11 +148,6 @@ enum Error {
 
     #[snafu(display("Configuration key: '{}' is not known.", key))]
     UnknownConfigurationKey { key: String },
-
-    #[snafu(display("Failed to parse headers: {}", source))]
-    Header {
-        source: crate::client::header::Error,
-    },
 }
 
 impl From<Error> for super::Error {
@@ -241,35 +234,6 @@ impl GoogleCloudStorageClient {
         format!("{}/{}/{}", self.base_url, self.bucket_name_encoded, encoded)
     }
 
-    /// Perform a get request <https://cloud.google.com/storage/docs/xml-api/get-object-download>
-    async fn get_request(
-        &self,
-        path: &Path,
-        options: GetOptions,
-        head: bool,
-    ) -> Result<Response> {
-        let credential = self.get_credential().await?;
-        let url = self.object_url(path);
-
-        let method = match head {
-            true => Method::HEAD,
-            false => Method::GET,
-        };
-
-        let response = self
-            .client
-            .request(method, url)
-            .bearer_auth(&credential.bearer)
-            .with_get_options(options)
-            .send_retry(&self.retry_config)
-            .await
-            .context(GetRequestSnafu {
-                path: path.as_ref(),
-            })?;
-
-        Ok(response)
-    }
-
     /// Perform a put request <https://cloud.google.com/storage/docs/xml-api/put-object-upload>
     async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> {
         let credential = self.get_credential().await?;
@@ -409,14 +373,54 @@ impl GoogleCloudStorageClient {
 
         Ok(())
     }
+}
 
+#[async_trait]
+impl GetClient for GoogleCloudStorageClient {
+    const STORE: &'static str = STORE;
+
+    /// Perform a get request <https://cloud.google.com/storage/docs/xml-api/get-object-download>
+    async fn get_request(
+        &self,
+        path: &Path,
+        options: GetOptions,
+        head: bool,
+    ) -> Result<Response> {
+        let credential = self.get_credential().await?;
+        let url = self.object_url(path);
+
+        let method = match head {
+            true => Method::HEAD,
+            false => Method::GET,
+        };
+
+        let response = self
+            .client
+            .request(method, url)
+            .bearer_auth(&credential.bearer)
+            .with_get_options(options)
+            .send_retry(&self.retry_config)
+            .await
+            .context(GetRequestSnafu {
+                path: path.as_ref(),
+            })?;
+
+        Ok(response)
+    }
+}
+
+#[async_trait]
+impl ListClient for GoogleCloudStorageClient {
     /// Perform a list request <https://cloud.google.com/storage/docs/xml-api/get-bucket-list>
     async fn list_request(
         &self,
         prefix: Option<&str>,
         delimiter: bool,
         page_token: Option<&str>,
-    ) -> Result<ListResponse> {
+        offset: Option<&str>,
+    ) -> Result<(ListResult, Option<String>)> {
+        assert!(offset.is_none()); // Not yet supported
+
         let credential = self.get_credential().await?;
         let url = format!("{}/{}", self.base_url, self.bucket_name_encoded);
 
@@ -450,27 +454,11 @@ impl GoogleCloudStorageClient {
             .await
             .context(ListResponseBodySnafu)?;
 
-        let response: ListResponse = quick_xml::de::from_reader(response.reader())
+        let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
             .context(InvalidListResponseSnafu)?;
 
-        Ok(response)
-    }
-
-    /// Perform a list operation automatically handling pagination
-    fn list_paginated(
-        &self,
-        prefix: Option<&Path>,
-        delimiter: bool,
-    ) -> BoxStream<'_, Result<ListResult>> {
-        let prefix = format_prefix(prefix);
-        stream_paginated(prefix, move |prefix, token| async move {
-            let mut r = self
-                .list_request(prefix.as_deref(), delimiter, token.as_deref())
-                .await?;
-            let next_token = r.next_continuation_token.take();
-            Ok((r.try_into()?, prefix, next_token))
-        })
-        .boxed()
+        let token = response.next_continuation_token.take();
+        Ok((response.try_into()?, token))
     }
 }
 
@@ -613,22 +601,11 @@ impl ObjectStore for GoogleCloudStorage {
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
-        let response = self.client.get_request(location, options, false).await?;
-        let stream = response
-            .bytes_stream()
-            .map_err(|source| crate::Error::Generic {
-                store: STORE,
-                source: Box::new(source),
-            })
-            .boxed();
-
-        Ok(GetResult::Stream(stream))
+        self.client.get_opts(location, options).await
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
-        let options = GetOptions::default();
-        let response = self.client.get_request(location, options, true).await?;
-        Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
+        self.client.head(location).await
     }
 
     async fn delete(&self, location: &Path) -> Result<()> {
@@ -639,32 +616,11 @@ impl ObjectStore for GoogleCloudStorage {
         &self,
         prefix: Option<&Path>,
     ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
-        let stream = self
-            .client
-            .list_paginated(prefix, false)
-            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
-            .try_flatten()
-            .boxed();
-
-        Ok(stream)
+        self.client.list(prefix).await
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
-        let mut stream = self.client.list_paginated(prefix, true);
-
-        let mut common_prefixes = BTreeSet::new();
-        let mut objects = Vec::new();
-
-        while let Some(result) = stream.next().await {
-            let response = result?;
-            common_prefixes.extend(response.common_prefixes.into_iter());
-            objects.extend(response.objects.into_iter());
-        }
-
-        Ok(ListResult {
-            common_prefixes: common_prefixes.into_iter().collect(),
-            objects,
-        })
+        self.client.list_with_delimiter(prefix).await
     }
 
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
index ba4c68345..79ca4bb7a 100644
--- a/object_store/src/util.rs
+++ b/object_store/src/util.rs
@@ -36,14 +36,6 @@ where
         .map_err(serde::de::Error::custom)
 }
 
-/// Returns the prefix to be passed to an object store
-#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
-pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option<String> {
-    prefix
-        .filter(|x| !x.as_ref().is_empty())
-        .map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER))
-}
-
 #[cfg(any(feature = "aws", feature = "azure"))]
 pub(crate) fn hmac_sha256(
     secret: impl AsRef<[u8]>,