You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/15 11:01:25 UTC
[arrow-rs] branch master updated: Add ObjectStore::get_opts (#2241) (#4212)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new bac40c6bf Add ObjectStore::get_opts (#2241) (#4212)
bac40c6bf is described below
commit bac40c6bfc6b390b3550acf42c9f099f867e1734
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon May 15 12:01:19 2023 +0100
Add ObjectStore::get_opts (#2241) (#4212)
* Add ObjectStore::get_opts (#2241)
* Cleanup error handling
* Review feedback
---
object_store/src/aws/client.rs | 36 +++-----
object_store/src/aws/credential.rs | 8 +-
object_store/src/aws/mod.rs | 39 +++------
object_store/src/azure/client.rs | 58 ++++---------
object_store/src/azure/mod.rs | 46 ++++------
object_store/src/chunked.rs | 7 +-
object_store/src/client/mod.rs | 37 +++++++-
object_store/src/client/retry.rs | 37 +++++++-
object_store/src/gcp/mod.rs | 128 ++++++++++-----------------
object_store/src/http/client.rs | 21 ++---
object_store/src/http/mod.rs | 21 +----
object_store/src/lib.rs | 172 ++++++++++++++++++++++++++++++++++++-
object_store/src/limit.rs | 20 +++--
object_store/src/local.rs | 57 +++++++-----
object_store/src/memory.rs | 28 +++---
object_store/src/prefix.rs | 19 ++--
object_store/src/throttle.rs | 43 ++++++----
object_store/src/util.rs | 7 --
18 files changed, 470 insertions(+), 314 deletions(-)
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 9634c740d..b2d01abfb 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -17,27 +17,25 @@
use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
-use crate::aws::STRICT_PATH_ENCODE_SET;
+use crate::aws::{STORE, STRICT_PATH_ENCODE_SET};
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
+use crate::client::GetOptionsExt;
use crate::multipart::UploadPart;
use crate::path::DELIMITER;
-use crate::util::{format_http_range, format_prefix};
+use crate::util::format_prefix;
use crate::{
- BoxStream, ClientOptions, ListResult, MultipartId, ObjectMeta, Path, Result,
- RetryConfig, StreamExt,
+ BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, ObjectMeta, Path,
+ Result, RetryConfig, StreamExt,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use percent_encoding::{utf8_percent_encode, PercentEncode};
-use reqwest::{
- header::CONTENT_TYPE, Client as ReqwestClient, Method, Response, StatusCode,
-};
+use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
-use std::ops::Range;
use std::sync::Arc;
/// A specialized `Error` for object store-related errors
@@ -102,16 +100,9 @@ impl From<Error> for crate::Error {
Error::GetRequest { source, path }
| Error::DeleteRequest { source, path }
| Error::CopyRequest { source, path }
- | Error::PutRequest { source, path }
- if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
- {
- Self::NotFound {
- path,
- source: Box::new(source),
- }
- }
+ | Error::PutRequest { source, path } => source.error(STORE, path),
_ => Self::Generic {
- store: "S3",
+ store: STORE,
source: Box::new(err),
},
}
@@ -245,11 +236,9 @@ impl S3Client {
pub async fn get_request(
&self,
path: &Path,
- range: Option<Range<usize>>,
+ options: GetOptions,
head: bool,
) -> Result<Response> {
- use reqwest::header::RANGE;
-
let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let method = match head {
@@ -257,13 +246,10 @@ impl S3Client {
false => Method::GET,
};
- let mut builder = self.client.request(method, url);
-
- if let Some(range) = range {
- builder = builder.header(RANGE, format_http_range(range));
- }
+ let builder = self.client.request(method, url);
let response = builder
+ .with_get_options(options)
.with_aws_sigv4(
credential.as_ref(),
&self.config.region,
diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs
index c4cb7cfe1..16cdf35d0 100644
--- a/object_store/src/aws/credential.rs
+++ b/object_store/src/aws/credential.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::aws::STRICT_ENCODE_SET;
+use crate::aws::{STORE, STRICT_ENCODE_SET};
use crate::client::retry::RetryExt;
use crate::client::token::{TemporaryToken, TokenCache};
use crate::util::hmac_sha256;
@@ -330,7 +330,7 @@ impl CredentialProvider for InstanceCredentialProvider {
self.imdsv1_fallback,
)
.map_err(|source| crate::Error::Generic {
- store: "S3",
+ store: STORE,
source,
})
}))
@@ -363,7 +363,7 @@ impl CredentialProvider for WebIdentityProvider {
&self.endpoint,
)
.map_err(|source| crate::Error::Generic {
- store: "S3",
+ store: STORE,
source,
})
}))
@@ -552,7 +552,7 @@ mod profile {
.provide_credentials()
.await
.map_err(|source| crate::Error::Generic {
- store: "S3",
+ store: STORE,
source: Box::new(source),
})?;
let t_now = SystemTime::now();
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 6fa5e1c85..3f9b4803f 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -40,7 +40,6 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::collections::BTreeSet;
-use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;
use tokio::io::AsyncWrite;
@@ -57,8 +56,8 @@ use crate::client::ClientConfigKey;
use crate::config::ConfigValue;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
use crate::{
- ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path,
- Result, RetryConfig, StreamExt,
+ ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
+ ObjectStore, Path, Result, RetryConfig, StreamExt,
};
mod checksum;
@@ -79,6 +78,8 @@ pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet =
/// This struct is used to maintain the URI path encoding
const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');
+const STORE: &str = "S3";
+
/// Default metadata endpoint
static METADATA_ENDPOINT: &str = "http://169.254.169.254";
@@ -160,10 +161,10 @@ impl From<Error> for super::Error {
fn from(source: Error) -> Self {
match source {
Error::UnknownConfigurationKey { key } => {
- Self::UnknownConfigurationKey { store: "S3", key }
+ Self::UnknownConfigurationKey { store: STORE, key }
}
_ => Self::Generic {
- store: "S3",
+ store: STORE,
source: Box::new(source),
},
}
@@ -246,12 +247,12 @@ impl ObjectStore for AmazonS3 {
.await
}
- async fn get(&self, location: &Path) -> Result<GetResult> {
- let response = self.client.get_request(location, None, false).await?;
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+ let response = self.client.get_request(location, options, false).await?;
let stream = response
.bytes_stream()
.map_err(|source| crate::Error::Generic {
- store: "S3",
+ store: STORE,
source: Box::new(source),
})
.boxed();
@@ -259,26 +260,13 @@ impl ObjectStore for AmazonS3 {
Ok(GetResult::Stream(stream))
}
- async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
- let bytes = self
- .client
- .get_request(location, Some(range), false)
- .await?
- .bytes()
- .await
- .map_err(|source| client::Error::GetResponseBody {
- source,
- path: location.to_string(),
- })?;
- Ok(bytes)
- }
-
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
+ let options = GetOptions::default();
// Extract meta from headers
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
- let response = self.client.get_request(location, None, true).await?;
+ let response = self.client.get_request(location, options, true).await?;
let headers = response.headers();
let last_modified = headers
@@ -1169,8 +1157,8 @@ fn profile_credentials(
mod tests {
use super::*;
use crate::tests::{
- get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
- put_get_delete_list_opts, rename_and_copy, stream_get,
+ get_nonexistent_object, get_opts, list_uses_directories_correctly,
+ list_with_delimiter, put_get_delete_list_opts, rename_and_copy, stream_get,
};
use bytes::Bytes;
use std::collections::HashMap;
@@ -1417,6 +1405,7 @@ mod tests {
// Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328
put_get_delete_list_opts(&integration, is_local).await;
+ get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 87432f62b..4611986e3 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -17,13 +17,15 @@
use super::credential::{AzureCredential, CredentialProvider};
use crate::azure::credential::*;
+use crate::azure::STORE;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
+use crate::client::GetOptionsExt;
use crate::path::DELIMITER;
-use crate::util::{deserialize_rfc1123, format_http_range, format_prefix};
+use crate::util::{deserialize_rfc1123, format_prefix};
use crate::{
- BoxStream, ClientOptions, ListResult, ObjectMeta, Path, Result, RetryConfig,
- StreamExt,
+ BoxStream, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result,
+ RetryConfig, StreamExt,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
@@ -32,13 +34,12 @@ use chrono::{DateTime, Utc};
use itertools::Itertools;
use reqwest::header::CONTENT_TYPE;
use reqwest::{
- header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH, RANGE},
+ header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH},
Client as ReqwestClient, Method, Response, StatusCode,
};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::collections::HashMap;
-use std::ops::Range;
use url::Url;
/// A specialized `Error` for object store-related errors
@@ -69,12 +70,6 @@ pub(crate) enum Error {
path: String,
},
- #[snafu(display("Error performing copy request {}: {}", path, source))]
- CopyRequest {
- source: crate::client::retry::Error,
- path: String,
- },
-
#[snafu(display("Error performing list request: {}", source))]
ListRequest { source: crate::client::retry::Error },
@@ -95,25 +90,9 @@ impl From<Error> for crate::Error {
match err {
Error::GetRequest { source, path }
| Error::DeleteRequest { source, path }
- | Error::CopyRequest { source, path }
- | Error::PutRequest { source, path }
- if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
- {
- Self::NotFound {
- path,
- source: Box::new(source),
- }
- }
- Error::CopyRequest { source, path }
- if matches!(source.status(), Some(StatusCode::CONFLICT)) =>
- {
- Self::AlreadyExists {
- path,
- source: Box::new(source),
- }
- }
+ | Error::PutRequest { source, path } => source.error(STORE, path),
_ => Self::Generic {
- store: "MicrosoftAzure",
+ store: STORE,
source: Box::new(err),
},
}
@@ -175,7 +154,7 @@ impl AzureClient {
// and we want to use it in an infallible function
HeaderValue::from_str(&format!("Bearer {token}")).map_err(|err| {
crate::Error::Generic {
- store: "MicrosoftAzure",
+ store: STORE,
source: Box::new(err),
}
})?,
@@ -193,7 +172,7 @@ impl AzureClient {
// and we want to use it in an infallible function
HeaderValue::from_str(&format!("Bearer {token}")).map_err(|err| {
crate::Error::Generic {
- store: "MicrosoftAzure",
+ store: STORE,
source: Box::new(err),
}
})?,
@@ -253,7 +232,7 @@ impl AzureClient {
pub async fn get_request(
&self,
path: &Path,
- range: Option<Range<usize>>,
+ options: GetOptions,
head: bool,
) -> Result<Response> {
let credential = self.get_credential().await?;
@@ -263,17 +242,14 @@ impl AzureClient {
false => Method::GET,
};
- let mut builder = self
+ let builder = self
.client
.request(method, url)
.header(CONTENT_LENGTH, HeaderValue::from_static("0"))
.body(Bytes::new());
- if let Some(range) = range {
- builder = builder.header(RANGE, format_http_range(range));
- }
-
let response = builder
+ .with_get_options(options)
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
@@ -338,8 +314,12 @@ impl AzureClient {
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
- .context(CopyRequestSnafu {
- path: from.as_ref(),
+ .map_err(|err| match err.status() {
+ Some(StatusCode::CONFLICT) => crate::Error::AlreadyExists {
+ source: Box::new(err),
+ path: to.to_string(),
+ },
+ _ => err.error(STORE, from.to_string()),
})?;
Ok(())
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index c2cfdfe6a..6726241aa 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -31,8 +31,8 @@ use crate::client::token::TokenCache;
use crate::{
multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
path::Path,
- ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
- RetryConfig,
+ ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
+ ObjectStore, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
@@ -45,7 +45,6 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::fmt::{Debug, Formatter};
use std::io;
-use std::ops::Range;
use std::sync::Arc;
use std::{collections::BTreeSet, str::FromStr};
use tokio::io::AsyncWrite;
@@ -59,6 +58,8 @@ pub use credential::authority_hosts;
mod client;
mod credential;
+const STORE: &str = "MicrosoftAzure";
+
/// The well-known account used by Azurite and the legacy Azure Storage Emulator.
/// <https://docs.microsoft.com/azure/storage/common/storage-use-azurite#well-known-storage-account-and-key>
const EMULATOR_ACCOUNT: &str = "devstoreaccount1";
@@ -150,12 +151,11 @@ enum Error {
impl From<Error> for super::Error {
fn from(source: Error) -> Self {
match source {
- Error::UnknownConfigurationKey { key } => Self::UnknownConfigurationKey {
- store: "MicrosoftAzure",
- key,
- },
+ Error::UnknownConfigurationKey { key } => {
+ Self::UnknownConfigurationKey { store: STORE, key }
+ }
_ => Self::Generic {
- store: "MicrosoftAzure",
+ store: STORE,
source: Box::new(source),
},
}
@@ -209,12 +209,12 @@ impl ObjectStore for MicrosoftAzure {
Ok(())
}
- async fn get(&self, location: &Path) -> Result<GetResult> {
- let response = self.client.get_request(location, None, false).await?;
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+ let response = self.client.get_request(location, options, false).await?;
let stream = response
.bytes_stream()
.map_err(|source| crate::Error::Generic {
- store: "MicrosoftAzure",
+ store: STORE,
source: Box::new(source),
})
.boxed();
@@ -222,26 +222,13 @@ impl ObjectStore for MicrosoftAzure {
Ok(GetResult::Stream(stream))
}
- async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
- let bytes = self
- .client
- .get_request(location, Some(range), false)
- .await?
- .bytes()
- .await
- .map_err(|source| client::Error::GetResponseBody {
- source,
- path: location.to_string(),
- })?;
- Ok(bytes)
- }
-
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
+ let options = GetOptions::default();
// Extract meta from headers
// https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
- let response = self.client.get_request(location, None, true).await?;
+ let response = self.client.get_request(location, options, true).await?;
let headers = response.headers();
let last_modified = headers
@@ -1103,8 +1090,9 @@ fn split_sas(sas: &str) -> Result<Vec<(String, String)>, Error> {
mod tests {
use super::*;
use crate::tests::{
- copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
- put_get_delete_list, put_get_delete_list_opts, rename_and_copy, stream_get,
+ copy_if_not_exists, get_opts, list_uses_directories_correctly,
+ list_with_delimiter, put_get_delete_list, put_get_delete_list_opts,
+ rename_and_copy, stream_get,
};
use std::collections::HashMap;
use std::env;
@@ -1175,6 +1163,7 @@ mod tests {
async fn azure_blob_test() {
let integration = maybe_skip_integration!().build().unwrap();
put_get_delete_list_opts(&integration, false).await;
+ get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
@@ -1203,6 +1192,7 @@ mod tests {
let integration = builder.build().unwrap();
put_get_delete_list(&integration).await;
+ get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index aebefec61..c639d7e89 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -30,7 +30,7 @@ use tokio::io::AsyncWrite;
use crate::path::Path;
use crate::util::maybe_spawn_blocking;
-use crate::{GetResult, ListResult, ObjectMeta, ObjectStore};
+use crate::{GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore};
use crate::{MultipartId, Result};
/// Wraps a [`ObjectStore`] and makes its get response return chunks
@@ -81,8 +81,8 @@ impl ObjectStore for ChunkedStore {
self.inner.abort_multipart(location, multipart_id).await
}
- async fn get(&self, location: &Path) -> Result<GetResult> {
- match self.inner.get(location).await? {
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+ match self.inner.get_opts(location, options).await? {
GetResult::File(std_file, ..) => {
let reader = BufReader::new(std_file);
let chunk_size = self.chunk_size;
@@ -245,6 +245,7 @@ mod tests {
let integration = ChunkedStore::new(Arc::clone(integration), 100);
put_get_delete_list(&integration).await;
+ get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index ccf1b4a3b..be44a9f99 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -31,11 +31,12 @@ use std::str::FromStr;
use std::time::Duration;
use reqwest::header::{HeaderMap, HeaderValue};
-use reqwest::{Client, ClientBuilder, Proxy};
+use reqwest::{Client, ClientBuilder, Proxy, RequestBuilder};
use serde::{Deserialize, Serialize};
use crate::config::{fmt_duration, ConfigValue};
use crate::path::Path;
+use crate::GetOptions;
fn map_client_error(e: reqwest::Error) -> super::Error {
super::Error::Generic {
@@ -462,6 +463,40 @@ impl ClientOptions {
}
}
+pub trait GetOptionsExt {
+ fn with_get_options(self, options: GetOptions) -> Self;
+}
+
+impl GetOptionsExt for RequestBuilder {
+ fn with_get_options(mut self, options: GetOptions) -> Self {
+ use hyper::header::*;
+
+ if let Some(range) = options.range {
+ let range = format!("bytes={}-{}", range.start, range.end.saturating_sub(1));
+ self = self.header(RANGE, range);
+ }
+
+ if let Some(tag) = options.if_match {
+ self = self.header(IF_MATCH, tag);
+ }
+
+ if let Some(tag) = options.if_none_match {
+ self = self.header(IF_NONE_MATCH, tag);
+ }
+
+ const DATE_FORMAT: &str = "%a, %d %b %Y %H:%M:%S GMT";
+ if let Some(date) = options.if_unmodified_since {
+ self = self.header(IF_UNMODIFIED_SINCE, date.format(DATE_FORMAT).to_string());
+ }
+
+ if let Some(date) = options.if_modified_since {
+ self = self.header(IF_MODIFIED_SINCE, date.format(DATE_FORMAT).to_string());
+ }
+
+ self
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index f9c2dd300..39a913142 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -32,6 +32,7 @@ pub struct Error {
retries: usize,
message: String,
source: Option<reqwest::Error>,
+ status: Option<StatusCode>,
}
impl std::fmt::Display for Error {
@@ -57,7 +58,28 @@ impl std::error::Error for Error {
impl Error {
/// Returns the status code associated with this error if any
pub fn status(&self) -> Option<StatusCode> {
- self.source.as_ref().and_then(|e| e.status())
+ self.status
+ }
+
+ pub fn error(self, store: &'static str, path: String) -> crate::Error {
+ match self.status {
+ Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
+ path,
+ source: Box::new(self),
+ },
+ Some(StatusCode::NOT_MODIFIED) => crate::Error::NotModified {
+ path,
+ source: Box::new(self),
+ },
+ Some(StatusCode::PRECONDITION_FAILED) => crate::Error::Precondition {
+ path,
+ source: Box::new(self),
+ },
+ _ => crate::Error::Generic {
+ store,
+ source: Box::new(self),
+ },
+ }
}
}
@@ -146,6 +168,14 @@ impl RetryExt for reqwest::RequestBuilder {
match s.send().await {
Ok(r) => match r.error_for_status_ref() {
Ok(_) if r.status().is_success() => return Ok(r),
+ Ok(r) if r.status() == StatusCode::NOT_MODIFIED => {
+ return Err(Error{
+ message: "not modified".to_string(),
+ retries,
+ status: Some(r.status()),
+ source: None,
+ })
+ }
Ok(r) => {
let is_bare_redirect = r.status().is_redirection() && !r.headers().contains_key(LOCATION);
let message = match is_bare_redirect {
@@ -157,6 +187,7 @@ impl RetryExt for reqwest::RequestBuilder {
return Err(Error{
message,
retries,
+ status: Some(r.status()),
source: None,
})
}
@@ -180,6 +211,7 @@ impl RetryExt for reqwest::RequestBuilder {
return Err(Error{
message,
retries,
+ status: Some(status),
source: Some(e),
})
@@ -209,7 +241,8 @@ impl RetryExt for reqwest::RequestBuilder {
return Err(Error{
retries,
message: "request error".to_string(),
- source: Some(e)
+ status: e.status(),
+ source: Some(e),
})
}
let sleep = backoff.next();
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 375b4d8f8..41a91fef8 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -31,7 +31,6 @@
//! week.
use std::collections::BTreeSet;
use std::io;
-use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;
@@ -40,7 +39,6 @@ use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use percent_encoding::{percent_encode, NON_ALPHANUMERIC};
-use reqwest::header::RANGE;
use reqwest::{header, Client, Method, Response, StatusCode};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
@@ -49,14 +47,14 @@ use url::Url;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
-use crate::client::ClientConfigKey;
+use crate::client::{ClientConfigKey, GetOptionsExt};
use crate::{
client::token::TokenCache,
multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
path::{Path, DELIMITER},
- util::{format_http_range, format_prefix},
- ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
- RetryConfig,
+ util::format_prefix,
+ ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
+ ObjectStore, Result, RetryConfig,
};
use self::credential::{
@@ -66,6 +64,8 @@ use self::credential::{
mod credential;
+const STORE: &str = "GCS";
+
#[derive(Debug, Snafu)]
enum Error {
#[snafu(display("Got invalid XML response for {} {}: {}", method, url, source))]
@@ -100,15 +100,12 @@ enum Error {
path: String,
},
- #[snafu(display("Error performing copy request {}: {}", path, source))]
- CopyRequest {
+ #[snafu(display("Error performing put request {}: {}", path, source))]
+ PutRequest {
source: crate::client::retry::Error,
path: String,
},
- #[snafu(display("Error performing put request: {}", source))]
- PutRequest { source: crate::client::retry::Error },
-
#[snafu(display("Error getting put response body: {}", source))]
PutResponseBody { source: reqwest::Error },
@@ -129,12 +126,6 @@ enum Error {
#[snafu(display("GCP credential error: {}", source))]
Credential { source: credential::Error },
- #[snafu(display("Already exists: {}", path))]
- AlreadyExists {
- source: crate::client::retry::Error,
- path: String,
- },
-
#[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
UnableToParseUrl {
source: url::ParseError,
@@ -159,23 +150,12 @@ impl From<Error> for super::Error {
match err {
Error::GetRequest { source, path }
| Error::DeleteRequest { source, path }
- | Error::CopyRequest { source, path }
- if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
- {
- Self::NotFound {
- path,
- source: Box::new(source),
- }
- }
- Error::AlreadyExists { source, path } => Self::AlreadyExists {
- source: Box::new(source),
- path,
- },
+ | Error::PutRequest { source, path } => source.error(STORE, path),
Error::UnknownConfigurationKey { key } => {
- Self::UnknownConfigurationKey { store: "GCS", key }
+ Self::UnknownConfigurationKey { store: STORE, key }
}
_ => Self::Generic {
- store: "GCS",
+ store: STORE,
source: Box::new(err),
},
}
@@ -280,26 +260,23 @@ impl GoogleCloudStorageClient {
async fn get_request(
&self,
path: &Path,
- range: Option<Range<usize>>,
+ options: GetOptions,
head: bool,
) -> Result<Response> {
let token = self.get_token().await?;
let url = self.object_url(path);
- let mut builder = self.client.request(Method::GET, url);
-
- if let Some(range) = range {
- builder = builder.header(RANGE, format_http_range(range));
- }
-
let alt = match head {
true => "json",
false => "media",
};
+ let builder = self.client.request(Method::GET, url);
+
let response = builder
.bearer_auth(token)
.query(&[("alt", alt)])
+ .with_get_options(options)
.send_retry(&self.retry_config)
.await
.context(GetRequestSnafu {
@@ -331,7 +308,9 @@ impl GoogleCloudStorageClient {
.body(payload)
.send_retry(&self.retry_config)
.await
- .context(PutRequestSnafu)?;
+ .context(PutRequestSnafu {
+ path: path.as_ref(),
+ })?;
Ok(())
}
@@ -355,7 +334,9 @@ impl GoogleCloudStorageClient {
.query(&[("uploads", "")])
.send_retry(&self.retry_config)
.await
- .context(PutRequestSnafu)?;
+ .context(PutRequestSnafu {
+ path: path.as_ref(),
+ })?;
let data = response.bytes().await.context(PutResponseBodySnafu)?;
let result: InitiateMultipartUploadResult = quick_xml::de::from_reader(
@@ -387,7 +368,7 @@ impl GoogleCloudStorageClient {
.query(&[("uploadId", multipart_id)])
.send_retry(&self.retry_config)
.await
- .context(PutRequestSnafu)?;
+ .context(PutRequestSnafu { path })?;
Ok(())
}
@@ -444,22 +425,12 @@ impl GoogleCloudStorageClient {
.header(header::CONTENT_LENGTH, 0)
.send_retry(&self.retry_config)
.await
- .map_err(|err| {
- if err
- .status()
- .map(|status| status == reqwest::StatusCode::PRECONDITION_FAILED)
- .unwrap_or_else(|| false)
- {
- Error::AlreadyExists {
- source: err,
- path: to.to_string(),
- }
- } else {
- Error::CopyRequest {
- source: err,
- path: from.to_string(),
- }
- }
+ .map_err(|err| match err.status() {
+ Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists {
+ source: Box::new(err),
+ path: to.to_string(),
+ },
+ _ => err.error(STORE, from.to_string()),
})?;
Ok(())
@@ -667,12 +638,18 @@ impl ObjectStore for GoogleCloudStorage {
Ok(())
}
- async fn get(&self, location: &Path) -> Result<GetResult> {
- let response = self.client.get_request(location, None, false).await?;
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+ if options.if_modified_since.is_some() || options.if_unmodified_since.is_some() {
+ return Err(super::Error::NotSupported {
+ source: "ModifiedSince Preconditions not supported by GoogleCloudStorage JSON API".to_string().into(),
+ });
+ }
+
+ let response = self.client.get_request(location, options, false).await?;
let stream = response
.bytes_stream()
.map_err(|source| crate::Error::Generic {
- store: "GCS",
+ store: STORE,
source: Box::new(source),
})
.boxed();
@@ -680,18 +657,9 @@ impl ObjectStore for GoogleCloudStorage {
Ok(GetResult::Stream(stream))
}
- async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
- let response = self
- .client
- .get_request(location, Some(range), false)
- .await?;
- Ok(response.bytes().await.context(GetResponseBodySnafu {
- path: location.as_ref(),
- })?)
- }
-
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
- let response = self.client.get_request(location, None, true).await?;
+ let options = GetOptions::default();
+ let response = self.client.get_request(location, options, true).await?;
let object = response.json().await.context(GetResponseBodySnafu {
path: location.as_ref(),
})?;
@@ -1224,13 +1192,7 @@ mod test {
use std::io::Write;
use tempfile::NamedTempFile;
- use crate::{
- tests::{
- copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly,
- list_with_delimiter, put_get_delete_list, rename_and_copy, stream_get,
- },
- Error as ObjectStoreError, ObjectStore,
- };
+ use crate::tests::*;
use super::*;
@@ -1299,6 +1261,8 @@ mod test {
// Fake GCS server does not yet implement XML Multipart uploads
// https://github.com/fsouza/fake-gcs-server/issues/852
stream_get(&integration).await;
+ // Fake GCS server doesn't currently honor preconditions
+ get_opts(&integration).await;
}
}
@@ -1311,7 +1275,7 @@ mod test {
let err = integration.get(&location).await.unwrap_err();
assert!(
- matches!(err, ObjectStoreError::NotFound { .. }),
+ matches!(err, crate::Error::NotFound { .. }),
"unexpected error type: {err}"
);
}
@@ -1330,7 +1294,7 @@ mod test {
.unwrap_err();
assert!(
- matches!(err, ObjectStoreError::NotFound { .. }),
+ matches!(err, crate::Error::NotFound { .. }),
"unexpected error type: {err}"
);
}
@@ -1343,7 +1307,7 @@ mod test {
let err = integration.delete(&location).await.unwrap_err();
assert!(
- matches!(err, ObjectStoreError::NotFound { .. }),
+ matches!(err, crate::Error::NotFound { .. }),
"unexpected error type: {err}"
);
}
@@ -1359,7 +1323,7 @@ mod test {
let err = integration.delete(&location).await.unwrap_err();
assert!(
- matches!(err, ObjectStoreError::NotFound { .. }),
+ matches!(err, crate::Error::NotFound { .. }),
"unexpected error type: {err}"
);
}
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index 5ef272180..4e58eb0b2 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -16,17 +16,17 @@
// under the License.
use crate::client::retry::{self, RetryConfig, RetryExt};
+use crate::client::GetOptionsExt;
use crate::path::{Path, DELIMITER};
-use crate::util::{deserialize_rfc1123, format_http_range};
-use crate::{ClientOptions, ObjectMeta, Result};
+use crate::util::deserialize_rfc1123;
+use crate::{ClientOptions, GetOptions, ObjectMeta, Result};
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use percent_encoding::percent_decode_str;
-use reqwest::header::{CONTENT_TYPE, RANGE};
+use reqwest::header::CONTENT_TYPE;
use reqwest::{Method, Response, StatusCode};
use serde::Deserialize;
use snafu::{OptionExt, ResultExt, Snafu};
-use std::ops::Range;
use url::Url;
#[derive(Debug, Snafu)]
@@ -229,19 +229,12 @@ impl Client {
Ok(())
}
- pub async fn get(
- &self,
- location: &Path,
- range: Option<Range<usize>>,
- ) -> Result<Response> {
+ pub async fn get(&self, location: &Path, options: GetOptions) -> Result<Response> {
let url = self.path_url(location);
- let mut builder = self.client.get(url);
-
- if let Some(range) = range {
- builder = builder.header(RANGE, format_http_range(range));
- }
+ let builder = self.client.get(url);
builder
+ .with_get_options(options)
.send_retry(&self.retry_config)
.await
.map_err(|source| match source.status() {
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index c91faa235..bed19722c 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -31,8 +31,6 @@
//! [rfc2518]: https://datatracker.ietf.org/doc/html/rfc2518
//! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV
-use std::ops::Range;
-
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
@@ -45,8 +43,8 @@ use url::Url;
use crate::http::client::Client;
use crate::path::Path;
use crate::{
- ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
- RetryConfig,
+ ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
+ ObjectStore, Result, RetryConfig,
};
mod client;
@@ -119,8 +117,8 @@ impl ObjectStore for HttpStore {
Err(super::Error::NotImplemented)
}
- async fn get(&self, location: &Path) -> Result<GetResult> {
- let response = self.client.get(location, None).await?;
+ /// Stream the object at `location`, forwarding `options` to the HTTP client.
+ ///
+ /// With `get` and `get_range` removed from this impl, both now use the
+ /// `ObjectStore` default implementations, which route through this method.
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+ let response = self.client.get(location, options).await?;
let stream = response
.bytes_stream()
.map_err(|source| Error::Reqwest { source }.into())
@@ -129,17 +127,6 @@ impl ObjectStore for HttpStore {
Ok(GetResult::Stream(stream))
}
- async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
- let bytes = self
- .client
- .get(location, Some(range))
- .await?
- .bytes()
- .await
- .context(ReqwestSnafu)?;
- Ok(bytes)
- }
-
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let status = self.client.list(Some(location), "0").await?;
match status.response.len() {
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 2c93802ed..75f9ca7df 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -346,11 +346,24 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
}
/// Return the bytes that are stored at the specified location.
- async fn get(&self, location: &Path) -> Result<GetResult>;
+ ///
+ /// Equivalent to [`ObjectStore::get_opts`] with [`GetOptions::default`].
+ async fn get(&self, location: &Path) -> Result<GetResult> {
+ self.get_opts(location, GetOptions::default()).await
+ }
+
+ /// Perform a get request with options
+ ///
+ /// Stores may reject conditions they cannot evaluate with [`Error::NotSupported`],
+ /// and report unmet conditions as [`Error::Precondition`] or [`Error::NotModified`].
+ ///
+ /// Note: `options.range` will be ignored if the store returns [`GetResult::File`]
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
/// Return the bytes that are stored at the specified location
/// in the given byte range
- async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes>;
+ ///
+ /// The default implementation calls [`ObjectStore::get_opts`] with `range` set
+ /// and collects the result with [`GetResult::bytes`].
+ async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+ // NOTE(review): if `get_opts` returns `GetResult::File` (whose range is ignored,
+ // per the note on `get_opts`), this default may not honour `range`; such stores
+ // should override `get_range`.
+ let options = GetOptions {
+ range: Some(range),
+ ..Default::default()
+ };
+ self.get_opts(location, options).await?.bytes().await
+ }
/// Return the bytes that are stored at the specified location
/// in the given byte ranges
@@ -478,6 +491,10 @@ impl ObjectStore for Box<dyn ObjectStore> {
self.as_ref().get(location).await
}
+ /// Delegates to the boxed store's `get_opts`.
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+ self.as_ref().get_opts(location, options).await
+ }
+
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
self.as_ref().get_range(location, range).await
}
@@ -558,6 +575,66 @@ pub struct ObjectMeta {
pub e_tag: Option<String>,
}
+/// Options for a get request, such as a byte range or conditional (precondition) headers
+#[derive(Debug, Default)]
+pub struct GetOptions {
+ /// Request will succeed if the `ObjectMeta::e_tag` matches
+ /// otherwise returning [`Error::Precondition`]
+ ///
+ /// <https://datatracker.ietf.org/doc/html/rfc9110#name-if-match>
+ pub if_match: Option<String>,
+ /// Request will succeed if the `ObjectMeta::e_tag` does not match
+ /// otherwise returning [`Error::NotModified`]
+ ///
+ /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.2>
+ pub if_none_match: Option<String>,
+ /// Request will succeed if the object has been modified since
+ /// otherwise returning [`Error::NotModified`]
+ ///
+ /// Some stores, such as S3, will only return `NotModified` when this timestamp
+ /// exactly matches the object's last-modified time, instead of for any
+ /// timestamp greater than or equal to it.
+ ///
+ /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.3>
+ pub if_modified_since: Option<DateTime<Utc>>,
+ /// Request will succeed if the object has not been modified since
+ /// otherwise returning [`Error::Precondition`]
+ ///
+ /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.4>
+ pub if_unmodified_since: Option<DateTime<Utc>>,
+ /// Request transfer of only the specified range of bytes
+ ///
+ /// Stores that return [`GetResult::File`] may ignore this
+ ///
+ /// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
+ pub range: Option<Range<usize>>,
+}
+
+impl GetOptions {
+ /// Returns an error if the modification conditions on this request are not satisfied
+ ///
+ /// Only the date conditions are evaluated here; `if_match` / `if_none_match`
+ /// are not inspected.
+ ///
+ /// # Errors
+ ///
+ /// * [`Error::NotModified`] if `if_modified_since` is set and `last_modified`
+ ///   is not after it (checked first)
+ /// * [`Error::Precondition`] if `if_unmodified_since` is set and `last_modified`
+ ///   is after it
+ fn check_modified(
+ &self,
+ location: &Path,
+ last_modified: DateTime<Utc>,
+ ) -> Result<()> {
+ if let Some(date) = self.if_modified_since {
+ if last_modified <= date {
+ return Err(Error::NotModified {
+ path: location.to_string(),
+ source: format!("{} >= {}", date, last_modified).into(),
+ });
+ }
+ }
+
+ if let Some(date) = self.if_unmodified_since {
+ if last_modified > date {
+ return Err(Error::Precondition {
+ path: location.to_string(),
+ source: format!("{} < {}", date, last_modified).into(),
+ });
+ }
+ }
+ Ok(())
+ }
+}
+
/// Result for a get request
///
/// This special cases the case of a local file, as some systems may
@@ -702,6 +779,18 @@ pub enum Error {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
+ /// A request precondition (such as `if_match` or `if_unmodified_since`) was not satisfied
+ #[snafu(display("Request precondition failure for path {}: {}", path, source))]
+ Precondition {
+ path: String,
+ source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ },
+
+ /// The object was reported as not modified (such as via `if_none_match`
+ /// or `if_modified_since`)
+ #[snafu(display("Object at location {} not modified: {}", path, source))]
+ NotModified {
+ path: String,
+ source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ },
+
#[snafu(display("Operation not yet implemented."))]
NotImplemented,
@@ -1025,6 +1114,85 @@ mod tests {
delete_fixtures(storage).await;
}
+ /// Shared conformance test for [`ObjectStore::get_opts`] conditional requests.
+ ///
+ /// Writes `test`, then checks the date conditions, accepting `NotSupported` from
+ /// stores that reject them. When `head` reports an e_tag, it also strictly checks
+ /// `if_match` / `if_none_match`.
+ /// NOTE(review): the `test` object is not deleted afterwards.
+ pub(crate) async fn get_opts(storage: &dyn ObjectStore) {
+ let path = Path::from("test");
+ storage.put(&path, "foo".into()).await.unwrap();
+ let meta = storage.head(&path).await.unwrap();
+
+ let options = GetOptions {
+ if_unmodified_since: Some(meta.last_modified),
+ ..GetOptions::default()
+ };
+ match storage.get_opts(&path, options).await {
+ Ok(_) | Err(Error::NotSupported { .. }) => {}
+ Err(e) => panic!("{e}"),
+ }
+
+ let options = GetOptions {
+ if_unmodified_since: Some(meta.last_modified + chrono::Duration::hours(10)),
+ ..GetOptions::default()
+ };
+ match storage.get_opts(&path, options).await {
+ Ok(_) | Err(Error::NotSupported { .. }) => {}
+ Err(e) => panic!("{e}"),
+ }
+
+ let options = GetOptions {
+ if_unmodified_since: Some(meta.last_modified - chrono::Duration::hours(10)),
+ ..GetOptions::default()
+ };
+ match storage.get_opts(&path, options).await {
+ Err(Error::Precondition { .. } | Error::NotSupported { .. }) => {}
+ d => panic!("{d:?}"),
+ }
+
+ // Only the exact-timestamp case is asserted: some stores (e.g. S3) return
+ // NotModified only when the timestamp matches last_modified exactly.
+ let options = GetOptions {
+ if_modified_since: Some(meta.last_modified),
+ ..GetOptions::default()
+ };
+ match storage.get_opts(&path, options).await {
+ Err(Error::NotModified { .. } | Error::NotSupported { .. }) => {}
+ d => panic!("{d:?}"),
+ }
+
+ let options = GetOptions {
+ if_modified_since: Some(meta.last_modified - chrono::Duration::hours(10)),
+ ..GetOptions::default()
+ };
+ match storage.get_opts(&path, options).await {
+ Ok(_) | Err(Error::NotSupported { .. }) => {}
+ Err(e) => panic!("{e}"),
+ }
+
+ if let Some(tag) = meta.e_tag {
+ let options = GetOptions {
+ if_match: Some(tag.clone()),
+ ..GetOptions::default()
+ };
+ storage.get_opts(&path, options).await.unwrap();
+
+ let options = GetOptions {
+ if_match: Some("invalid".to_string()),
+ ..GetOptions::default()
+ };
+ let err = storage.get_opts(&path, options).await.unwrap_err();
+ assert!(matches!(err, Error::Precondition { .. }), "{err}");
+
+ let options = GetOptions {
+ if_none_match: Some(tag.clone()),
+ ..GetOptions::default()
+ };
+ let err = storage.get_opts(&path, options).await.unwrap_err();
+ assert!(matches!(err, Error::NotModified { .. }), "{err}");
+
+ let options = GetOptions {
+ if_none_match: Some("invalid".to_string()),
+ ..GetOptions::default()
+ };
+ storage.get_opts(&path, options).await.unwrap();
+ }
+ }
+
fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
std::iter::repeat(Bytes::from_iter(std::iter::repeat(b'x').take(chunk_length)))
.take(num_chunks)
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index d0d9f73c5..e0091115d 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -18,8 +18,8 @@
//! An object store that limits the maximum concurrency of the wrapped implementation
use crate::{
- BoxStream, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, Result,
- StreamExt,
+ BoxStream, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+ Path, Result, StreamExt,
};
use async_trait::async_trait;
use bytes::Bytes;
@@ -114,6 +114,16 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
}
}
+ /// Forward to the inner store while holding a concurrency permit.
+ ///
+ /// For [`GetResult::Stream`] the permit is moved into the returned stream
+ /// (via `PermitWrapper`). For [`GetResult::File`] or an error, the permit is
+ /// released when this method returns.
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+ let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
+ match self.inner.get_opts(location, options).await? {
+ r @ GetResult::File(_, _) => Ok(r),
+ GetResult::Stream(s) => {
+ Ok(GetResult::Stream(PermitWrapper::new(s, permit).boxed()))
+ }
+ }
+ }
+
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.get_range(location, range).await
@@ -251,10 +261,7 @@ impl<T: AsyncWrite + Unpin> AsyncWrite for PermitWrapper<T> {
mod tests {
use crate::limit::LimitStore;
use crate::memory::InMemory;
- use crate::tests::{
- list_uses_directories_correctly, list_with_delimiter, put_get_delete_list,
- rename_and_copy, stream_get,
- };
+ use crate::tests::*;
use crate::ObjectStore;
use std::time::Duration;
use tokio::time::timeout;
@@ -266,6 +273,7 @@ mod tests {
let integration = LimitStore::new(memory, max_requests);
put_get_delete_list(&integration).await;
+ get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index b40f5a777..26a8bf336 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -19,7 +19,7 @@
use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
- GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
+ GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
};
use async_trait::async_trait;
use bytes::Bytes;
@@ -56,7 +56,7 @@ pub(crate) enum Error {
},
#[snafu(display("Unable to access metadata for {}: {}", path, source))]
- UnableToAccessMetadata {
+ Metadata {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
path: String,
},
@@ -360,10 +360,27 @@ impl ObjectStore for LocalFileSystem {
Err(super::Error::NotImplemented)
}
- async fn get(&self, location: &Path) -> Result<GetResult> {
- let path = self.config.path_to_filesystem(location)?;
+ /// Open the file backing `location` and return it as [`GetResult::File`].
+ ///
+ /// ETag conditions (`if_match` / `if_none_match`) are rejected with
+ /// `NotSupported`. Date conditions are checked against the file's modification
+ /// time. `options.range` is not applied here.
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+ if options.if_match.is_some() || options.if_none_match.is_some() {
+ return Err(super::Error::NotSupported {
+ source: "ETags not supported by LocalFileSystem".to_string().into(),
+ });
+ }
+
+ // Owned copy so it can be moved into the `maybe_spawn_blocking` closure below.
+ let location = location.clone();
+ let path = self.config.path_to_filesystem(&location)?;
maybe_spawn_blocking(move || {
let file = open_file(&path)?;
+ // Only stat the file when a date condition needs its modification time.
+ if options.if_unmodified_since.is_some()
+ || options.if_modified_since.is_some()
+ {
+ let metadata = file.metadata().map_err(|e| Error::Metadata {
+ source: e.into(),
+ path: location.to_string(),
+ })?;
+ options.check_modified(&location, last_modified(&metadata))?;
+ }
+
Ok(GetResult::File(file, path))
})
.await
@@ -408,7 +425,7 @@ impl ObjectStore for LocalFileSystem {
source: e,
}
} else {
- Error::UnableToAccessMetadata {
+ Error::Metadata {
source: e.into(),
path: location.to_string(),
}
@@ -878,21 +895,22 @@ fn open_file(path: &PathBuf) -> Result<File> {
}
fn convert_entry(entry: DirEntry, location: Path) -> Result<ObjectMeta> {
- let metadata = entry
- .metadata()
- .map_err(|e| Error::UnableToAccessMetadata {
- source: e.into(),
- path: location.to_string(),
- })?;
+ let metadata = entry.metadata().map_err(|e| Error::Metadata {
+ source: e.into(),
+ path: location.to_string(),
+ })?;
convert_metadata(metadata, location)
}
-fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> Result<ObjectMeta> {
- let last_modified: DateTime<Utc> = metadata
+/// Modification time recorded in `metadata`, converted to UTC.
+///
+/// # Panics
+///
+/// Panics if the platform does not report file modification times.
+fn last_modified(metadata: &std::fs::Metadata) -> DateTime<Utc> {
+ metadata
.modified()
.expect("Modified file time should be supported on this platform")
- .into();
+ .into()
+}
+fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> Result<ObjectMeta> {
+ let last_modified = last_modified(&metadata);
let size = usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu {
path: location.as_ref(),
})?;
@@ -956,13 +974,7 @@ fn convert_walkdir_result(
mod tests {
use super::*;
use crate::test_util::flatten_list_stream;
- use crate::{
- tests::{
- copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly,
- list_with_delimiter, put_get_delete_list, rename_and_copy, stream_get,
- },
- Error as ObjectStoreError, ObjectStore,
- };
+ use crate::tests::*;
use futures::TryStreamExt;
use tempfile::{NamedTempFile, TempDir};
use tokio::io::AsyncWriteExt;
@@ -973,6 +985,7 @@ mod tests {
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
put_get_delete_list(&integration).await;
+ get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
@@ -1085,7 +1098,7 @@ mod tests {
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
- if let ObjectStoreError::NotFound { path, source } = err {
+ if let crate::Error::NotFound { path, source } = err {
let source_variant = source.downcast_ref::<std::io::Error>();
assert!(
matches!(source_variant, Some(std::io::Error { .. }),),
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index b01ffbb02..82d485997 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -16,8 +16,8 @@
// under the License.
//! An in-memory object store implementation
-use crate::MultipartId;
use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
+use crate::{GetOptions, MultipartId};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
@@ -128,12 +128,17 @@ impl ObjectStore for InMemory {
}))
}
- async fn get(&self, location: &Path) -> Result<GetResult> {
- let data = self.entry(location).await?;
+ /// Return the stored bytes as a single-chunk [`GetResult::Stream`].
+ ///
+ /// ETag conditions are rejected with `NotSupported`. Date conditions are
+ /// checked against the entry's last-modified time.
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+ if options.if_match.is_some() || options.if_none_match.is_some() {
+ return Err(super::Error::NotSupported {
+ source: "ETags not supported by InMemory".to_string().into(),
+ });
+ }
+ let (data, last_modified) = self.entry(location).await?;
+ options.check_modified(location, last_modified)?;
- Ok(GetResult::Stream(
- futures::stream::once(async move { Ok(data.0) }).boxed(),
- ))
+ // NOTE(review): `options.range` is never applied, so a ranged `get_opts` returns
+ // the whole object. `get_range` is unaffected (InMemory overrides it below), but
+ // direct `get_opts` callers passing a range receive all bytes; consider slicing
+ // `data` by the requested range.
+ let stream = futures::stream::once(futures::future::ready(Ok(data)));
+ Ok(GetResult::Stream(stream.boxed()))
}
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
@@ -391,19 +396,14 @@ mod tests {
use super::*;
- use crate::{
- tests::{
- copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly,
- list_with_delimiter, put_get_delete_list, rename_and_copy, stream_get,
- },
- Error as ObjectStoreError, ObjectStore,
- };
+ use crate::tests::*;
#[tokio::test]
async fn in_memory_test() {
let integration = InMemory::new();
put_get_delete_list(&integration).await;
+ get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
@@ -443,7 +443,7 @@ mod tests {
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
- if let ObjectStoreError::NotFound { path, source } = err {
+ if let crate::Error::NotFound { path, source } = err {
let source_variant = source.downcast_ref::<Error>();
assert!(
matches!(source_variant, Some(Error::NoDataInMemory { .. }),),
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 94836d33c..ffe509411 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -22,7 +22,9 @@ use std::ops::Range;
use tokio::io::AsyncWrite;
use crate::path::Path;
-use crate::{GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};
+use crate::{
+ GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
+};
#[doc(hidden)]
#[deprecated(note = "Use PrefixStore")]
@@ -117,6 +119,15 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.get_range(&full_path, range).await
}
+ /// Forward to the inner store, mapping `location` through `self.full_path`
+ /// and passing `options` through unchanged.
+ async fn get_opts(
+ &self,
+ location: &Path,
+ options: GetOptions,
+ ) -> Result<GetResult> {
+ let full_path = self.full_path(location);
+ self.inner.get_opts(&full_path, options).await
+ }
+
async fn get_ranges(
&self,
location: &Path,
@@ -206,10 +217,7 @@ mod tests {
use super::*;
use crate::local::LocalFileSystem;
use crate::test_util::flatten_list_stream;
- use crate::tests::{
- copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
- put_get_delete_list, rename_and_copy, stream_get,
- };
+ use crate::tests::*;
use tempfile::TempDir;
@@ -220,6 +228,7 @@ mod tests {
let integration = PrefixStore::new(inner, "prefix");
put_get_delete_list(&integration).await;
+ get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index e51303114..fb90afcec 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -20,8 +20,8 @@ use parking_lot::Mutex;
use std::ops::Range;
use std::{convert::TryInto, sync::Arc};
-use crate::MultipartId;
use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
+use crate::{GetOptions, MultipartId};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::BoxStream, FutureExt, StreamExt};
@@ -179,17 +179,18 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
// need to copy to avoid moving / referencing `self`
let wait_get_per_byte = self.config().wait_get_per_byte;
- self.inner.get(location).await.map(|result| {
- let s = match result {
- GetResult::Stream(s) => s,
- GetResult::File(_, _) => unimplemented!(),
- };
+ let result = self.inner.get(location).await?;
+ Ok(throttle_get(result, wait_get_per_byte))
+ }
- GetResult::Stream(throttle_stream(s, move |bytes| {
- let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
- wait_get_per_byte * bytes_len
- }))
- })
+ /// Sleep for `wait_get_per_call`, forward to the inner store, then throttle the
+ /// returned stream by `wait_get_per_byte` (panics if the inner store returns
+ /// [`GetResult::File`]; see `throttle_get`).
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+ sleep(self.config().wait_get_per_call).await;
+
+ // need to copy to avoid moving / referencing `self`
+ let wait_get_per_byte = self.config().wait_get_per_byte;
+
+ let result = self.inner.get_opts(location, options).await?;
+ Ok(throttle_get(result, wait_get_per_byte))
}
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
@@ -299,6 +300,18 @@ fn usize_to_u32_saturate(x: usize) -> u32 {
x.try_into().unwrap_or(u32::MAX)
}
+/// Delay each chunk of a streamed [`GetResult`] by `wait_get_per_byte` multiplied by
+/// the chunk length (saturated to `u32`).
+///
+/// # Panics
+///
+/// Panics via `unimplemented!` if `result` is a [`GetResult::File`], as returned
+/// for example by `LocalFileSystem`.
+fn throttle_get(result: GetResult, wait_get_per_byte: Duration) -> GetResult {
+ let s = match result {
+ GetResult::Stream(s) => s,
+ GetResult::File(_, _) => unimplemented!(),
+ };
+
+ GetResult::Stream(throttle_stream(s, move |bytes| {
+ let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
+ wait_get_per_byte * bytes_len
+ }))
+}
+
fn throttle_stream<T: Send + 'static, E: Send + 'static, F>(
stream: BoxStream<'_, Result<T, E>>,
delay: F,
@@ -317,13 +330,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
- use crate::{
- memory::InMemory,
- tests::{
- copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
- put_get_delete_list, rename_and_copy,
- },
- };
+ use crate::{memory::InMemory, tests::*};
use bytes::Bytes;
use futures::TryStreamExt;
use tokio::time::Duration;
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
index e5c701dd8..ba4c68345 100644
--- a/object_store/src/util.rs
+++ b/object_store/src/util.rs
@@ -44,13 +44,6 @@ pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option<String> {
.map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER))
}
-/// Returns a formatted HTTP range header as per
-/// <https://httpwg.org/specs/rfc7233.html#header.range>
-#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
-pub fn format_http_range(range: std::ops::Range<usize>) -> String {
- format!("bytes={}-{}", range.start, range.end.saturating_sub(1))
-}
-
#[cfg(any(feature = "aws", feature = "azure"))]
pub(crate) fn hmac_sha256(
secret: impl AsRef<[u8]>,