You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in this plain-text rendering.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/01/25 15:24:36 UTC

[arrow-rs] branch master updated: Additional GCP authentication (#3541)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 42b2d55e4 Additional GCP authentication (#3541)
42b2d55e4 is described below

commit 42b2d55e407217a95fab70e8b6c9834f2a39fa1e
Author: Marius S <39...@users.noreply.github.com>
AuthorDate: Wed Jan 25 07:24:30 2023 -0800

    Additional GCP authentication (#3541)
    
    * Implement authentication with instance and application credentials
    
    * Fix link in documentation
    
    * Address feedback
    
    * Instantiate InstanceCredentialProvider client just once
---
 object_store/src/gcp/credential.rs | 255 ++++++++++++++++++++++++++++++++++++-
 object_store/src/gcp/mod.rs        | 166 +++++++++++++-----------
 2 files changed, 344 insertions(+), 77 deletions(-)

diff --git a/object_store/src/gcp/credential.rs b/object_store/src/gcp/credential.rs
index cc157dd41..56468568b 100644
--- a/object_store/src/gcp/credential.rs
+++ b/object_store/src/gcp/credential.rs
@@ -17,16 +17,30 @@
 
 use crate::client::retry::RetryExt;
 use crate::client::token::TemporaryToken;
+use crate::ClientOptions;
 use crate::RetryConfig;
+use async_trait::async_trait;
 use base64::prelude::BASE64_URL_SAFE_NO_PAD;
 use base64::Engine;
+use futures::TryFutureExt;
 use reqwest::{Client, Method};
 use ring::signature::RsaKeyPair;
 use snafu::{ResultExt, Snafu};
+use std::env;
+use std::fs::File;
+use std::io::BufReader;
+use std::path::Path;
 use std::time::{Duration, Instant};
+use tracing::info;
 
 #[derive(Debug, Snafu)]
 pub enum Error {
+    #[snafu(display("Unable to open service account file: {}", source))]
+    OpenCredentials { source: std::io::Error },
+
+    #[snafu(display("Unable to decode service account file: {}", source))]
+    DecodeCredentials { source: serde_json::Error },
+
     #[snafu(display("No RSA key found in pem file"))]
     MissingKey,
 
@@ -47,6 +61,12 @@ pub enum Error {
 
     #[snafu(display("Error getting token response body: {}", source))]
     TokenResponseBody { source: reqwest::Error },
+
+    #[snafu(display("A configuration file was passed in but was not used."))]
+    UnusedConfigurationFile,
+
+    #[snafu(display("Error creating client: {}", source))]
+    Client { source: crate::Error },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -104,6 +124,15 @@ struct TokenResponse {
     expires_in: u64,
 }
 
+/// A source of short-lived bearer tokens used to authenticate GCS requests.
+///
+/// Implementors receive the store's HTTP client and retry policy, but are free
+/// to use their own client instead.
+#[async_trait]
+pub trait TokenProvider: std::fmt::Debug + Send + Sync {
+    /// Obtain a fresh token together with the instant at which it expires.
+    async fn fetch_token(
+        &self,
+        client: &Client,
+        retry: &RetryConfig,
+    ) -> Result<TemporaryToken<String>, Error>;
+}
+
 /// Encapsulates the logic to perform an OAuth token challenge
 #[derive(Debug)]
 pub struct OAuthProvider {
@@ -138,9 +167,12 @@ impl OAuthProvider {
             random: ring::rand::SystemRandom::new(),
         })
     }
+}
 
+#[async_trait]
+impl TokenProvider for OAuthProvider {
     /// Fetch a fresh token
-    pub async fn fetch_token(
+    async fn fetch_token(
         &self,
         client: &Client,
         retry: &RetryConfig,
@@ -195,6 +227,69 @@ impl OAuthProvider {
     }
 }
 
+/// Open the JSON file at `service_account_path` and deserialize it into `T`.
+///
+/// Failing to open the file yields [`Error::OpenCredentials`]; failing to parse
+/// it yields [`Error::DecodeCredentials`].
+fn read_credentials_file<T>(
+    service_account_path: impl AsRef<std::path::Path>,
+) -> Result<T>
+where
+    T: serde::de::DeserializeOwned,
+{
+    let buffered = File::open(service_account_path)
+        .map(BufReader::new)
+        .context(OpenCredentialsSnafu)?;
+    serde_json::from_reader(buffered).context(DecodeCredentialsSnafu)
+}
+
+/// A deserialized `service-account-********.json`-file.
+///
+/// `Debug` is implemented by hand so that `private_key` is redacted and never
+/// ends up in logs or panic messages.
+#[derive(serde::Deserialize)]
+pub struct ServiceAccountCredentials {
+    /// The private key in RSA format.
+    pub private_key: String,
+
+    /// The email address associated with the service account.
+    pub client_email: String,
+
+    /// Base URL for GCS; defaults to `https://storage.googleapis.com` when absent.
+    #[serde(default = "default_gcs_base_url")]
+    pub gcs_base_url: String,
+
+    /// Disable oauth and use empty tokens; defaults to `false` when absent.
+    #[serde(default = "default_disable_oauth")]
+    pub disable_oauth: bool,
+}
+
+impl std::fmt::Debug for ServiceAccountCredentials {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ServiceAccountCredentials")
+            // Secret material: print a placeholder instead of the key.
+            .field("private_key", &"<redacted>")
+            .field("client_email", &self.client_email)
+            .field("gcs_base_url", &self.gcs_base_url)
+            .field("disable_oauth", &self.disable_oauth)
+            .finish()
+    }
+}
+
+/// Default GCS endpoint, used when the credentials file omits `gcs_base_url`.
+pub fn default_gcs_base_url() -> String {
+    String::from("https://storage.googleapis.com")
+}
+
+/// OAuth stays enabled unless the credentials file sets `disable_oauth`.
+pub fn default_disable_oauth() -> bool {
+    bool::default()
+}
+
+impl ServiceAccountCredentials {
+    /// Load [`ServiceAccountCredentials`] from the JSON key file at `path`.
+    pub fn from_file<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
+        read_credentials_file::<Self>(path)
+    }
+
+    /// Parse [`ServiceAccountCredentials`] from an in-memory JSON string.
+    pub fn from_key(key: &str) -> Result<Self> {
+        let parsed = serde_json::from_str::<Self>(key);
+        parsed.context(DecodeCredentialsSnafu)
+    }
+
+    /// Consume these credentials and build a boxed [`OAuthProvider`] from
+    /// `client_email` and `private_key` for the given `scope` and `audience`.
+    ///
+    /// Returns any error produced by [`OAuthProvider::new`].
+    pub fn token_provider(
+        self,
+        scope: &str,
+        audience: &str,
+    ) -> Result<Box<dyn TokenProvider>> {
+        let Self {
+            client_email,
+            private_key,
+            ..
+        } = self;
+        let provider = OAuthProvider::new(
+            client_email,
+            private_key,
+            scope.to_owned(),
+            audience.to_owned(),
+        )?;
+        Ok(Box::new(provider))
+    }
+}
+
 /// Returns the number of seconds since unix epoch
 fn seconds_since_epoch() -> u64 {
     std::time::SystemTime::now()
@@ -205,7 +300,7 @@ fn seconds_since_epoch() -> u64 {
 
 fn decode_first_rsa_key(private_key_pem: String) -> Result<RsaKeyPair> {
     use rustls_pemfile::Item;
-    use std::io::{BufReader, Cursor};
+    use std::io::Cursor;
 
     let mut cursor = Cursor::new(private_key_pem);
     let mut reader = BufReader::new(&mut cursor);
@@ -222,3 +317,159 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
     let string = serde_json::to_string(obj).context(EncodeSnafu)?;
     Ok(BASE64_URL_SAFE_NO_PAD.encode(string))
 }
+
+/// A provider that uses the Google Cloud Platform metadata server to fetch a token.
+///
+/// <https://cloud.google.com/docs/authentication/get-id-token#metadata-server>
+// NOTE(review): the derived `Default` yields an empty `audience` and a `Client`
+// that was not built from `ClientOptions`; confirm that is intended.
+#[derive(Debug, Default)]
+pub struct InstanceCredentialProvider {
+    /// Sent as the `audience` query parameter on each metadata-server token request.
+    audience: String,
+    /// Dedicated HTTP client. `new` builds it with plain-HTTP access enabled,
+    /// because the metadata server is addressed via `http://`.
+    client: Client,
+}
+
+impl InstanceCredentialProvider {
+    /// Create a new [`InstanceCredentialProvider`].
+    ///
+    /// The metadata server is reached over plain `http://`. This function
+    /// therefore builds a dedicated client from `client_options` with
+    /// `allow_http` forced on. Returns [`Error::Client`] if that client cannot
+    /// be built.
+    pub fn new<T: Into<String>>(
+        audience: T,
+        client_options: ClientOptions,
+    ) -> Result<Self> {
+        let http_client = client_options
+            .with_allow_http(true)
+            .client()
+            .context(ClientSnafu)?;
+        Ok(Self {
+            audience: audience.into(),
+            client: http_client,
+        })
+    }
+}
+
+/// Request an access token from the metadata server at `hostname`.
+///
+/// Issues `GET http://{hostname}/computeMetadata/v1/instance/service-accounts/default/token`
+/// with the `Metadata-Flavor: Google` header and `audience` as a query
+/// parameter, retrying according to `retry`. The JSON body is decoded as a
+/// [`TokenResponse`].
+async fn make_metadata_request(
+    client: &Client,
+    hostname: &str,
+    retry: &RetryConfig,
+    audience: &str,
+) -> Result<TokenResponse> {
+    let endpoint = format!(
+        "http://{}/computeMetadata/v1/instance/service-accounts/default/token",
+        hostname
+    );
+    let request = client
+        .request(Method::GET, endpoint)
+        .header("Metadata-Flavor", "Google")
+        .query(&[("audience", audience)]);
+    let http_response = request.send_retry(retry).await.context(TokenRequestSnafu)?;
+    http_response
+        .json::<TokenResponse>()
+        .await
+        .context(TokenResponseBodySnafu)
+}
+
+#[async_trait]
+impl TokenProvider for InstanceCredentialProvider {
+    /// Fetch an access token from the metadata server.
+    ///
+    /// The `metadata` hostname is tried first. If that request fails for any
+    /// reason, the link-local address `169.254.169.254` is tried instead. The
+    /// `_client` argument is ignored: this provider uses its own client, which
+    /// was built with plain-HTTP access enabled.
+    async fn fetch_token(
+        &self,
+        _client: &Client,
+        retry: &RetryConfig,
+    ) -> Result<TemporaryToken<String>> {
+        const METADATA_IP: &str = "169.254.169.254";
+        const METADATA_HOST: &str = "metadata";
+
+        info!("fetching token from metadata server");
+        let via_hostname =
+            make_metadata_request(&self.client, METADATA_HOST, retry, &self.audience);
+        let TokenResponse {
+            access_token,
+            expires_in,
+            ..
+        } = via_hostname
+            .or_else(|_| {
+                make_metadata_request(&self.client, METADATA_IP, retry, &self.audience)
+            })
+            .await?;
+        Ok(TemporaryToken {
+            token: access_token,
+            expiry: Instant::now() + Duration::from_secs(expires_in),
+        })
+    }
+}
+
+/// A deserialized `application_default_credentials.json`-file.
+/// <https://cloud.google.com/docs/authentication/application-default-credentials#personal>
+///
+/// `Debug` is implemented by hand so that `client_secret` and `refresh_token`
+/// are redacted. This value is used as a boxed [`TokenProvider`], which must
+/// be `Debug`, so a derived impl would print the secrets whenever the owning
+/// store is debug-formatted.
+#[derive(serde::Deserialize)]
+pub struct ApplicationDefaultCredentials {
+    /// OAuth client id sent with each refresh request.
+    client_id: String,
+    /// OAuth client secret sent with each refresh request (secret).
+    client_secret: String,
+    /// Long-lived refresh token exchanged for access tokens (secret).
+    refresh_token: String,
+    /// The credential `type` field from the JSON file.
+    #[serde(rename = "type")]
+    type_: String,
+}
+
+impl std::fmt::Debug for ApplicationDefaultCredentials {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ApplicationDefaultCredentials")
+            .field("client_id", &self.client_id)
+            .field("client_secret", &"<redacted>")
+            .field("refresh_token", &"<redacted>")
+            .field("type_", &self.type_)
+            .finish()
+    }
+}
+
+impl ApplicationDefaultCredentials {
+    const DEFAULT_TOKEN_GCP_URI: &'static str =
+        "https://accounts.google.com/o/oauth2/token";
+    const CREDENTIALS_PATH: &'static str =
+        ".config/gcloud/application_default_credentials.json";
+    const EXPECTED_TYPE: &str = "authorized_user";
+
+    /// Load application default credentials, if any are available.
+    ///
+    /// 1. If `path` is given, only that file is considered:
+    ///    - If it cannot be opened, the [`Error::OpenCredentials`] I/O error
+    ///      (e.g. file not found) is returned unchanged.
+    ///    - If it cannot be decoded as these credentials, or its `type` is not
+    ///      `authorized_user`, [`Error::UnusedConfigurationFile`] is returned.
+    /// 2. Otherwise, `$HOME/.config/gcloud/application_default_credentials.json`
+    ///    is loaded if it exists. `Ok(None)` is returned when `HOME` is unset or
+    ///    the file is absent.
+    pub fn new(path: Option<&str>) -> Result<Option<Self>, Error> {
+        if let Some(path) = path {
+            return match read_credentials_file::<Self>(path) {
+                Ok(credentials) if credentials.type_ == Self::EXPECTED_TYPE => {
+                    Ok(Some(credentials))
+                }
+                // Keep the real I/O error so that a mistyped or unreadable path
+                // is not reported as a generic "not used" file.
+                Err(err @ Error::OpenCredentials { .. }) => Err(err),
+                // The file was read but is not usable as application credentials
+                // (wrong `type`, or a different credential format such as a
+                // service account key).
+                _ => Err(Error::UnusedConfigurationFile),
+            };
+        }
+        if let Some(home) = env::var_os("HOME") {
+            let path = Path::new(&home).join(Self::CREDENTIALS_PATH);
+
+            // It's expected for this file to not exist unless it has been explicitly configured by the user.
+            if path.try_exists().unwrap_or(false) {
+                return read_credentials_file::<Self>(path).map(Some);
+            }
+        }
+        Ok(None)
+    }
+}
+
+#[async_trait]
+impl TokenProvider for ApplicationDefaultCredentials {
+    /// Exchange the stored refresh token for a new access token.
+    ///
+    /// POSTs a `refresh_token` grant, as a form, to Google's OAuth2 token
+    /// endpoint and converts the returned `expires_in` seconds into an
+    /// absolute expiry.
+    async fn fetch_token(
+        &self,
+        client: &Client,
+        retry: &RetryConfig,
+    ) -> Result<TemporaryToken<String>, Error> {
+        let form: [(&str, &str); 4] = [
+            ("grant_type", "refresh_token"),
+            ("client_id", self.client_id.as_str()),
+            ("client_secret", self.client_secret.as_str()),
+            ("refresh_token", self.refresh_token.as_str()),
+        ];
+
+        let http_response = client
+            .request(Method::POST, Self::DEFAULT_TOKEN_GCP_URI)
+            .form(&form)
+            .send_retry(retry)
+            .await
+            .context(TokenRequestSnafu)?;
+        let TokenResponse {
+            access_token,
+            expires_in,
+            ..
+        } = http_response
+            .json::<TokenResponse>()
+            .await
+            .context(TokenResponseBodySnafu)?;
+        Ok(TemporaryToken {
+            token: access_token,
+            expiry: Instant::now() + Duration::from_secs(expires_in),
+        })
+    }
+}
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 28972c4a6..871413b43 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -30,8 +30,7 @@
 //! consider implementing automatic clean up of unused parts that are older than one
 //! week.
 use std::collections::BTreeSet;
-use std::fs::File;
-use std::io::{self, BufReader};
+use std::io;
 use std::ops::Range;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -59,18 +58,15 @@ use crate::{
     RetryConfig,
 };
 
-use credential::OAuthProvider;
+use self::credential::{
+    default_gcs_base_url, ApplicationDefaultCredentials, InstanceCredentialProvider,
+    ServiceAccountCredentials, TokenProvider,
+};
 
 mod credential;
 
 #[derive(Debug, Snafu)]
 enum Error {
-    #[snafu(display("Unable to open service account file: {}", source))]
-    OpenCredentials { source: std::io::Error },
-
-    #[snafu(display("Unable to decode service account file: {}", source))]
-    DecodeCredentials { source: serde_json::Error },
-
     #[snafu(display("Got invalid XML response for {} {}: {}", method, url, source))]
     InvalidXMLResponse {
         source: quick_xml::de::DeError,
@@ -121,8 +117,8 @@ enum Error {
     #[snafu(display("Missing bucket name"))]
     MissingBucketName {},
 
-    #[snafu(display("Missing service account path or key"))]
-    MissingServiceAccountPathOrKey,
+    #[snafu(display("Could not find either metadata credentials or configuration properties to initialize GCS credentials."))]
+    MissingCredentials,
 
     #[snafu(display(
         "One of service account path or service account key may be provided."
@@ -185,32 +181,6 @@ impl From<Error> for super::Error {
     }
 }
 
-/// A deserialized `service-account-********.json`-file.
-#[derive(serde::Deserialize, Debug)]
-struct ServiceAccountCredentials {
-    /// The private key in RSA format.
-    pub private_key: String,
-
-    /// The email address associated with the service account.
-    pub client_email: String,
-
-    /// Base URL for GCS
-    #[serde(default = "default_gcs_base_url")]
-    pub gcs_base_url: String,
-
-    /// Disable oauth and use empty tokens.
-    #[serde(default = "default_disable_oauth")]
-    pub disable_oauth: bool,
-}
-
-fn default_gcs_base_url() -> String {
-    "https://storage.googleapis.com".to_owned()
-}
-
-fn default_disable_oauth() -> bool {
-    false
-}
-
 #[derive(serde::Deserialize, Debug)]
 #[serde(rename_all = "camelCase")]
 struct ListResponse {
@@ -267,7 +237,7 @@ struct GoogleCloudStorageClient {
     client: Client,
     base_url: String,
 
-    oauth_provider: Option<OAuthProvider>,
+    token_provider: Option<Arc<Box<dyn TokenProvider>>>,
     token_cache: TokenCache<String>,
 
     bucket_name: String,
@@ -282,11 +252,11 @@ struct GoogleCloudStorageClient {
 
 impl GoogleCloudStorageClient {
     async fn get_token(&self) -> Result<String> {
-        if let Some(oauth_provider) = &self.oauth_provider {
+        if let Some(token_provider) = &self.token_provider {
             Ok(self
                 .token_cache
                 .get_or_insert_with(|| {
-                    oauth_provider.fetch_token(&self.client, &self.retry_config)
+                    token_provider.fetch_token(&self.client, &self.retry_config)
                 })
                 .await
                 .context(CredentialSnafu)?)
@@ -779,14 +749,6 @@ impl ObjectStore for GoogleCloudStorage {
     }
 }
 
-fn reader_credentials_file(
-    service_account_path: impl AsRef<std::path::Path>,
-) -> Result<ServiceAccountCredentials> {
-    let file = File::open(service_account_path).context(OpenCredentialsSnafu)?;
-    let reader = BufReader::new(file);
-    Ok(serde_json::from_reader(reader).context(DecodeCredentialsSnafu)?)
-}
-
 /// Configure a connection to Google Cloud Storage using the specified
 /// credentials.
 ///
@@ -806,6 +768,7 @@ pub struct GoogleCloudStorageBuilder {
     url: Option<String>,
     service_account_path: Option<String>,
     service_account_key: Option<String>,
+    application_credentials_path: Option<String>,
     retry_config: RetryConfig,
     client_options: ClientOptions,
 }
@@ -862,6 +825,11 @@ pub enum GoogleConfigKey {
     /// - `bucket`
     /// - `bucket_name`
     Bucket,
+
+    /// Application credentials path
+    ///
+    /// See [`GoogleCloudStorageBuilder::with_application_credentials`].
+    ApplicationCredentials,
 }
 
 impl AsRef<str> for GoogleConfigKey {
@@ -870,6 +838,7 @@ impl AsRef<str> for GoogleConfigKey {
             Self::ServiceAccount => "google_service_account",
             Self::ServiceAccountKey => "google_service_account_key",
             Self::Bucket => "google_bucket",
+            Self::ApplicationCredentials => "google_application_credentials",
         }
     }
 }
@@ -889,6 +858,7 @@ impl FromStr for GoogleConfigKey {
             "google_bucket" | "google_bucket_name" | "bucket" | "bucket_name" => {
                 Ok(Self::Bucket)
             }
+            "google_application_credentials" => Ok(Self::ApplicationCredentials),
             _ => Err(Error::UnknownConfigurationKey { key: s.into() }.into()),
         }
     }
@@ -900,6 +870,7 @@ impl Default for GoogleCloudStorageBuilder {
             bucket_name: None,
             service_account_path: None,
             service_account_key: None,
+            application_credentials_path: None,
             retry_config: Default::default(),
             client_options: ClientOptions::new().with_allow_http(true),
             url: None,
@@ -988,6 +959,9 @@ impl GoogleCloudStorageBuilder {
                 self.service_account_key = Some(value.into())
             }
             GoogleConfigKey::Bucket => self.bucket_name = Some(value.into()),
+            GoogleConfigKey::ApplicationCredentials => {
+                self.application_credentials_path = Some(value.into())
+            }
         };
         Ok(self)
     }
@@ -1069,6 +1043,17 @@ impl GoogleCloudStorageBuilder {
         self
     }
 
+    /// Set the path to an application default credentials JSON file.
+    ///
+    /// When set, this file is used instead of the well-known gcloud location
+    /// under `$HOME`.
+    ///
+    /// <https://cloud.google.com/docs/authentication/provide-credentials-adc>
+    pub fn with_application_credentials(
+        self,
+        application_credentials_path: impl Into<String>,
+    ) -> Self {
+        Self {
+            application_credentials_path: Some(application_credentials_path.into()),
+            ..self
+        }
+    }
+
     /// Set the retry configuration
     pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
         self.retry_config = retry_config;
@@ -1098,44 +1083,75 @@ impl GoogleCloudStorageBuilder {
 
         let client = self.client_options.client()?;
 
-        let credentials = match (self.service_account_path, self.service_account_key) {
-            (Some(path), None) => reader_credentials_file(path)?,
-            (None, Some(key)) => {
-                serde_json::from_str(&key).context(DecodeCredentialsSnafu)?
-            }
-            (None, None) => return Err(Error::MissingServiceAccountPathOrKey.into()),
-            (Some(_), Some(_)) => {
-                return Err(Error::ServiceAccountPathAndKeyProvided.into())
-            }
-        };
+        // First try to initialize from the service account information.
+        let service_account_credentials =
+            match (self.service_account_path, self.service_account_key) {
+                (Some(path), None) => Some(
+                    ServiceAccountCredentials::from_file(path)
+                        .context(CredentialSnafu)?,
+                ),
+                (None, Some(key)) => Some(
+                    ServiceAccountCredentials::from_key(&key).context(CredentialSnafu)?,
+                ),
+                (None, None) => None,
+                (Some(_), Some(_)) => {
+                    return Err(Error::ServiceAccountPathAndKeyProvided.into())
+                }
+            };
+
+        // Then try to initialize from the application credentials file, or the environment.
+        let application_default_credentials = ApplicationDefaultCredentials::new(
+            self.application_credentials_path.as_deref(),
+        )
+        .context(CredentialSnafu)?;
+
+        let disable_oauth = service_account_credentials
+            .as_ref()
+            .map(|c| c.disable_oauth)
+            .unwrap_or(false);
+
+        let gcs_base_url = service_account_credentials
+            .as_ref()
+            .map(|c| c.gcs_base_url.clone())
+            .unwrap_or_else(default_gcs_base_url);
 
         // TODO: https://cloud.google.com/storage/docs/authentication#oauth-scopes
         let scope = "https://www.googleapis.com/auth/devstorage.full_control";
-        let audience = "https://www.googleapis.com/oauth2/v4/token".to_string();
-
-        let oauth_provider = (!credentials.disable_oauth)
-            .then(|| {
-                OAuthProvider::new(
-                    credentials.client_email,
-                    credentials.private_key,
-                    scope.to_string(),
-                    audience,
+        let audience = "https://www.googleapis.com/oauth2/v4/token";
+
+        let token_provider = if disable_oauth {
+            None
+        } else {
+            let best_provider = if let Some(credentials) = service_account_credentials {
+                Some(
+                    credentials
+                        .token_provider(scope, audience)
+                        .context(CredentialSnafu)?,
                 )
-            })
-            .transpose()
-            .context(CredentialSnafu)?;
+            } else if let Some(credentials) = application_default_credentials {
+                Some(Box::new(credentials) as Box<dyn TokenProvider>)
+            } else {
+                Some(Box::new(
+                    InstanceCredentialProvider::new(
+                        audience,
+                        self.client_options.clone(),
+                    )
+                    .context(CredentialSnafu)?,
+                ) as Box<dyn TokenProvider>)
+            };
+
+            // A provider is required at this point, bail out if we don't have one.
+            Some(best_provider.ok_or(Error::MissingCredentials)?)
+        };
 
         let encoded_bucket_name =
             percent_encode(bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string();
 
-        // The cloud storage crate currently only supports authentication via
-        // environment variables. Set the environment variable explicitly so
-        // that we can optionally accept command line arguments instead.
         Ok(GoogleCloudStorage {
             client: Arc::new(GoogleCloudStorageClient {
                 client,
-                base_url: credentials.gcs_base_url,
-                oauth_provider,
+                base_url: gcs_base_url,
+                token_provider: token_provider.map(Arc::new),
                 token_cache: Default::default(),
                 bucket_name,
                 bucket_name_encoded: encoded_bucket_name,