You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/27 16:54:27 UTC
[arrow-rs] branch master updated: ObjectStore cleanup (#2587) (#2590)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 86446ea88 ObjectStore cleanup (#2587) (#2590)
86446ea88 is described below
commit 86446ea88b6fc3e11b47aeedb9eb24f4d69b2f13
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sat Aug 27 17:54:22 2022 +0100
ObjectStore cleanup (#2587) (#2590)
* ObjectStore cleanup (#2587)
* Fix CI
---
.github/workflows/object_store.yml | 19 ++++-
.github/workflows/parquet.yml | 2 +-
object_store/Cargo.toml | 4 +-
object_store/src/azure/client.rs | 13 +--
object_store/src/azure/credential.rs | 99 +++++++++++++++++++++-
object_store/src/azure/mod.rs | 16 ++--
object_store/src/client/mod.rs | 2 -
object_store/src/gcp.rs | 15 +++-
.../src/{client/oauth.rs => gcp/credential.rs} | 85 +------------------
object_store/src/lib.rs | 4 -
10 files changed, 141 insertions(+), 118 deletions(-)
diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml
index 8e4b80d94..118dbd932 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -42,9 +42,22 @@ jobs:
rustup toolchain install stable
rustup default stable
rustup component add clippy
- - name: Run clippy
- run: |
- cargo clippy -p object_store --all-features --all-targets -- -D warnings
+ # Run different tests for the library on its own as well as
+ # all targets to ensure that it still works in the absence of
+ # features that might be enabled by dev-dependencies of other
+ # targets.
+ - name: Run clippy with default features
+ run: cargo clippy -p object_store -- -D warnings
+ - name: Run clippy with aws feature
+ run: cargo clippy -p object_store --features aws -- -D warnings
+ - name: Run clippy with gcp feature
+ run: cargo clippy -p object_store --features gcp -- -D warnings
+ - name: Run clippy with azure feature
+ run: cargo clippy -p object_store --features azure -- -D warnings
+ - name: Run clippy with all features
+ run: cargo clippy -p object_store --all-features -- -D warnings
+ - name: Run clippy with all features and all targets
+ run: cargo clippy -p object_store --all-features --all-targets -- -D warnings
# test the crate
linux-test:
diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index e3f667510..42cb06bb0 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -58,7 +58,7 @@ jobs:
cargo test -p parquet --all-features
- # test compilaton
+ # test compilation
linux-features:
name: Check Compilation
runs-on: ubuntu-latest
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index 2be233c83..ccfe12393 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -53,9 +53,9 @@ ring = { version = "0.16", default-features = false, features = ["std"], optiona
rustls-pemfile = { version = "1.0", default-features = false, optional = true }
[features]
-cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand", "ring"]
+cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud"]
-gcp = ["cloud"]
+gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud"]
[dev-dependencies] # In alphabetical order
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 722f6768a..9f87a888c 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -75,20 +75,13 @@ pub(crate) enum Error {
#[snafu(display("Error getting list response body: {}", source))]
ListResponseBody { source: reqwest::Error },
- #[snafu(display("Error performing create multipart request: {}", source))]
- CreateMultipartRequest { source: reqwest::Error },
-
- #[snafu(display("Error performing complete multipart request: {}", source))]
- CompleteMultipartRequest { source: reqwest::Error },
-
#[snafu(display("Got invalid list response: {}", source))]
InvalidListResponse { source: quick_xml::de::DeError },
- #[snafu(display("Got invalid multipart response: {}", source))]
- InvalidMultipartResponse { source: quick_xml::de::DeError },
-
#[snafu(display("Error authorizing request: {}", source))]
- Authorization { source: crate::client::oauth::Error },
+ Authorization {
+ source: crate::azure::credential::Error,
+ },
}
impl From<Error> for crate::Error {
diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs
index 9357e8089..721fcaea4 100644
--- a/object_store/src/azure/credential.rs
+++ b/object_store/src/azure/credential.rs
@@ -15,19 +15,24 @@
// specific language governing permissions and limitations
// under the License.
-use crate::client::oauth::ClientSecretOAuthProvider;
+use crate::client::retry::RetryExt;
+use crate::client::token::{TemporaryToken, TokenCache};
use crate::util::hmac_sha256;
+use crate::RetryConfig;
use chrono::Utc;
+use reqwest::header::ACCEPT;
use reqwest::{
header::{
HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_ENCODING,
CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH,
IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_UNMODIFIED_SINCE, RANGE,
},
- Method, RequestBuilder,
+ Client, Method, RequestBuilder,
};
+use snafu::{ResultExt, Snafu};
use std::borrow::Cow;
use std::str;
+use std::time::{Duration, Instant};
use url::Url;
static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2021-08-06");
@@ -38,6 +43,18 @@ pub(crate) static DELETE_SNAPSHOTS: HeaderName =
pub(crate) static COPY_SOURCE: HeaderName = HeaderName::from_static("x-ms-copy-source");
static CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
pub(crate) static RFC1123_FMT: &str = "%a, %d %h %Y %T GMT";
+const CONTENT_TYPE_JSON: &str = "application/json";
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+ #[snafu(display("Error performing token request: {}", source))]
+ TokenRequest { source: crate::client::retry::Error },
+
+ #[snafu(display("Error getting token response body: {}", source))]
+ TokenResponseBody { source: reqwest::Error },
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Provides credentials for use when signing requests
#[derive(Debug)]
@@ -253,3 +270,81 @@ fn lexy_sort<'a>(
values.sort_unstable();
values
}
+
+#[derive(serde::Deserialize, Debug)]
+struct TokenResponse {
+ access_token: String,
+ expires_in: u64,
+}
+
+/// Encapsulates the logic to perform an OAuth token challenge
+#[derive(Debug)]
+pub struct ClientSecretOAuthProvider {
+ scope: String,
+ token_url: String,
+ client_id: String,
+ client_secret: String,
+ cache: TokenCache<String>,
+}
+
+impl ClientSecretOAuthProvider {
+ /// Create a new [`ClientSecretOAuthProvider`] for an azure backed store
+ pub fn new(
+ client_id: String,
+ client_secret: String,
+ tenant_id: String,
+ authority_host: Option<String>,
+ ) -> Self {
+ let authority_host = authority_host
+ .unwrap_or_else(|| authority_hosts::AZURE_PUBLIC_CLOUD.to_owned());
+
+ Self {
+ scope: "https://storage.azure.com/.default".to_owned(),
+ token_url: format!("{}/{}/oauth2/v2.0/token", authority_host, tenant_id),
+ client_id,
+ client_secret,
+ cache: TokenCache::default(),
+ }
+ }
+
+ /// Fetch a token
+ pub async fn fetch_token(
+ &self,
+ client: &Client,
+ retry: &RetryConfig,
+ ) -> Result<String> {
+ self.cache
+ .get_or_insert_with(|| self.fetch_token_inner(client, retry))
+ .await
+ }
+
+ /// Fetch a fresh token
+ async fn fetch_token_inner(
+ &self,
+ client: &Client,
+ retry: &RetryConfig,
+ ) -> Result<TemporaryToken<String>> {
+ let response: TokenResponse = client
+ .request(Method::POST, &self.token_url)
+ .header(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON))
+ .form(&[
+ ("client_id", self.client_id.as_str()),
+ ("client_secret", self.client_secret.as_str()),
+ ("scope", self.scope.as_str()),
+ ("grant_type", "client_credentials"),
+ ])
+ .send_retry(retry)
+ .await
+ .context(TokenRequestSnafu)?
+ .json()
+ .await
+ .context(TokenResponseBodySnafu)?;
+
+ let token = TemporaryToken {
+ token: response.access_token,
+ expiry: Instant::now() + Duration::from_secs(response.expires_in),
+ };
+
+ Ok(token)
+ }
+}
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index c659e1f80..dd1cde9c7 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -110,6 +110,9 @@ enum Error {
#[snafu(display("At least one authorization option must be specified"))]
MissingCredentials {},
+
+ #[snafu(display("Azure credential error: {}", source), context(false))]
+ Credential { source: credential::Error },
}
impl From<Error> for super::Error {
@@ -539,13 +542,12 @@ impl MicrosoftAzureBuilder {
} else if let (Some(client_id), Some(client_secret), Some(tenant_id)) =
(client_id, client_secret, tenant_id)
{
- let client_credential =
- crate::client::oauth::ClientSecretOAuthProvider::new_azure(
- client_id,
- client_secret,
- tenant_id,
- authority_host,
- );
+ let client_credential = credential::ClientSecretOAuthProvider::new(
+ client_id,
+ client_secret,
+ tenant_id,
+ authority_host,
+ );
Ok(credential::CredentialProvider::ClientSecret(
client_credential,
))
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 10e8d9196..e6de3e929 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -18,8 +18,6 @@
//! Generic utilities reqwest based ObjectStore implementations
pub mod backoff;
-#[cfg(any(feature = "gcp", feature = "azure"))]
-pub mod oauth;
pub mod pagination;
pub mod retry;
pub mod token;
diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs
index e9c7d0249..65adf9128 100644
--- a/object_store/src/gcp.rs
+++ b/object_store/src/gcp.rs
@@ -48,13 +48,17 @@ use tokio::io::AsyncWrite;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
use crate::{
- client::{oauth::OAuthProvider, token::TokenCache},
+ client::token::TokenCache,
multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
path::{Path, DELIMITER},
util::{format_http_range, format_prefix},
GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, RetryConfig,
};
+use credential::OAuthProvider;
+
+mod credential;
+
#[derive(Debug, Snafu)]
enum Error {
#[snafu(display("Unable to open service account file: {}", source))]
@@ -115,6 +119,9 @@ enum Error {
#[snafu(display("Missing service account path"))]
MissingServiceAccountPath,
+
+ #[snafu(display("GCP credential error: {}", source))]
+ Credential { source: credential::Error },
}
impl From<Error> for super::Error {
@@ -240,7 +247,8 @@ impl GoogleCloudStorageClient {
.get_or_insert_with(|| {
oauth_provider.fetch_token(&self.client, &self.retry_config)
})
- .await?)
+ .await
+ .context(CredentialSnafu)?)
} else {
Ok("".to_owned())
}
@@ -818,7 +826,8 @@ impl GoogleCloudStorageBuilder {
audience,
)
})
- .transpose()?;
+ .transpose()
+ .context(CredentialSnafu)?;
let encoded_bucket_name =
percent_encode(bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string();
diff --git a/object_store/src/client/oauth.rs b/object_store/src/gcp/credential.rs
similarity index 72%
rename from object_store/src/client/oauth.rs
rename to object_store/src/gcp/credential.rs
index 6b3acea10..5b8cdb848 100644
--- a/object_store/src/client/oauth.rs
+++ b/object_store/src/gcp/credential.rs
@@ -16,17 +16,13 @@
// under the License.
use crate::client::retry::RetryExt;
-use crate::client::token::{TemporaryToken, TokenCache};
+use crate::client::token::TemporaryToken;
use crate::RetryConfig;
-use reqwest::header::{HeaderMap, HeaderValue, ACCEPT};
use reqwest::{Client, Method};
use ring::signature::RsaKeyPair;
use snafu::{ResultExt, Snafu};
use std::time::{Duration, Instant};
-const CONTENT_TYPE_JSON: &str = "application/json";
-const AZURE_STORAGE_TOKEN_SCOPE: &str = "https://storage.azure.com/.default";
-
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("No RSA key found in pem file"))]
@@ -224,82 +220,3 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
let string = serde_json::to_string(obj).context(EncodeSnafu)?;
Ok(base64::encode_config(string, base64::URL_SAFE_NO_PAD))
}
-
-/// Encapsulates the logic to perform an OAuth token challenge
-#[derive(Debug)]
-pub struct ClientSecretOAuthProvider {
- scope: String,
- token_url: String,
- client_id: String,
- client_secret: String,
- cache: TokenCache<String>,
-}
-
-impl ClientSecretOAuthProvider {
- /// Create a new [`ClientSecretOAuthProvider`] for an azure backed store
- pub fn new_azure(
- client_id: String,
- client_secret: String,
- tenant_id: String,
- authority_host: Option<String>,
- ) -> Self {
- let authority_host = authority_host.unwrap_or_else(|| {
- crate::azure::authority_hosts::AZURE_PUBLIC_CLOUD.to_owned()
- });
-
- Self {
- scope: AZURE_STORAGE_TOKEN_SCOPE.to_owned(),
- token_url: format!("{}/{}/oauth2/v2.0/token", authority_host, tenant_id),
- client_id,
- client_secret,
- cache: TokenCache::default(),
- }
- }
-
- /// Fetch a token
- pub async fn fetch_token(
- &self,
- client: &Client,
- retry: &RetryConfig,
- ) -> Result<String> {
- self.cache
- .get_or_insert_with(|| self.fetch_token_inner(client, retry))
- .await
- }
-
- /// Fetch a fresh token
- async fn fetch_token_inner(
- &self,
- client: &Client,
- retry: &RetryConfig,
- ) -> Result<TemporaryToken<String>> {
- let mut headers = HeaderMap::new();
- headers.append(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON));
-
- let mut params = std::collections::HashMap::new();
- params.insert("client_id", self.client_id.as_str());
- params.insert("client_secret", self.client_secret.as_str());
- params.insert("scope", self.scope.as_str());
- params.insert("grant_type", "client_credentials");
-
- let response: TokenResponse = client
- .request(Method::POST, &self.token_url)
- .headers(headers)
- .form(¶ms)
- .send_retry(retry)
- .await
- .context(TokenRequestSnafu)?
- .error_for_status()
- .context(TokenResponseBodySnafu)?
- .json()
- .await
- .context(TokenResponseBodySnafu)?;
-
- let token = TemporaryToken {
- token: response.access_token,
- expiry: Instant::now() + Duration::from_secs(response.expires_in),
- };
-
- Ok(token)
- }
-}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 5811eba1a..9ed9db9e9 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -465,10 +465,6 @@ pub enum Error {
#[snafu(display("Operation not yet implemented."))]
NotImplemented,
-
- #[cfg(feature = "gcp")]
- #[snafu(display("OAuth error: {}", source), context(false))]
- OAuth { source: client::oauth::Error },
}
impl From<Error> for std::io::Error {