You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/15 11:01:25 UTC

[arrow-rs] branch master updated: Add ObjectStore::get_opts (#2241) (#4212)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new bac40c6bf Add ObjectStore::get_opts (#2241) (#4212)
bac40c6bf is described below

commit bac40c6bfc6b390b3550acf42c9f099f867e1734
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon May 15 12:01:19 2023 +0100

    Add ObjectStore::get_opts (#2241) (#4212)
    
    * Add ObjectStore::get_opts (#2241)
    
    * Cleanup error handling
    
    * Review feedback
---
 object_store/src/aws/client.rs     |  36 +++-----
 object_store/src/aws/credential.rs |   8 +-
 object_store/src/aws/mod.rs        |  39 +++------
 object_store/src/azure/client.rs   |  58 ++++---------
 object_store/src/azure/mod.rs      |  46 ++++------
 object_store/src/chunked.rs        |   7 +-
 object_store/src/client/mod.rs     |  37 +++++++-
 object_store/src/client/retry.rs   |  37 +++++++-
 object_store/src/gcp/mod.rs        | 128 ++++++++++-----------------
 object_store/src/http/client.rs    |  21 ++---
 object_store/src/http/mod.rs       |  21 +----
 object_store/src/lib.rs            | 172 ++++++++++++++++++++++++++++++++++++-
 object_store/src/limit.rs          |  20 +++--
 object_store/src/local.rs          |  57 +++++++-----
 object_store/src/memory.rs         |  28 +++---
 object_store/src/prefix.rs         |  19 ++--
 object_store/src/throttle.rs       |  43 ++++++----
 object_store/src/util.rs           |   7 --
 18 files changed, 470 insertions(+), 314 deletions(-)

diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 9634c740d..b2d01abfb 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -17,27 +17,25 @@
 
 use crate::aws::checksum::Checksum;
 use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
-use crate::aws::STRICT_PATH_ENCODE_SET;
+use crate::aws::{STORE, STRICT_PATH_ENCODE_SET};
 use crate::client::pagination::stream_paginated;
 use crate::client::retry::RetryExt;
+use crate::client::GetOptionsExt;
 use crate::multipart::UploadPart;
 use crate::path::DELIMITER;
-use crate::util::{format_http_range, format_prefix};
+use crate::util::format_prefix;
 use crate::{
-    BoxStream, ClientOptions, ListResult, MultipartId, ObjectMeta, Path, Result,
-    RetryConfig, StreamExt,
+    BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, ObjectMeta, Path,
+    Result, RetryConfig, StreamExt,
 };
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::{Buf, Bytes};
 use chrono::{DateTime, Utc};
 use percent_encoding::{utf8_percent_encode, PercentEncode};
-use reqwest::{
-    header::CONTENT_TYPE, Client as ReqwestClient, Method, Response, StatusCode,
-};
+use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response};
 use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
-use std::ops::Range;
 use std::sync::Arc;
 
 /// A specialized `Error` for object store-related errors
@@ -102,16 +100,9 @@ impl From<Error> for crate::Error {
             Error::GetRequest { source, path }
             | Error::DeleteRequest { source, path }
             | Error::CopyRequest { source, path }
-            | Error::PutRequest { source, path }
-                if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
-            {
-                Self::NotFound {
-                    path,
-                    source: Box::new(source),
-                }
-            }
+            | Error::PutRequest { source, path } => source.error(STORE, path),
             _ => Self::Generic {
-                store: "S3",
+                store: STORE,
                 source: Box::new(err),
             },
         }
@@ -245,11 +236,9 @@ impl S3Client {
     pub async fn get_request(
         &self,
         path: &Path,
-        range: Option<Range<usize>>,
+        options: GetOptions,
         head: bool,
     ) -> Result<Response> {
-        use reqwest::header::RANGE;
-
         let credential = self.get_credential().await?;
         let url = self.config.path_url(path);
         let method = match head {
@@ -257,13 +246,10 @@ impl S3Client {
             false => Method::GET,
         };
 
-        let mut builder = self.client.request(method, url);
-
-        if let Some(range) = range {
-            builder = builder.header(RANGE, format_http_range(range));
-        }
+        let builder = self.client.request(method, url);
 
         let response = builder
+            .with_get_options(options)
             .with_aws_sigv4(
                 credential.as_ref(),
                 &self.config.region,
diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs
index c4cb7cfe1..16cdf35d0 100644
--- a/object_store/src/aws/credential.rs
+++ b/object_store/src/aws/credential.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::aws::STRICT_ENCODE_SET;
+use crate::aws::{STORE, STRICT_ENCODE_SET};
 use crate::client::retry::RetryExt;
 use crate::client::token::{TemporaryToken, TokenCache};
 use crate::util::hmac_sha256;
@@ -330,7 +330,7 @@ impl CredentialProvider for InstanceCredentialProvider {
                 self.imdsv1_fallback,
             )
             .map_err(|source| crate::Error::Generic {
-                store: "S3",
+                store: STORE,
                 source,
             })
         }))
@@ -363,7 +363,7 @@ impl CredentialProvider for WebIdentityProvider {
                 &self.endpoint,
             )
             .map_err(|source| crate::Error::Generic {
-                store: "S3",
+                store: STORE,
                 source,
             })
         }))
