You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/27 16:54:27 UTC

[arrow-rs] branch master updated: ObjectStore cleanup (#2587) (#2590)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 86446ea88 ObjectStore cleanup (#2587) (#2590)
86446ea88 is described below

commit 86446ea88b6fc3e11b47aeedb9eb24f4d69b2f13
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sat Aug 27 17:54:22 2022 +0100

    ObjectStore cleanup (#2587) (#2590)
    
    * ObjectStore cleanup (#2587)
    
    * Fix CI
---
 .github/workflows/object_store.yml                 | 19 ++++-
 .github/workflows/parquet.yml                      |  2 +-
 object_store/Cargo.toml                            |  4 +-
 object_store/src/azure/client.rs                   | 13 +--
 object_store/src/azure/credential.rs               | 99 +++++++++++++++++++++-
 object_store/src/azure/mod.rs                      | 16 ++--
 object_store/src/client/mod.rs                     |  2 -
 object_store/src/gcp.rs                            | 15 +++-
 .../src/{client/oauth.rs => gcp/credential.rs}     | 85 +------------------
 object_store/src/lib.rs                            |  4 -
 10 files changed, 141 insertions(+), 118 deletions(-)

diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml
index 8e4b80d94..118dbd932 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -42,9 +42,22 @@ jobs:
           rustup toolchain install stable
           rustup default stable
           rustup component add clippy
-      - name: Run clippy
-        run: |
-          cargo clippy -p object_store --all-features --all-targets -- -D warnings
+      # Run different tests for the library on its own as well as
+      # all targets to ensure that it still works in the absence of
+      # features that might be enabled by dev-dependencies of other
+      # targets.
+      - name: Run clippy with default features
+        run: cargo clippy -p object_store -- -D warnings
+      - name: Run clippy with aws feature
+        run: cargo clippy -p object_store --features aws -- -D warnings
+      - name: Run clippy with gcp feature
+        run: cargo clippy -p object_store --features gcp -- -D warnings
+      - name: Run clippy with azure feature
+        run: cargo clippy -p object_store --features azure -- -D warnings
+      - name: Run clippy with all features
+        run: cargo clippy -p object_store --all-features -- -D warnings
+      - name: Run clippy with all features and all targets
+        run: cargo clippy -p object_store --all-features --all-targets -- -D warnings
 
   # test the crate
   linux-test:
diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index e3f667510..42cb06bb0 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -58,7 +58,7 @@ jobs:
           cargo test -p parquet --all-features
 
 
-  # test compilaton
+  # test compilation
   linux-features:
     name: Check Compilation
     runs-on: ubuntu-latest
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index 2be233c83..ccfe12393 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -53,9 +53,9 @@ ring = { version = "0.16", default-features = false, features = ["std"], optiona
 rustls-pemfile = { version = "1.0", default-features = false, optional = true }
 
 [features]
-cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand", "ring"]
+cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
 azure = ["cloud"]
-gcp = ["cloud"]
+gcp = ["cloud", "rustls-pemfile"]
 aws = ["cloud"]
 
 [dev-dependencies] # In alphabetical order
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 722f6768a..9f87a888c 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -75,20 +75,13 @@ pub(crate) enum Error {
     #[snafu(display("Error getting list response body: {}", source))]
     ListResponseBody { source: reqwest::Error },
 
-    #[snafu(display("Error performing create multipart request: {}", source))]
-    CreateMultipartRequest { source: reqwest::Error },
-
-    #[snafu(display("Error performing complete multipart request: {}", source))]
-    CompleteMultipartRequest { source: reqwest::Error },
-
     #[snafu(display("Got invalid list response: {}", source))]
     InvalidListResponse { source: quick_xml::de::DeError },
 
-    #[snafu(display("Got invalid multipart response: {}", source))]
-    InvalidMultipartResponse { source: quick_xml::de::DeError },
-
     #[snafu(display("Error authorizing request: {}", source))]
-    Authorization { source: crate::client::oauth::Error },
+    Authorization {
+        source: crate::azure::credential::Error,
+    },
 }
 
 impl From<Error> for crate::Error {
diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs
index 9357e8089..721fcaea4 100644
--- a/object_store/src/azure/credential.rs
+++ b/object_store/src/azure/credential.rs
@@ -15,19 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::client::oauth::ClientSecretOAuthProvider;
+use crate::client::retry::RetryExt;
+use crate::client::token::{TemporaryToken, TokenCache};
 use crate::util::hmac_sha256;
+use crate::RetryConfig;
 use chrono::Utc;
+use reqwest::header::ACCEPT;
 use reqwest::{
     header::{
         HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_ENCODING,
         CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH,
         IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_UNMODIFIED_SINCE, RANGE,
     },
-    Method, RequestBuilder,
+    Client, Method, RequestBuilder,
 };
+use snafu::{ResultExt, Snafu};
 use std::borrow::Cow;
 use std::str;
+use std::time::{Duration, Instant};
 use url::Url;
 
 static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2021-08-06");
@@ -38,6 +43,18 @@ pub(crate) static DELETE_SNAPSHOTS: HeaderName =
 pub(crate) static COPY_SOURCE: HeaderName = HeaderName::from_static("x-ms-copy-source");
 static CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
 pub(crate) static RFC1123_FMT: &str = "%a, %d %h %Y %T GMT";
+const CONTENT_TYPE_JSON: &str = "application/json";
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Error performing token request: {}", source))]
+    TokenRequest { source: crate::client::retry::Error },
+
+    #[snafu(display("Error getting token response body: {}", source))]
+    TokenResponseBody { source: reqwest::Error },
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 /// Provides credentials for use when signing requests
 #[derive(Debug)]
@@ -253,3 +270,81 @@ fn lexy_sort<'a>(
     values.sort_unstable();
     values
 }
+
+#[derive(serde::Deserialize, Debug)]
+struct TokenResponse {
+    access_token: String,
+    expires_in: u64,
+}
+
+/// Encapsulates the logic to perform an OAuth token challenge
+#[derive(Debug)]
+pub struct ClientSecretOAuthProvider {
+    scope: String,
+    token_url: String,
+    client_id: String,
+    client_secret: String,
+    cache: TokenCache<String>,
+}
+
+impl ClientSecretOAuthProvider {
+    /// Create a new [`ClientSecretOAuthProvider`] for an azure backed store
+    pub fn new(
+        client_id: String,
+        client_secret: String,
+        tenant_id: String,
+        authority_host: Option<String>,
+    ) -> Self {
+        let authority_host = authority_host
+            .unwrap_or_else(|| authority_hosts::AZURE_PUBLIC_CLOUD.to_owned());
+
+        Self {
+            scope: "https://storage.azure.com/.default".to_owned(),
+            token_url: format!("{}/{}/oauth2/v2.0/token", authority_host, tenant_id),
+            client_id,
+            client_secret,
+            cache: TokenCache::default(),
+        }
+    }
+
+    /// Fetch a token
+    pub async fn fetch_token(
+        &self,
+        client: &Client,
+        retry: &RetryConfig,
+    ) -> Result<String> {
+        self.cache
+            .get_or_insert_with(|| self.fetch_token_inner(client, retry))
+            .await
+    }
+
+    /// Fetch a fresh token
+    async fn fetch_token_inner(
+        &self,
+        client: &Client,
+        retry: &RetryConfig,
+    ) -> Result<TemporaryToken<String>> {
+        let response: TokenResponse = client
+            .request(Method::POST, &self.token_url)
+            .header(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON))
+            .form(&[
+                ("client_id", self.client_id.as_str()),
+                ("client_secret", self.client_secret.as_str()),
+                ("scope", self.scope.as_str()),
+                ("grant_type", "client_credentials"),
+            ])
+            .send_retry(retry)
+            .await
+            .context(TokenRequestSnafu)?
+            .json()
+            .await
+            .context(TokenResponseBodySnafu)?;
+
+        let token = TemporaryToken {
+            token: response.access_token,
+            expiry: Instant::now() + Duration::from_secs(response.expires_in),
+        };
+
+        Ok(token)
+    }
+}
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index c659e1f80..dd1cde9c7 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -110,6 +110,9 @@ enum Error {
 
     #[snafu(display("At least one authorization option must be specified"))]
     MissingCredentials {},
+
+    #[snafu(display("Azure credential error: {}", source), context(false))]
+    Credential { source: credential::Error },
 }
 
 impl From<Error> for super::Error {
@@ -539,13 +542,12 @@ impl MicrosoftAzureBuilder {
             } else if let (Some(client_id), Some(client_secret), Some(tenant_id)) =
                 (client_id, client_secret, tenant_id)
             {
-                let client_credential =
-                    crate::client::oauth::ClientSecretOAuthProvider::new_azure(
-                        client_id,
-                        client_secret,
-                        tenant_id,
-                        authority_host,
-                    );
+                let client_credential = credential::ClientSecretOAuthProvider::new(
+                    client_id,
+                    client_secret,
+                    tenant_id,
+                    authority_host,
+                );
                 Ok(credential::CredentialProvider::ClientSecret(
                     client_credential,
                 ))
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 10e8d9196..e6de3e929 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -18,8 +18,6 @@
 //! Generic utilities reqwest based ObjectStore implementations
 
 pub mod backoff;
-#[cfg(any(feature = "gcp", feature = "azure"))]
-pub mod oauth;
 pub mod pagination;
 pub mod retry;
 pub mod token;
diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs
index e9c7d0249..65adf9128 100644
--- a/object_store/src/gcp.rs
+++ b/object_store/src/gcp.rs
@@ -48,13 +48,17 @@ use tokio::io::AsyncWrite;
 use crate::client::pagination::stream_paginated;
 use crate::client::retry::RetryExt;
 use crate::{
-    client::{oauth::OAuthProvider, token::TokenCache},
+    client::token::TokenCache,
     multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
     path::{Path, DELIMITER},
     util::{format_http_range, format_prefix},
     GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, RetryConfig,
 };
 
+use credential::OAuthProvider;
+
+mod credential;
+
 #[derive(Debug, Snafu)]
 enum Error {
     #[snafu(display("Unable to open service account file: {}", source))]
@@ -115,6 +119,9 @@ enum Error {
 
     #[snafu(display("Missing service account path"))]
     MissingServiceAccountPath,
+
+    #[snafu(display("GCP credential error: {}", source))]
+    Credential { source: credential::Error },
 }
 
 impl From<Error> for super::Error {
@@ -240,7 +247,8 @@ impl GoogleCloudStorageClient {
                 .get_or_insert_with(|| {
                     oauth_provider.fetch_token(&self.client, &self.retry_config)
                 })
-                .await?)
+                .await
+                .context(CredentialSnafu)?)
         } else {
             Ok("".to_owned())
         }
@@ -818,7 +826,8 @@ impl GoogleCloudStorageBuilder {
                     audience,
                 )
             })
-            .transpose()?;
+            .transpose()
+            .context(CredentialSnafu)?;
 
         let encoded_bucket_name =
             percent_encode(bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string();
diff --git a/object_store/src/client/oauth.rs b/object_store/src/gcp/credential.rs
similarity index 72%
rename from object_store/src/client/oauth.rs
rename to object_store/src/gcp/credential.rs
index 6b3acea10..5b8cdb848 100644
--- a/object_store/src/client/oauth.rs
+++ b/object_store/src/gcp/credential.rs
@@ -16,17 +16,13 @@
 // under the License.
 
 use crate::client::retry::RetryExt;
-use crate::client::token::{TemporaryToken, TokenCache};
+use crate::client::token::TemporaryToken;
 use crate::RetryConfig;
-use reqwest::header::{HeaderMap, HeaderValue, ACCEPT};
 use reqwest::{Client, Method};
 use ring::signature::RsaKeyPair;
 use snafu::{ResultExt, Snafu};
 use std::time::{Duration, Instant};
 
-const CONTENT_TYPE_JSON: &str = "application/json";
-const AZURE_STORAGE_TOKEN_SCOPE: &str = "https://storage.azure.com/.default";
-
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("No RSA key found in pem file"))]
@@ -224,82 +220,3 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
     let string = serde_json::to_string(obj).context(EncodeSnafu)?;
     Ok(base64::encode_config(string, base64::URL_SAFE_NO_PAD))
 }
-
-/// Encapsulates the logic to perform an OAuth token challenge
-#[derive(Debug)]
-pub struct ClientSecretOAuthProvider {
-    scope: String,
-    token_url: String,
-    client_id: String,
-    client_secret: String,
-    cache: TokenCache<String>,
-}
-
-impl ClientSecretOAuthProvider {
-    /// Create a new [`ClientSecretOAuthProvider`] for an azure backed store
-    pub fn new_azure(
-        client_id: String,
-        client_secret: String,
-        tenant_id: String,
-        authority_host: Option<String>,
-    ) -> Self {
-        let authority_host = authority_host.unwrap_or_else(|| {
-            crate::azure::authority_hosts::AZURE_PUBLIC_CLOUD.to_owned()
-        });
-
-        Self {
-            scope: AZURE_STORAGE_TOKEN_SCOPE.to_owned(),
-            token_url: format!("{}/{}/oauth2/v2.0/token", authority_host, tenant_id),
-            client_id,
-            client_secret,
-            cache: TokenCache::default(),
-        }
-    }
-
-    /// Fetch a token
-    pub async fn fetch_token(
-        &self,
-        client: &Client,
-        retry: &RetryConfig,
-    ) -> Result<String> {
-        self.cache
-            .get_or_insert_with(|| self.fetch_token_inner(client, retry))
-            .await
-    }
-
-    /// Fetch a fresh token
-    async fn fetch_token_inner(
-        &self,
-        client: &Client,
-        retry: &RetryConfig,
-    ) -> Result<TemporaryToken<String>> {
-        let mut headers = HeaderMap::new();
-        headers.append(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON));
-
-        let mut params = std::collections::HashMap::new();
-        params.insert("client_id", self.client_id.as_str());
-        params.insert("client_secret", self.client_secret.as_str());
-        params.insert("scope", self.scope.as_str());
-        params.insert("grant_type", "client_credentials");
-
-        let response: TokenResponse = client
-            .request(Method::POST, &self.token_url)
-            .headers(headers)
-            .form(&params)
-            .send_retry(retry)
-            .await
-            .context(TokenRequestSnafu)?
-            .error_for_status()
-            .context(TokenResponseBodySnafu)?
-            .json()
-            .await
-            .context(TokenResponseBodySnafu)?;
-
-        let token = TemporaryToken {
-            token: response.access_token,
-            expiry: Instant::now() + Duration::from_secs(response.expires_in),
-        };
-
-        Ok(token)
-    }
-}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 5811eba1a..9ed9db9e9 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -465,10 +465,6 @@ pub enum Error {
 
     #[snafu(display("Operation not yet implemented."))]
     NotImplemented,
-
-    #[cfg(feature = "gcp")]
-    #[snafu(display("OAuth error: {}", source), context(false))]
-    OAuth { source: client::oauth::Error },
 }
 
 impl From<Error> for std::io::Error {