@@ -552,7 +552,7 @@ mod profile {
                         .provide_credentials()
                         .await
                         .map_err(|source| crate::Error::Generic {
-                            store: "S3",
+                            store: STORE,
                             source: Box::new(source),
                         })?;
                 let t_now = SystemTime::now();
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 6fa5e1c85..3f9b4803f 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -40,7 +40,6 @@ use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::collections::BTreeSet;
-use std::ops::Range;
 use std::str::FromStr;
 use std::sync::Arc;
 use tokio::io::AsyncWrite;
@@ -57,8 +56,8 @@ use crate::client::ClientConfigKey;
 use crate::config::ConfigValue;
 use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
 use crate::{
-    ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path,
-    Result, RetryConfig, StreamExt,
+    ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
+    ObjectStore, Path, Result, RetryConfig, StreamExt,
 };
 
 mod checksum;
@@ -79,6 +78,8 @@ pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet =
 /// This struct is used to maintain the URI path encoding
 const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');
 
+const STORE: &str = "S3";
+
 /// Default metadata endpoint
 static METADATA_ENDPOINT: &str = "http://169.254.169.254";
 
@@ -160,10 +161,10 @@ impl From<Error> for super::Error {
     fn from(source: Error) -> Self {
         match source {
             Error::UnknownConfigurationKey { key } => {
-                Self::UnknownConfigurationKey { store: "S3", key }
+                Self::UnknownConfigurationKey { store: STORE, key }
             }
             _ => Self::Generic {
-                store: "S3",
+                store: STORE,
                 source: Box::new(source),
             },
         }
@@ -246,12 +247,12 @@ impl ObjectStore for AmazonS3 {
             .await
     }
 
-    async fn get(&self, location: &Path) -> Result<GetResult> {
-        let response = self.client.get_request(location, None, false).await?;
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        let response = self.client.get_request(location, options, false).await?;
         let stream = response
             .bytes_stream()
             .map_err(|source| crate::Error::Generic {
-                store: "S3",
+                store: STORE,
                 source: Box::new(source),
             })
             .boxed();
@@ -259,26 +260,13 @@ impl ObjectStore for AmazonS3 {
         Ok(GetResult::Stream(stream))
     }
 
-    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
-        let bytes = self
-            .client
-            .get_request(location, Some(range), false)
-            .await?
-            .bytes()
-            .await
-            .map_err(|source| client::Error::GetResponseBody {
-                source,
-                path: location.to_string(),
-            })?;
-        Ok(bytes)
-    }
-
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
         use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
 
+        let options = GetOptions::default();
         // Extract meta from headers
         // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
-        let response = self.client.get_request(location, None, true).await?;
+        let response = self.client.get_request(location, options, true).await?;
         let headers = response.headers();
 
         let last_modified = headers
@@ -1169,8 +1157,8 @@ fn profile_credentials(
 mod tests {
     use super::*;
     use crate::tests::{
-        get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
-        put_get_delete_list_opts, rename_and_copy, stream_get,
+        get_nonexistent_object, get_opts, list_uses_directories_correctly,
+        list_with_delimiter, put_get_delete_list_opts, rename_and_copy, stream_get,
     };
     use bytes::Bytes;
     use std::collections::HashMap;
@@ -1417,6 +1405,7 @@ mod tests {
 
         // Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328
         put_get_delete_list_opts(&integration, is_local).await;
+        get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
         rename_and_copy(&integration).await;
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 87432f62b..4611986e3 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -17,13 +17,15 @@
 
 use super::credential::{AzureCredential, CredentialProvider};
 use crate::azure::credential::*;
+use crate::azure::STORE;
 use crate::client::pagination::stream_paginated;
 use crate::client::retry::RetryExt;
+use crate::client::GetOptionsExt;
 use crate::path::DELIMITER;
-use crate::util::{deserialize_rfc1123, format_http_range, format_prefix};
+use crate::util::{deserialize_rfc1123, format_prefix};
 use crate::{
-    BoxStream, ClientOptions, ListResult, ObjectMeta, Path, Result, RetryConfig,
-    StreamExt,
+    BoxStream, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result,
+    RetryConfig, StreamExt,
 };
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
@@ -32,13 +34,12 @@ use chrono::{DateTime, Utc};
 use itertools::Itertools;
 use reqwest::header::CONTENT_TYPE;
 use reqwest::{
-    header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH, RANGE},
+    header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH},
     Client as ReqwestClient, Method, Response, StatusCode,
 };
 use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
 use std::collections::HashMap;
-use std::ops::Range;
 use url::Url;
 
 /// A specialized `Error` for object store-related errors
@@ -69,12 +70,6 @@ pub(crate) enum Error {
         path: String,
     },
 
-    #[snafu(display("Error performing copy request {}: {}", path, source))]
-    CopyRequest {
-        source: crate::client::retry::Error,
-        path: String,
-    },
-
     #[snafu(display("Error performing list request: {}", source))]
     ListRequest { source: crate::client::retry::Error },
 
@@ -95,25 +90,9 @@ impl From<Error> for crate::Error {
         match err {
             Error::GetRequest { source, path }
             | Error::DeleteRequest { source, path }
-            | Error::CopyRequest { source, path }
-            | Error::PutRequest { source, path }
-                if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
-            {
-                Self::NotFound {
-                    path,
-                    source: Box::new(source),
-                }
-            }
-            Error::CopyRequest { source, path }
-                if matches!(source.status(), Some(StatusCode::CONFLICT)) =>
-            {
-                Self::AlreadyExists {
-                    path,
-                    source: Box::new(source),
-                }
-            }
+            | Error::PutRequest { source, path } => source.error(STORE, path),
             _ => Self::Generic {
-                store: "MicrosoftAzure",
+                store: STORE,
                 source: Box::new(err),
             },
         }
@@ -175,7 +154,7 @@ impl AzureClient {
                     // and we want to use it in an infallible function
                     HeaderValue::from_str(&format!("Bearer {token}")).map_err(|err| {
                         crate::Error::Generic {
-                            store: "MicrosoftAzure",
+                            store: STORE,
                             source: Box::new(err),
                         }
                     })?,
@@ -193,7 +172,7 @@ impl AzureClient {
                     // and we want to use it in an infallible function
                     HeaderValue::from_str(&format!("Bearer {token}")).map_err(|err| {
                         crate::Error::Generic {
-                            store: "MicrosoftAzure",
+                            store: STORE,
                             source: Box::new(err),
                         }
                     })?,
@@ -253,7 +232,7 @@ impl AzureClient {
     pub async fn get_request(
         &self,
         path: &Path,
-        range: Option<Range<usize>>,
+        options: GetOptions,
         head: bool,
     ) -> Result<Response> {
         let credential = self.get_credential().await?;
@@ -263,17 +242,14 @@ impl AzureClient {
             false => Method::GET,
         };
 
-        let mut builder = self
+        let builder = self
             .client
             .request(method, url)
             .header(CONTENT_LENGTH, HeaderValue::from_static("0"))
             .body(Bytes::new());
 
-        if let Some(range) = range {
-            builder = builder.header(RANGE, format_http_range(range));
-        }
-
         let response = builder
+            .with_get_options(options)
             .with_azure_authorization(&credential, &self.config.account)
             .send_retry(&self.config.retry_config)
             .await
@@ -338,8 +314,12 @@ impl AzureClient {
             .with_azure_authorization(&credential, &self.config.account)
             .send_retry(&self.config.retry_config)
             .await
-            .context(CopyRequestSnafu {
-                path: from.as_ref(),
+            .map_err(|err| match err.status() {
+                Some(StatusCode::CONFLICT) => crate::Error::AlreadyExists {
+                    source: Box::new(err),
+                    path: to.to_string(),
+                },
+                _ => err.error(STORE, from.to_string()),
             })?;
 
         Ok(())
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index c2cfdfe6a..6726241aa 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -31,8 +31,8 @@ use crate::client::token::TokenCache;
 use crate::{
     multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
     path::Path,
-    ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
-    RetryConfig,
+    ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
+    ObjectStore, Result, RetryConfig,
 };
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
@@ -45,7 +45,6 @@ use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::fmt::{Debug, Formatter};
 use std::io;
-use std::ops::Range;
 use std::sync::Arc;
 use std::{collections::BTreeSet, str::FromStr};
 use tokio::io::AsyncWrite;
@@ -59,6 +58,8 @@ pub use credential::authority_hosts;
 mod client;
 mod credential;
 
+const STORE: &str = "MicrosoftAzure";
+
 /// The well-known account used by Azurite and the legacy Azure Storage Emulator.
 /// <https://docs.microsoft.com/azure/storage/common/storage-use-azurite#well-known-storage-account-and-key>
 const EMULATOR_ACCOUNT: &str = "devstoreaccount1";
@@ -150,12 +151,11 @@ enum Error {
 impl From<Error> for super::Error {
     fn from(source: Error) -> Self {
         match source {
-            Error::UnknownConfigurationKey { key } => Self::UnknownConfigurationKey {
-                store: "MicrosoftAzure",
-                key,
-            },
+            Error::UnknownConfigurationKey { key } => {
+                Self::UnknownConfigurationKey { store: STORE, key }
+            }
             _ => Self::Generic {
-                store: "MicrosoftAzure",
+                store: STORE,
                 source: Box::new(source),
             },
         }
@@ -209,12 +209,12 @@ impl ObjectStore for MicrosoftAzure {
         Ok(())
     }
 
-    async fn get(&self, location: &Path) -> Result<GetResult> {
-        let response = self.client.get_request(location, None, false).await?;
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        let response = self.client.get_request(location, options, false).await?;
         let stream = response
             .bytes_stream()
             .map_err(|source| crate::Error::Generic {
-                store: "MicrosoftAzure",
+                store: STORE,
                 source: Box::new(source),
             })
             .boxed();
@@ -222,26 +222,13 @@ impl ObjectStore for MicrosoftAzure {
         Ok(GetResult::Stream(stream))
     }
 
-    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
-        let bytes = self
-            .client
-            .get_request(location, Some(range), false)
-            .await?
-            .bytes()
-            .await
-            .map_err(|source| client::Error::GetResponseBody {
-                source,
-                path: location.to_string(),
-            })?;
-        Ok(bytes)
-    }
-
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
         use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
+        let options = GetOptions::default();
 
         // Extract meta from headers
         // https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
-        let response = self.client.get_request(location, None, true).await?;
+        let response = self.client.get_request(location, options, true).await?;
         let headers = response.headers();
 
         let last_modified = headers
@@ -1103,8 +1090,9 @@ fn split_sas(sas: &str) -> Result<Vec<(String, String)>, Error> {
 mod tests {
     use super::*;
     use crate::tests::{
-        copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
-        put_get_delete_list, put_get_delete_list_opts, rename_and_copy, stream_get,
+        copy_if_not_exists, get_opts, list_uses_directories_correctly,
+        list_with_delimiter, put_get_delete_list, put_get_delete_list_opts,
+        rename_and_copy, stream_get,
     };
     use std::collections::HashMap;
     use std::env;
@@ -1175,6 +1163,7 @@ mod tests {
     async fn azure_blob_test() {
         let integration = maybe_skip_integration!().build().unwrap();
         put_get_delete_list_opts(&integration, false).await;
+        get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
         rename_and_copy(&integration).await;
@@ -1203,6 +1192,7 @@ mod tests {
         let integration = builder.build().unwrap();
 
         put_get_delete_list(&integration).await;
+        get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
         rename_and_copy(&integration).await;
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index aebefec61..c639d7e89 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -30,7 +30,7 @@ use tokio::io::AsyncWrite;
 
 use crate::path::Path;
 use crate::util::maybe_spawn_blocking;
-use crate::{GetResult, ListResult, ObjectMeta, ObjectStore};
+use crate::{GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore};
 use crate::{MultipartId, Result};
 
 /// Wraps a [`ObjectStore`] and makes its get response return chunks
@@ -81,8 +81,8 @@ impl ObjectStore for ChunkedStore {
         self.inner.abort_multipart(location, multipart_id).await
     }
 
-    async fn get(&self, location: &Path) -> Result<GetResult> {
-        match self.inner.get(location).await? {
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        match self.inner.get_opts(location, options).await? {
             GetResult::File(std_file, ..) => {
                 let reader = BufReader::new(std_file);
                 let chunk_size = self.chunk_size;
@@ -245,6 +245,7 @@ mod tests {
             let integration = ChunkedStore::new(Arc::clone(integration), 100);
 
             put_get_delete_list(&integration).await;
+            get_opts(&integration).await;
             list_uses_directories_correctly(&integration).await;
             list_with_delimiter(&integration).await;
             rename_and_copy(&integration).await;
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index ccf1b4a3b..be44a9f99 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -31,11 +31,12 @@ use std::str::FromStr;
 use std::time::Duration;
 
 use reqwest::header::{HeaderMap, HeaderValue};
-use reqwest::{Client, ClientBuilder, Proxy};
+use reqwest::{Client, ClientBuilder, Proxy, RequestBuilder};
 use serde::{Deserialize, Serialize};
 
 use crate::config::{fmt_duration, ConfigValue};
 use crate::path::Path;
+use crate::GetOptions;
 
 fn map_client_error(e: reqwest::Error) -> super::Error {
     super::Error::Generic {
@@ -462,6 +463,40 @@ impl ClientOptions {
     }
 }
 
+pub trait GetOptionsExt {
+    fn with_get_options(self, options: GetOptions) -> Self;
+}
+
+impl GetOptionsExt for RequestBuilder {
+    fn with_get_options(mut self, options: GetOptions) -> Self {
+        use hyper::header::*;
+
+        if let Some(range) = options.range {
+            let range = format!("bytes={}-{}", range.start, range.end.saturating_sub(1));
+            self = self.header(RANGE, range);
+        }
+
+        if let Some(tag) = options.if_match {
+            self = self.header(IF_MATCH, tag);
+        }
+
+        if let Some(tag) = options.if_none_match {
+            self = self.header(IF_NONE_MATCH, tag);
+        }
+
+        const DATE_FORMAT: &str = "%a, %d %b %Y %H:%M:%S GMT";
+        if let Some(date) = options.if_unmodified_since {
+            self = self.header(IF_UNMODIFIED_SINCE, date.format(DATE_FORMAT).to_string());
+        }
+
+        if let Some(date) = options.if_modified_since {
+            self = self.header(IF_MODIFIED_SINCE, date.format(DATE_FORMAT).to_string());
+        }
+
+        self
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index f9c2dd300..39a913142 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -32,6 +32,7 @@ pub struct Error {
     retries: usize,
     message: String,
     source: Option<reqwest::Error>,
+    status: Option<StatusCode>,
 }
 
 impl std::fmt::Display for Error {
@@ -57,7 +58,28 @@ impl std::error::Error for Error {
 impl Error {
     /// Returns the status code associated with this error if any
     pub fn status(&self) -> Option<StatusCode> {
-        self.source.as_ref().and_then(|e| e.status())
+        self.status
+    }
+
+    pub fn error(self, store: &'static str, path: String) -> crate::Error {
+        match self.status {
+            Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
+                path,
+                source: Box::new(self),
+            },
+            Some(StatusCode::NOT_MODIFIED) => crate::Error::NotModified {
+                path,
+                source: Box::new(self),
+            },
+            Some(StatusCode::PRECONDITION_FAILED) => crate::Error::Precondition {
+                path,
+                source: Box::new(self),
+            },
+            _ => crate::Error::Generic {
+                store,
+                source: Box::new(self),
+            },
+        }
     }
 }
 
@@ -146,6 +168,14 @@ impl RetryExt for reqwest::RequestBuilder {
                 match s.send().await {
                     Ok(r) => match r.error_for_status_ref() {
                         Ok(_) if r.status().is_success() => return Ok(r),
+                        Ok(r) if r.status() == StatusCode::NOT_MODIFIED => {
+                            return Err(Error{
+                                message: "not modified".to_string(),
+                                retries,
+                                status: Some(r.status()),
+                                source: None,
+                            })
+                        }
                         Ok(r) => {
                             let is_bare_redirect = r.status().is_redirection() && !r.headers().contains_key(LOCATION);
                             let message = match is_bare_redirect {
@@ -157,6 +187,7 @@ impl RetryExt for reqwest::RequestBuilder {
                             return Err(Error{
                                 message,
                                 retries,
+                                status: Some(r.status()),
                                 source: None,
                             })
                         }
@@ -180,6 +211,7 @@ impl RetryExt for reqwest::RequestBuilder {
                                 return Err(Error{
                                     message,
                                     retries,
+                                    status: Some(status),
                                     source: Some(e),
                                 })
 
@@ -209,7 +241,8 @@ impl RetryExt for reqwest::RequestBuilder {
                             return Err(Error{
                                 retries,
                                 message: "request error".to_string(),
-                                source: Some(e)
+                                status: e.status(),
+                                source: Some(e),
                             })
                         }
                         let sleep = backoff.next();
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 375b4d8f8..41a91fef8 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -31,7 +31,6 @@
 //! week.
 use std::collections::BTreeSet;
 use std::io;
-use std::ops::Range;
 use std::str::FromStr;
 use std::sync::Arc;
 
@@ -40,7 +39,6 @@ use bytes::{Buf, Bytes};
 use chrono::{DateTime, Utc};
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use percent_encoding::{percent_encode, NON_ALPHANUMERIC};
-use reqwest::header::RANGE;
 use reqwest::{header, Client, Method, Response, StatusCode};
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt, Snafu};
@@ -49,14 +47,14 @@ use url::Url;
 
 use crate::client::pagination::stream_paginated;
 use crate::client::retry::RetryExt;
-use crate::client::ClientConfigKey;
+use crate::client::{ClientConfigKey, GetOptionsExt};
 use crate::{
     client::token::TokenCache,
     multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
     path::{Path, DELIMITER},
-    util::{format_http_range, format_prefix},
-    ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
-    RetryConfig,
+    util::format_prefix,
+    ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
+    ObjectStore, Result, RetryConfig,
 };
 
 use self::credential::{
@@ -66,6 +64,8 @@ use self::credential::{
 
 mod credential;
 
+const STORE: &str = "GCS";
+
 #[derive(Debug, Snafu)]
 enum Error {
     #[snafu(display("Got invalid XML response for {} {}: {}", method, url, source))]
@@ -100,15 +100,12 @@ enum Error {
         path: String,
     },
 
-    #[snafu(display("Error performing copy request {}: {}", path, source))]
-    CopyRequest {
+    #[snafu(display("Error performing put request {}: {}", path, source))]
+    PutRequest {
         source: crate::client::retry::Error,
         path: String,
     },
 
-    #[snafu(display("Error performing put request: {}", source))]
-    PutRequest { source: crate::client::retry::Error },
-
     #[snafu(display("Error getting put response body: {}", source))]
     PutResponseBody { source: reqwest::Error },
 
@@ -129,12 +126,6 @@ enum Error {
     #[snafu(display("GCP credential error: {}", source))]
     Credential { source: credential::Error },
 
-    #[snafu(display("Already exists: {}", path))]
-    AlreadyExists {
-        source: crate::client::retry::Error,
-        path: String,
-    },
-
     #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
     UnableToParseUrl {
         source: url::ParseError,
@@ -159,23 +150,12 @@ impl From<Error> for super::Error {
         match err {
             Error::GetRequest { source, path }
             | Error::DeleteRequest { source, path }
-            | Error::CopyRequest { source, path }
-                if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
-            {
-                Self::NotFound {
-                    path,
-                    source: Box::new(source),
-                }
-            }
-            Error::AlreadyExists { source, path } => Self::AlreadyExists {
-                source: Box::new(source),
-                path,
-            },
+            | Error::PutRequest { source, path } => source.error(STORE, path),
             Error::UnknownConfigurationKey { key } => {
-                Self::UnknownConfigurationKey { store: "GCS", key }
+                Self::UnknownConfigurationKey { store: STORE, key }
             }
             _ => Self::Generic {
-                store: "GCS",
+                store: STORE,
                 source: Box::new(err),
             },
         }
@@ -280,26 +260,23 @@ impl GoogleCloudStorageClient {
     async fn get_request(
         &self,
         path: &Path,
-        range: Option<Range<usize>>,
+        options: GetOptions,
         head: bool,
     ) -> Result<Response> {
         let token = self.get_token().await?;
         let url = self.object_url(path);
 
-        let mut builder = self.client.request(Method::GET, url);
-
-        if let Some(range) = range {
-            builder = builder.header(RANGE, format_http_range(range));
-        }
-
         let alt = match head {
             true => "json",
             false => "media",
         };
 
+        let builder = self.client.request(Method::GET, url);
+
         let response = builder
             .bearer_auth(token)
             .query(&[("alt", alt)])
+            .with_get_options(options)
             .send_retry(&self.retry_config)
             .await
             .context(GetRequestSnafu {
@@ -331,7 +308,9 @@ impl GoogleCloudStorageClient {
             .body(payload)
             .send_retry(&self.retry_config)
             .await
-            .context(PutRequestSnafu)?;
+            .context(PutRequestSnafu {
+                path: path.as_ref(),
+            })?;
 
         Ok(())
     }
@@ -355,7 +334,9 @@ impl GoogleCloudStorageClient {
             .query(&[("uploads", "")])
             .send_retry(&self.retry_config)
             .await
-            .context(PutRequestSnafu)?;
+            .context(PutRequestSnafu {
+                path: path.as_ref(),
+            })?;
 
         let data = response.bytes().await.context(PutResponseBodySnafu)?;
         let result: InitiateMultipartUploadResult = quick_xml::de::from_reader(
@@ -387,7 +368,7 @@ impl GoogleCloudStorageClient {
             .query(&[("uploadId", multipart_id)])
             .send_retry(&self.retry_config)
             .await
-            .context(PutRequestSnafu)?;
+            .context(PutRequestSnafu { path })?;
 
         Ok(())
     }
@@ -444,22 +425,12 @@ impl GoogleCloudStorageClient {
             .header(header::CONTENT_LENGTH, 0)
             .send_retry(&self.retry_config)
             .await
-            .map_err(|err| {
-                if err
-                    .status()
-                    .map(|status| status == reqwest::StatusCode::PRECONDITION_FAILED)
-                    .unwrap_or_else(|| false)
-                {
-                    Error::AlreadyExists {
-                        source: err,
-                        path: to.to_string(),
-                    }
-                } else {
-                    Error::CopyRequest {
-                        source: err,
-                        path: from.to_string(),
-                    }
-                }
+            .map_err(|err| match err.status() {
+                Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists {
+                    source: Box::new(err),
+                    path: to.to_string(),
+                },
+                _ => err.error(STORE, from.to_string()),
             })?;
 
         Ok(())
@@ -667,12 +638,18 @@ impl ObjectStore for GoogleCloudStorage {
         Ok(())
     }
 
-    async fn get(&self, location: &Path) -> Result<GetResult> {
-        let response = self.client.get_request(location, None, false).await?;
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        if options.if_modified_since.is_some() || options.if_unmodified_since.is_some() {
+            return Err(super::Error::NotSupported {
+                source: "ModifiedSince Preconditions not supported by GoogleCloudStorage JSON API".to_string().into(),
+            });
+        }
+
+        let response = self.client.get_request(location, options, false).await?;
         let stream = response
             .bytes_stream()
             .map_err(|source| crate::Error::Generic {
-                store: "GCS",
+                store: STORE,
                 source: Box::new(source),
             })
             .boxed();
@@ -680,18 +657,9 @@ impl ObjectStore for GoogleCloudStorage {
         Ok(GetResult::Stream(stream))
     }
 
-    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
-        let response = self
-            .client
-            .get_request(location, Some(range), false)
-            .await?;
-        Ok(response.bytes().await.context(GetResponseBodySnafu {
-            path: location.as_ref(),
-        })?)
-    }
-
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
-        let response = self.client.get_request(location, None, true).await?;
+        let options = GetOptions::default();
+        let response = self.client.get_request(location, options, true).await?;
         let object = response.json().await.context(GetResponseBodySnafu {
             path: location.as_ref(),
         })?;
@@ -1224,13 +1192,7 @@ mod test {
     use std::io::Write;
     use tempfile::NamedTempFile;
 
-    use crate::{
-        tests::{
-            copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly,
-            list_with_delimiter, put_get_delete_list, rename_and_copy, stream_get,
-        },
-        Error as ObjectStoreError, ObjectStore,
-    };
+    use crate::tests::*;
 
     use super::*;
 
@@ -1299,6 +1261,8 @@ mod test {
             // Fake GCS server does not yet implement XML Multipart uploads
             // https://github.com/fsouza/fake-gcs-server/issues/852
             stream_get(&integration).await;
+            // Fake GCS server doesn't currently honor preconditions
+            get_opts(&integration).await;
         }
     }
 
@@ -1311,7 +1275,7 @@ mod test {
         let err = integration.get(&location).await.unwrap_err();
 
         assert!(
-            matches!(err, ObjectStoreError::NotFound { .. }),
+            matches!(err, crate::Error::NotFound { .. }),
             "unexpected error type: {err}"
         );
     }
@@ -1330,7 +1294,7 @@ mod test {
             .unwrap_err();
 
         assert!(
-            matches!(err, ObjectStoreError::NotFound { .. }),
+            matches!(err, crate::Error::NotFound { .. }),
             "unexpected error type: {err}"
         );
     }
@@ -1343,7 +1307,7 @@ mod test {
 
         let err = integration.delete(&location).await.unwrap_err();
         assert!(
-            matches!(err, ObjectStoreError::NotFound { .. }),
+            matches!(err, crate::Error::NotFound { .. }),
             "unexpected error type: {err}"
         );
     }
@@ -1359,7 +1323,7 @@ mod test {
 
         let err = integration.delete(&location).await.unwrap_err();
         assert!(
-            matches!(err, ObjectStoreError::NotFound { .. }),
+            matches!(err, crate::Error::NotFound { .. }),
             "unexpected error type: {err}"
         );
     }
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index 5ef272180..4e58eb0b2 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -16,17 +16,17 @@
 // under the License.
 
 use crate::client::retry::{self, RetryConfig, RetryExt};
+use crate::client::GetOptionsExt;
 use crate::path::{Path, DELIMITER};
-use crate::util::{deserialize_rfc1123, format_http_range};
-use crate::{ClientOptions, ObjectMeta, Result};
+use crate::util::deserialize_rfc1123;
+use crate::{ClientOptions, GetOptions, ObjectMeta, Result};
 use bytes::{Buf, Bytes};
 use chrono::{DateTime, Utc};
 use percent_encoding::percent_decode_str;
-use reqwest::header::{CONTENT_TYPE, RANGE};
+use reqwest::header::CONTENT_TYPE;
 use reqwest::{Method, Response, StatusCode};
 use serde::Deserialize;
 use snafu::{OptionExt, ResultExt, Snafu};
-use std::ops::Range;
 use url::Url;
 
 #[derive(Debug, Snafu)]
@@ -229,19 +229,12 @@ impl Client {
         Ok(())
     }
 
-    pub async fn get(
-        &self,
-        location: &Path,
-        range: Option<Range<usize>>,
-    ) -> Result<Response> {
+    pub async fn get(&self, location: &Path, options: GetOptions) -> Result<Response> {
         let url = self.path_url(location);
-        let mut builder = self.client.get(url);
-
-        if let Some(range) = range {
-            builder = builder.header(RANGE, format_http_range(range));
-        }
+        let builder = self.client.get(url);
 
         builder
+            .with_get_options(options)
             .send_retry(&self.retry_config)
             .await
             .map_err(|source| match source.status() {
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index c91faa235..bed19722c 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -31,8 +31,6 @@
 //! [rfc2518]: https://datatracker.ietf.org/doc/html/rfc2518
 //! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV
 
-use std::ops::Range;
-
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::stream::BoxStream;
@@ -45,8 +43,8 @@ use url::Url;
 use crate::http::client::Client;
 use crate::path::Path;
 use crate::{
-    ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
-    RetryConfig,
+    ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
+    ObjectStore, Result, RetryConfig,
 };
 
 mod client;
@@ -119,8 +117,8 @@ impl ObjectStore for HttpStore {
         Err(super::Error::NotImplemented)
     }
 
-    async fn get(&self, location: &Path) -> Result<GetResult> {
-        let response = self.client.get(location, None).await?;
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        let response = self.client.get(location, options).await?;
         let stream = response
             .bytes_stream()
             .map_err(|source| Error::Reqwest { source }.into())
@@ -129,17 +127,6 @@ impl ObjectStore for HttpStore {
         Ok(GetResult::Stream(stream))
     }
 
-    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
-        let bytes = self
-            .client
-            .get(location, Some(range))
-            .await?
-            .bytes()
-            .await
-            .context(ReqwestSnafu)?;
-        Ok(bytes)
-    }
-
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
         let status = self.client.list(Some(location), "0").await?;
         match status.response.len() {
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 2c93802ed..75f9ca7df 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -346,11 +346,24 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     }
 
     /// Return the bytes that are stored at the specified location.
-    async fn get(&self, location: &Path) -> Result<GetResult>;
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        self.get_opts(location, GetOptions::default()).await
+    }
+
+    /// Perform a get request with options
+    ///
+    /// Note: `options.range` will be ignored if the result is [`GetResult::File`]
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
 
     /// Return the bytes that are stored at the specified location
     /// in the given byte range
-    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes>;
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        let options = GetOptions {
+            range: Some(range),
+            ..Default::default()
+        };
+        self.get_opts(location, options).await?.bytes().await
+    }
 
     /// Return the bytes that are stored at the specified location
     /// in the given byte ranges
@@ -478,6 +491,10 @@ impl ObjectStore for Box<dyn ObjectStore> {
         self.as_ref().get(location).await
     }
 
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        self.as_ref().get_opts(location, options).await
+    }
+
     async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
         self.as_ref().get_range(location, range).await
     }
@@ -558,6 +575,66 @@ pub struct ObjectMeta {
     pub e_tag: Option<String>,
 }
 
+/// Options for a get request, such as range
+#[derive(Debug, Default)]
+pub struct GetOptions {
+    /// Request will succeed if the `ObjectMeta::e_tag` matches
+    /// otherwise returning [`Error::Precondition`]
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-if-match>
+    pub if_match: Option<String>,
+    /// Request will succeed if the `ObjectMeta::e_tag` does not match
+    /// otherwise returning [`Error::NotModified`]
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.2>
+    pub if_none_match: Option<String>,
+    /// Succeeds if the object has been modified since, otherwise returns [`Error::NotModified`]
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.3>
+    pub if_modified_since: Option<DateTime<Utc>>,
+    /// Request will succeed if the object has not been modified since
+    /// otherwise returning [`Error::Precondition`]
+    ///
+    /// Some stores, such as S3, will only return `NotModified` for exact
+    /// timestamp matches, instead of for any timestamp greater than or equal.
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.4>
+    pub if_unmodified_since: Option<DateTime<Utc>>,
+    /// Request transfer of only the specified range of bytes
+    /// rather than the entire object
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
+    pub range: Option<Range<usize>>,
+}
+
+impl GetOptions {
+    /// Returns an error if the modification conditions on this request are not satisfied
+    fn check_modified(
+        &self,
+        location: &Path,
+        last_modified: DateTime<Utc>,
+    ) -> Result<()> {
+        if let Some(date) = self.if_modified_since {
+            if last_modified <= date {
+                return Err(Error::NotModified {
+                    path: location.to_string(),
+                    source: format!("{} >= {}", date, last_modified).into(),
+                });
+            }
+        }
+
+        if let Some(date) = self.if_unmodified_since {
+            if last_modified > date {
+                return Err(Error::Precondition {
+                    path: location.to_string(),
+                    source: format!("{} < {}", date, last_modified).into(),
+                });
+            }
+        }
+        Ok(())
+    }
+}
+
 /// Result for a get request
 ///
 /// This special cases the case of a local file, as some systems may
@@ -702,6 +779,18 @@ pub enum Error {
         source: Box<dyn std::error::Error + Send + Sync + 'static>,
     },
 
+    #[snafu(display("Request precondition failure for path {}: {}", path, source))]
+    Precondition {
+        path: String,
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+
+    #[snafu(display("Object at location {} not modified: {}", path, source))]
+    NotModified {
+        path: String,
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+
     #[snafu(display("Operation not yet implemented."))]
     NotImplemented,
 
@@ -1025,6 +1114,85 @@ mod tests {
         delete_fixtures(storage).await;
     }
 
+    pub(crate) async fn get_opts(storage: &dyn ObjectStore) {
+        let path = Path::from("test");
+        storage.put(&path, "foo".into()).await.unwrap();
+        let meta = storage.head(&path).await.unwrap();
+
+        let options = GetOptions {
+            if_unmodified_since: Some(meta.last_modified),
+            ..GetOptions::default()
+        };
+        match storage.get_opts(&path, options).await {
+            Ok(_) | Err(Error::NotSupported { .. }) => {}
+            Err(e) => panic!("{e}"),
+        }
+
+        let options = GetOptions {
+            if_unmodified_since: Some(meta.last_modified + chrono::Duration::hours(10)),
+            ..GetOptions::default()
+        };
+        match storage.get_opts(&path, options).await {
+            Ok(_) | Err(Error::NotSupported { .. }) => {}
+            Err(e) => panic!("{e}"),
+        }
+
+        let options = GetOptions {
+            if_unmodified_since: Some(meta.last_modified - chrono::Duration::hours(10)),
+            ..GetOptions::default()
+        };
+        match storage.get_opts(&path, options).await {
+            Err(Error::Precondition { .. } | Error::NotSupported { .. }) => {}
+            d => panic!("{d:?}"),
+        }
+
+        let options = GetOptions {
+            if_modified_since: Some(meta.last_modified),
+            ..GetOptions::default()
+        };
+        match storage.get_opts(&path, options).await {
+            Err(Error::NotModified { .. } | Error::NotSupported { .. }) => {}
+            d => panic!("{d:?}"),
+        }
+
+        let options = GetOptions {
+            if_modified_since: Some(meta.last_modified - chrono::Duration::hours(10)),
+            ..GetOptions::default()
+        };
+        match storage.get_opts(&path, options).await {
+            Ok(_) | Err(Error::NotSupported { .. }) => {}
+            Err(e) => panic!("{e}"),
+        }
+
+        if let Some(tag) = meta.e_tag {
+            let options = GetOptions {
+                if_match: Some(tag.clone()),
+                ..GetOptions::default()
+            };
+            storage.get_opts(&path, options).await.unwrap();
+
+            let options = GetOptions {
+                if_match: Some("invalid".to_string()),
+                ..GetOptions::default()
+            };
+            let err = storage.get_opts(&path, options).await.unwrap_err();
+            assert!(matches!(err, Error::Precondition { .. }), "{err}");
+
+            let options = GetOptions {
+                if_none_match: Some(tag.clone()),
+                ..GetOptions::default()
+            };
+            let err = storage.get_opts(&path, options).await.unwrap_err();
+            assert!(matches!(err, Error::NotModified { .. }), "{err}");
+
+            let options = GetOptions {
+                if_none_match: Some("invalid".to_string()),
+                ..GetOptions::default()
+            };
+            storage.get_opts(&path, options).await.unwrap();
+        }
+    }
+
     fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
         std::iter::repeat(Bytes::from_iter(std::iter::repeat(b'x').take(chunk_length)))
             .take(num_chunks)
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index d0d9f73c5..e0091115d 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -18,8 +18,8 @@
 //! An object store that limits the maximum concurrency of the wrapped implementation
 
 use crate::{
-    BoxStream, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, Result,
-    StreamExt,
+    BoxStream, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+    Path, Result, StreamExt,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -114,6 +114,16 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         }
     }
 
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
+        match self.inner.get_opts(location, options).await? {
+            r @ GetResult::File(_, _) => Ok(r),
+            GetResult::Stream(s) => {
+                Ok(GetResult::Stream(PermitWrapper::new(s, permit).boxed()))
+            }
+        }
+    }
+
     async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
         let _permit = self.semaphore.acquire().await.unwrap();
         self.inner.get_range(location, range).await
@@ -251,10 +261,7 @@ impl<T: AsyncWrite + Unpin> AsyncWrite for PermitWrapper<T> {
 mod tests {
     use crate::limit::LimitStore;
     use crate::memory::InMemory;
-    use crate::tests::{
-        list_uses_directories_correctly, list_with_delimiter, put_get_delete_list,
-        rename_and_copy, stream_get,
-    };
+    use crate::tests::*;
     use crate::ObjectStore;
     use std::time::Duration;
     use tokio::time::timeout;
@@ -266,6 +273,7 @@ mod tests {
         let integration = LimitStore::new(memory, max_requests);
 
         put_get_delete_list(&integration).await;
+        get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
         rename_and_copy(&integration).await;
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index b40f5a777..26a8bf336 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -19,7 +19,7 @@
 use crate::{
     maybe_spawn_blocking,
     path::{absolute_path_to_url, Path},
-    GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
+    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -56,7 +56,7 @@ pub(crate) enum Error {
     },
 
     #[snafu(display("Unable to access metadata for {}: {}", path, source))]
-    UnableToAccessMetadata {
+    Metadata {
         source: Box<dyn std::error::Error + Send + Sync + 'static>,
         path: String,
     },
@@ -360,10 +360,27 @@ impl ObjectStore for LocalFileSystem {
         Err(super::Error::NotImplemented)
     }
 
-    async fn get(&self, location: &Path) -> Result<GetResult> {
-        let path = self.config.path_to_filesystem(location)?;
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        if options.if_match.is_some() || options.if_none_match.is_some() {
+            return Err(super::Error::NotSupported {
+                source: "ETags not supported by LocalFileSystem".to_string().into(),
+            });
+        }
+
+        let location = location.clone();
+        let path = self.config.path_to_filesystem(&location)?;
         maybe_spawn_blocking(move || {
             let file = open_file(&path)?;
+            if options.if_unmodified_since.is_some()
+                || options.if_modified_since.is_some()
+            {
+                let metadata = file.metadata().map_err(|e| Error::Metadata {
+                    source: e.into(),
+                    path: location.to_string(),
+                })?;
+                options.check_modified(&location, last_modified(&metadata))?;
+            }
+
             Ok(GetResult::File(file, path))
         })
         .await
@@ -408,7 +425,7 @@ impl ObjectStore for LocalFileSystem {
                         source: e,
                     }
                 } else {
-                    Error::UnableToAccessMetadata {
+                    Error::Metadata {
                         source: e.into(),
                         path: location.to_string(),
                     }
@@ -878,21 +895,22 @@ fn open_file(path: &PathBuf) -> Result<File> {
 }
 
 fn convert_entry(entry: DirEntry, location: Path) -> Result<ObjectMeta> {
-    let metadata = entry
-        .metadata()
-        .map_err(|e| Error::UnableToAccessMetadata {
-            source: e.into(),
-            path: location.to_string(),
-        })?;
+    let metadata = entry.metadata().map_err(|e| Error::Metadata {
+        source: e.into(),
+        path: location.to_string(),
+    })?;
     convert_metadata(metadata, location)
 }
 
-fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> Result<ObjectMeta> {
-    let last_modified: DateTime<Utc> = metadata
+fn last_modified(metadata: &std::fs::Metadata) -> DateTime<Utc> {
+    metadata
         .modified()
         .expect("Modified file time should be supported on this platform")
-        .into();
+        .into()
+}
 
+fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> Result<ObjectMeta> {
+    let last_modified = last_modified(&metadata);
     let size = usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu {
         path: location.as_ref(),
     })?;
@@ -956,13 +974,7 @@ fn convert_walkdir_result(
 mod tests {
     use super::*;
     use crate::test_util::flatten_list_stream;
-    use crate::{
-        tests::{
-            copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly,
-            list_with_delimiter, put_get_delete_list, rename_and_copy, stream_get,
-        },
-        Error as ObjectStoreError, ObjectStore,
-    };
+    use crate::tests::*;
     use futures::TryStreamExt;
     use tempfile::{NamedTempFile, TempDir};
     use tokio::io::AsyncWriteExt;
@@ -973,6 +985,7 @@ mod tests {
         let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
 
         put_get_delete_list(&integration).await;
+        get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
         rename_and_copy(&integration).await;
@@ -1085,7 +1098,7 @@ mod tests {
         let err = get_nonexistent_object(&integration, Some(location))
             .await
             .unwrap_err();
-        if let ObjectStoreError::NotFound { path, source } = err {
+        if let crate::Error::NotFound { path, source } = err {
             let source_variant = source.downcast_ref::<std::io::Error>();
             assert!(
                 matches!(source_variant, Some(std::io::Error { .. }),),
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index b01ffbb02..82d485997 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 //! An in-memory object store implementation
-use crate::MultipartId;
 use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
+use crate::{GetOptions, MultipartId};
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
@@ -128,12 +128,17 @@ impl ObjectStore for InMemory {
         }))
     }
 
-    async fn get(&self, location: &Path) -> Result<GetResult> {
-        let data = self.entry(location).await?;
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        if options.if_match.is_some() || options.if_none_match.is_some() {
+            return Err(super::Error::NotSupported {
+                source: "ETags not supported by InMemory".to_string().into(),
+            });
+        }
+        let (data, last_modified) = self.entry(location).await?;
+        options.check_modified(location, last_modified)?;
 
-        Ok(GetResult::Stream(
-            futures::stream::once(async move { Ok(data.0) }).boxed(),
-        ))
+        let stream = futures::stream::once(futures::future::ready(Ok(data)));
+        Ok(GetResult::Stream(stream.boxed()))
     }
 
     async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
@@ -391,19 +396,14 @@ mod tests {
 
     use super::*;
 
-    use crate::{
-        tests::{
-            copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly,
-            list_with_delimiter, put_get_delete_list, rename_and_copy, stream_get,
-        },
-        Error as ObjectStoreError, ObjectStore,
-    };
+    use crate::tests::*;
 
     #[tokio::test]
     async fn in_memory_test() {
         let integration = InMemory::new();
 
         put_get_delete_list(&integration).await;
+        get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
         rename_and_copy(&integration).await;
@@ -443,7 +443,7 @@ mod tests {
         let err = get_nonexistent_object(&integration, Some(location))
             .await
             .unwrap_err();
-        if let ObjectStoreError::NotFound { path, source } = err {
+        if let crate::Error::NotFound { path, source } = err {
             let source_variant = source.downcast_ref::<Error>();
             assert!(
                 matches!(source_variant, Some(Error::NoDataInMemory { .. }),),
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 94836d33c..ffe509411 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -22,7 +22,9 @@ use std::ops::Range;
 use tokio::io::AsyncWrite;
 
 use crate::path::Path;
-use crate::{GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};
+use crate::{
+    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
+};
 
 #[doc(hidden)]
 #[deprecated(note = "Use PrefixStore")]
@@ -117,6 +119,15 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         self.inner.get_range(&full_path, range).await
     }
 
+    async fn get_opts(
+        &self,
+        location: &Path,
+        options: GetOptions,
+    ) -> Result<GetResult> {
+        let full_path = self.full_path(location);
+        self.inner.get_opts(&full_path, options).await
+    }
+
     async fn get_ranges(
         &self,
         location: &Path,
@@ -206,10 +217,7 @@ mod tests {
     use super::*;
     use crate::local::LocalFileSystem;
     use crate::test_util::flatten_list_stream;
-    use crate::tests::{
-        copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
-        put_get_delete_list, rename_and_copy, stream_get,
-    };
+    use crate::tests::*;
 
     use tempfile::TempDir;
 
@@ -220,6 +228,7 @@ mod tests {
         let integration = PrefixStore::new(inner, "prefix");
 
         put_get_delete_list(&integration).await;
+        get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
         rename_and_copy(&integration).await;
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index e51303114..fb90afcec 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -20,8 +20,8 @@ use parking_lot::Mutex;
 use std::ops::Range;
 use std::{convert::TryInto, sync::Arc};
 
-use crate::MultipartId;
 use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
+use crate::{GetOptions, MultipartId};
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::{stream::BoxStream, FutureExt, StreamExt};
@@ -179,17 +179,18 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
         // need to copy to avoid moving / referencing `self`
         let wait_get_per_byte = self.config().wait_get_per_byte;
 
-        self.inner.get(location).await.map(|result| {
-            let s = match result {
-                GetResult::Stream(s) => s,
-                GetResult::File(_, _) => unimplemented!(),
-            };
+        let result = self.inner.get(location).await?;
+        Ok(throttle_get(result, wait_get_per_byte))
+    }
 
-            GetResult::Stream(throttle_stream(s, move |bytes| {
-                let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
-                wait_get_per_byte * bytes_len
-            }))
-        })
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        sleep(self.config().wait_get_per_call).await;
+
+        // need to copy to avoid moving / referencing `self`
+        let wait_get_per_byte = self.config().wait_get_per_byte;
+
+        let result = self.inner.get_opts(location, options).await?;
+        Ok(throttle_get(result, wait_get_per_byte))
     }
 
     async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
@@ -299,6 +300,18 @@ fn usize_to_u32_saturate(x: usize) -> u32 {
     x.try_into().unwrap_or(u32::MAX)
 }
 
+fn throttle_get(result: GetResult, wait_get_per_byte: Duration) -> GetResult {
+    let s = match result {
+        GetResult::Stream(s) => s,
+        GetResult::File(_, _) => unimplemented!(),
+    };
+
+    GetResult::Stream(throttle_stream(s, move |bytes| {
+        let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
+        wait_get_per_byte * bytes_len
+    }))
+}
+
 fn throttle_stream<T: Send + 'static, E: Send + 'static, F>(
     stream: BoxStream<'_, Result<T, E>>,
     delay: F,
@@ -317,13 +330,7 @@ where
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{
-        memory::InMemory,
-        tests::{
-            copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
-            put_get_delete_list, rename_and_copy,
-        },
-    };
+    use crate::{memory::InMemory, tests::*};
     use bytes::Bytes;
     use futures::TryStreamExt;
     use tokio::time::Duration;
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
index e5c701dd8..ba4c68345 100644
--- a/object_store/src/util.rs
+++ b/object_store/src/util.rs
@@ -44,13 +44,6 @@ pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option<String> {
         .map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER))
 }
 
-/// Returns a formatted HTTP range header as per
-/// <https://httpwg.org/specs/rfc7233.html#header.range>
-#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
-pub fn format_http_range(range: std::ops::Range<usize>) -> String {
-    format!("bytes={}-{}", range.start, range.end.saturating_sub(1))
-}
-
 #[cfg(any(feature = "aws", feature = "azure"))]
 pub(crate) fn hmac_sha256(
     secret: impl AsRef<[u8]>,