You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/01/25 15:24:36 UTC
[arrow-rs] branch master updated: Additional GCP authentication (#3541)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 42b2d55e4 Additional GCP authentication (#3541)
42b2d55e4 is described below
commit 42b2d55e407217a95fab70e8b6c9834f2a39fa1e
Author: Marius S <39...@users.noreply.github.com>
AuthorDate: Wed Jan 25 07:24:30 2023 -0800
Additional GCP authentication (#3541)
* Implement authentication with instance and application credentials
* Fix link in documentation
* Address feedback
* Instantiate InstanceCredentialsProvider client just once
---
object_store/src/gcp/credential.rs | 255 ++++++++++++++++++++++++++++++++++++-
object_store/src/gcp/mod.rs | 166 +++++++++++++-----------
2 files changed, 344 insertions(+), 77 deletions(-)
diff --git a/object_store/src/gcp/credential.rs b/object_store/src/gcp/credential.rs
index cc157dd41..56468568b 100644
--- a/object_store/src/gcp/credential.rs
+++ b/object_store/src/gcp/credential.rs
@@ -17,16 +17,30 @@
use crate::client::retry::RetryExt;
use crate::client::token::TemporaryToken;
+use crate::ClientOptions;
use crate::RetryConfig;
+use async_trait::async_trait;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use base64::Engine;
+use futures::TryFutureExt;
use reqwest::{Client, Method};
use ring::signature::RsaKeyPair;
use snafu::{ResultExt, Snafu};
+use std::env;
+use std::fs::File;
+use std::io::BufReader;
+use std::path::Path;
use std::time::{Duration, Instant};
+use tracing::info;
#[derive(Debug, Snafu)]
pub enum Error {
+ #[snafu(display("Unable to open service account file: {}", source))]
+ OpenCredentials { source: std::io::Error },
+
+ #[snafu(display("Unable to decode service account file: {}", source))]
+ DecodeCredentials { source: serde_json::Error },
+
#[snafu(display("No RSA key found in pem file"))]
MissingKey,
@@ -47,6 +61,12 @@ pub enum Error {
#[snafu(display("Error getting token response body: {}", source))]
TokenResponseBody { source: reqwest::Error },
+
+ #[snafu(display("A configuration file was passed in but was not used."))]
+ UnusedConfigurationFile,
+
+ #[snafu(display("Error creating client: {}", source))]
+ Client { source: crate::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -104,6 +124,15 @@ struct TokenResponse {
expires_in: u64,
}
+#[async_trait]
+pub trait TokenProvider: std::fmt::Debug + Send + Sync {
+ async fn fetch_token(
+ &self,
+ client: &Client,
+ retry: &RetryConfig,
+ ) -> Result<TemporaryToken<String>>;
+}
+
/// Encapsulates the logic to perform an OAuth token challenge
#[derive(Debug)]
pub struct OAuthProvider {
@@ -138,9 +167,12 @@ impl OAuthProvider {
random: ring::rand::SystemRandom::new(),
})
}
+}
+#[async_trait]
+impl TokenProvider for OAuthProvider {
/// Fetch a fresh token
- pub async fn fetch_token(
+ async fn fetch_token(
&self,
client: &Client,
retry: &RetryConfig,
@@ -195,6 +227,69 @@ impl OAuthProvider {
}
}
+fn read_credentials_file<T>(
+ service_account_path: impl AsRef<std::path::Path>,
+) -> Result<T>
+where
+ T: serde::de::DeserializeOwned,
+{
+ let file = File::open(service_account_path).context(OpenCredentialsSnafu)?;
+ let reader = BufReader::new(file);
+ serde_json::from_reader(reader).context(DecodeCredentialsSnafu)
+}
+
+/// A deserialized `service-account-********.json`-file.
+#[derive(serde::Deserialize, Debug)]
+pub struct ServiceAccountCredentials {
+ /// The private key in RSA format.
+ pub private_key: String,
+
+ /// The email address associated with the service account.
+ pub client_email: String,
+
+ /// Base URL for GCS
+ #[serde(default = "default_gcs_base_url")]
+ pub gcs_base_url: String,
+
+ /// Disable oauth and use empty tokens.
+ #[serde(default = "default_disable_oauth")]
+ pub disable_oauth: bool,
+}
+
+pub fn default_gcs_base_url() -> String {
+ "https://storage.googleapis.com".to_owned()
+}
+
+pub fn default_disable_oauth() -> bool {
+ false
+}
+
+impl ServiceAccountCredentials {
+ /// Create a new [`ServiceAccountCredentials`] from a file.
+ pub fn from_file<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
+ read_credentials_file(path)
+ }
+
+ /// Create a new [`ServiceAccountCredentials`] from a string.
+ pub fn from_key(key: &str) -> Result<Self> {
+ serde_json::from_str(key).context(DecodeCredentialsSnafu)
+ }
+
+ /// Create an [`OAuthProvider`] from this credentials struct.
+ pub fn token_provider(
+ self,
+ scope: &str,
+ audience: &str,
+ ) -> Result<Box<dyn TokenProvider>> {
+ Ok(Box::new(OAuthProvider::new(
+ self.client_email,
+ self.private_key,
+ scope.to_string(),
+ audience.to_string(),
+ )?) as Box<dyn TokenProvider>)
+ }
+}
+
/// Returns the number of seconds since unix epoch
fn seconds_since_epoch() -> u64 {
std::time::SystemTime::now()
@@ -205,7 +300,7 @@ fn seconds_since_epoch() -> u64 {
fn decode_first_rsa_key(private_key_pem: String) -> Result<RsaKeyPair> {
use rustls_pemfile::Item;
- use std::io::{BufReader, Cursor};
+ use std::io::Cursor;
let mut cursor = Cursor::new(private_key_pem);
let mut reader = BufReader::new(&mut cursor);
@@ -222,3 +317,159 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
let string = serde_json::to_string(obj).context(EncodeSnafu)?;
Ok(BASE64_URL_SAFE_NO_PAD.encode(string))
}
+
+/// A provider that uses the Google Cloud Platform metadata server to fetch a token.
+///
+/// <https://cloud.google.com/docs/authentication/get-id-token#metadata-server>
+#[derive(Debug, Default)]
+pub struct InstanceCredentialProvider {
+ audience: String,
+ client: Client,
+}
+
+impl InstanceCredentialProvider {
+ /// Create a new [`InstanceCredentialProvider`]. The provider builds its own client from `client_options` because plain HTTP access must be enabled on it.
+ pub fn new<T: Into<String>>(
+ audience: T,
+ client_options: ClientOptions,
+ ) -> Result<Self> {
+ client_options
+ .with_allow_http(true)
+ .client()
+ .map(|client| Self {
+ audience: audience.into(),
+ client,
+ })
+ .context(ClientSnafu)
+ }
+}
+
+/// Make a request to the metadata server to fetch a token, using a given hostname.
+async fn make_metadata_request(
+ client: &Client,
+ hostname: &str,
+ retry: &RetryConfig,
+ audience: &str,
+) -> Result<TokenResponse> {
+ let url = format!(
+ "http://{}/computeMetadata/v1/instance/service-accounts/default/token",
+ hostname
+ );
+ let response: TokenResponse = client
+ .request(Method::GET, url)
+ .header("Metadata-Flavor", "Google")
+ .query(&[("audience", audience)])
+ .send_retry(retry)
+ .await
+ .context(TokenRequestSnafu)?
+ .json()
+ .await
+ .context(TokenResponseBodySnafu)?;
+ Ok(response)
+}
+
+#[async_trait]
+impl TokenProvider for InstanceCredentialProvider {
+ /// Fetch a token from the metadata server.
+ /// Since the connection is local, HTTP access must be enabled, so the client passed in is ignored in favor of the provider's own client.
+ async fn fetch_token(
+ &self,
+ _client: &Client,
+ retry: &RetryConfig,
+ ) -> Result<TemporaryToken<String>> {
+ const METADATA_IP: &str = "169.254.169.254";
+ const METADATA_HOST: &str = "metadata";
+
+ info!("fetching token from metadata server");
+ let response =
+ make_metadata_request(&self.client, METADATA_HOST, retry, &self.audience)
+ .or_else(|_| {
+ make_metadata_request(
+ &self.client,
+ METADATA_IP,
+ retry,
+ &self.audience,
+ )
+ })
+ .await?;
+ let token = TemporaryToken {
+ token: response.access_token,
+ expiry: Instant::now() + Duration::from_secs(response.expires_in),
+ };
+ Ok(token)
+ }
+}
+
+/// A deserialized `application_default_credentials.json`-file.
+/// <https://cloud.google.com/docs/authentication/application-default-credentials#personal>
+#[derive(serde::Deserialize, Debug)]
+pub struct ApplicationDefaultCredentials {
+ client_id: String,
+ client_secret: String,
+ refresh_token: String,
+ #[serde(rename = "type")]
+ type_: String,
+}
+
+impl ApplicationDefaultCredentials {
+ const DEFAULT_TOKEN_GCP_URI: &'static str =
+ "https://accounts.google.com/o/oauth2/token";
+ const CREDENTIALS_PATH: &'static str =
+ ".config/gcloud/application_default_credentials.json";
+ const EXPECTED_TYPE: &str = "authorized_user";
+
+ // Create a new application default credential in the following situations:
+ // 1. a file is passed in and the type matches.
+ // 2. without argument if the well-known configuration file is present.
+ pub fn new(path: Option<&str>) -> Result<Option<Self>, Error> {
+ if let Some(path) = path {
+ if let Ok(credentials) = read_credentials_file::<Self>(path) {
+ if credentials.type_ == Self::EXPECTED_TYPE {
+ return Ok(Some(credentials));
+ }
+ }
+ // Return an error if the path has not been used.
+ return Err(Error::UnusedConfigurationFile);
+ }
+ if let Some(home) = env::var_os("HOME") {
+ let path = Path::new(&home).join(Self::CREDENTIALS_PATH);
+
+ // It's expected for this file to not exist unless it has been explicitly configured by the user.
+ if path.try_exists().unwrap_or(false) {
+ return read_credentials_file::<Self>(path).map(Some);
+ }
+ }
+ Ok(None)
+ }
+}
+
+#[async_trait]
+impl TokenProvider for ApplicationDefaultCredentials {
+ async fn fetch_token(
+ &self,
+ client: &Client,
+ retry: &RetryConfig,
+ ) -> Result<TemporaryToken<String>, Error> {
+ let body = [
+ ("grant_type", "refresh_token"),
+ ("client_id", &self.client_id),
+ ("client_secret", &self.client_secret),
+ ("refresh_token", &self.refresh_token),
+ ];
+
+ let response = client
+ .request(Method::POST, Self::DEFAULT_TOKEN_GCP_URI)
+ .form(&body)
+ .send_retry(retry)
+ .await
+ .context(TokenRequestSnafu)?
+ .json::<TokenResponse>()
+ .await
+ .context(TokenResponseBodySnafu)?;
+ let token = TemporaryToken {
+ token: response.access_token,
+ expiry: Instant::now() + Duration::from_secs(response.expires_in),
+ };
+ Ok(token)
+ }
+}
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 28972c4a6..871413b43 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -30,8 +30,7 @@
//! consider implementing automatic clean up of unused parts that are older than one
//! week.
use std::collections::BTreeSet;
-use std::fs::File;
-use std::io::{self, BufReader};
+use std::io;
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;
@@ -59,18 +58,15 @@ use crate::{
RetryConfig,
};
-use credential::OAuthProvider;
+use self::credential::{
+ default_gcs_base_url, ApplicationDefaultCredentials, InstanceCredentialProvider,
+ ServiceAccountCredentials, TokenProvider,
+};
mod credential;
#[derive(Debug, Snafu)]
enum Error {
- #[snafu(display("Unable to open service account file: {}", source))]
- OpenCredentials { source: std::io::Error },
-
- #[snafu(display("Unable to decode service account file: {}", source))]
- DecodeCredentials { source: serde_json::Error },
-
#[snafu(display("Got invalid XML response for {} {}: {}", method, url, source))]
InvalidXMLResponse {
source: quick_xml::de::DeError,
@@ -121,8 +117,8 @@ enum Error {
#[snafu(display("Missing bucket name"))]
MissingBucketName {},
- #[snafu(display("Missing service account path or key"))]
- MissingServiceAccountPathOrKey,
+ #[snafu(display("Could not find either metadata credentials or configuration properties to initialize GCS credentials."))]
+ MissingCredentials,
#[snafu(display(
"One of service account path or service account key may be provided."
@@ -185,32 +181,6 @@ impl From<Error> for super::Error {
}
}
-/// A deserialized `service-account-********.json`-file.
-#[derive(serde::Deserialize, Debug)]
-struct ServiceAccountCredentials {
- /// The private key in RSA format.
- pub private_key: String,
-
- /// The email address associated with the service account.
- pub client_email: String,
-
- /// Base URL for GCS
- #[serde(default = "default_gcs_base_url")]
- pub gcs_base_url: String,
-
- /// Disable oauth and use empty tokens.
- #[serde(default = "default_disable_oauth")]
- pub disable_oauth: bool,
-}
-
-fn default_gcs_base_url() -> String {
- "https://storage.googleapis.com".to_owned()
-}
-
-fn default_disable_oauth() -> bool {
- false
-}
-
#[derive(serde::Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct ListResponse {
@@ -267,7 +237,7 @@ struct GoogleCloudStorageClient {
client: Client,
base_url: String,
- oauth_provider: Option<OAuthProvider>,
+ token_provider: Option<Arc<Box<dyn TokenProvider>>>,
token_cache: TokenCache<String>,
bucket_name: String,
@@ -282,11 +252,11 @@ struct GoogleCloudStorageClient {
impl GoogleCloudStorageClient {
async fn get_token(&self) -> Result<String> {
- if let Some(oauth_provider) = &self.oauth_provider {
+ if let Some(token_provider) = &self.token_provider {
Ok(self
.token_cache
.get_or_insert_with(|| {
- oauth_provider.fetch_token(&self.client, &self.retry_config)
+ token_provider.fetch_token(&self.client, &self.retry_config)
})
.await
.context(CredentialSnafu)?)
@@ -779,14 +749,6 @@ impl ObjectStore for GoogleCloudStorage {
}
}
-fn reader_credentials_file(
- service_account_path: impl AsRef<std::path::Path>,
-) -> Result<ServiceAccountCredentials> {
- let file = File::open(service_account_path).context(OpenCredentialsSnafu)?;
- let reader = BufReader::new(file);
- Ok(serde_json::from_reader(reader).context(DecodeCredentialsSnafu)?)
-}
-
/// Configure a connection to Google Cloud Storage using the specified
/// credentials.
///
@@ -806,6 +768,7 @@ pub struct GoogleCloudStorageBuilder {
url: Option<String>,
service_account_path: Option<String>,
service_account_key: Option<String>,
+ application_credentials_path: Option<String>,
retry_config: RetryConfig,
client_options: ClientOptions,
}
@@ -862,6 +825,11 @@ pub enum GoogleConfigKey {
/// - `bucket`
/// - `bucket_name`
Bucket,
+
+ /// Application credentials path
+ ///
+ /// See [`GoogleCloudStorageBuilder::with_application_credentials`].
+ ApplicationCredentials,
}
impl AsRef<str> for GoogleConfigKey {
@@ -870,6 +838,7 @@ impl AsRef<str> for GoogleConfigKey {
Self::ServiceAccount => "google_service_account",
Self::ServiceAccountKey => "google_service_account_key",
Self::Bucket => "google_bucket",
+ Self::ApplicationCredentials => "google_application_credentials",
}
}
}
@@ -889,6 +858,7 @@ impl FromStr for GoogleConfigKey {
"google_bucket" | "google_bucket_name" | "bucket" | "bucket_name" => {
Ok(Self::Bucket)
}
+ "google_application_credentials" => Ok(Self::ApplicationCredentials),
_ => Err(Error::UnknownConfigurationKey { key: s.into() }.into()),
}
}
@@ -900,6 +870,7 @@ impl Default for GoogleCloudStorageBuilder {
bucket_name: None,
service_account_path: None,
service_account_key: None,
+ application_credentials_path: None,
retry_config: Default::default(),
client_options: ClientOptions::new().with_allow_http(true),
url: None,
@@ -988,6 +959,9 @@ impl GoogleCloudStorageBuilder {
self.service_account_key = Some(value.into())
}
GoogleConfigKey::Bucket => self.bucket_name = Some(value.into()),
+ GoogleConfigKey::ApplicationCredentials => {
+ self.application_credentials_path = Some(value.into())
+ }
};
Ok(self)
}
@@ -1069,6 +1043,17 @@ impl GoogleCloudStorageBuilder {
self
}
+ /// Set the path to the application credentials file.
+ ///
+ /// <https://cloud.google.com/docs/authentication/provide-credentials-adc>
+ pub fn with_application_credentials(
+ mut self,
+ application_credentials_path: impl Into<String>,
+ ) -> Self {
+ self.application_credentials_path = Some(application_credentials_path.into());
+ self
+ }
+
/// Set the retry configuration
pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
self.retry_config = retry_config;
@@ -1098,44 +1083,75 @@ impl GoogleCloudStorageBuilder {
let client = self.client_options.client()?;
- let credentials = match (self.service_account_path, self.service_account_key) {
- (Some(path), None) => reader_credentials_file(path)?,
- (None, Some(key)) => {
- serde_json::from_str(&key).context(DecodeCredentialsSnafu)?
- }
- (None, None) => return Err(Error::MissingServiceAccountPathOrKey.into()),
- (Some(_), Some(_)) => {
- return Err(Error::ServiceAccountPathAndKeyProvided.into())
- }
- };
+ // First try to initialize from the service account information.
+ let service_account_credentials =
+ match (self.service_account_path, self.service_account_key) {
+ (Some(path), None) => Some(
+ ServiceAccountCredentials::from_file(path)
+ .context(CredentialSnafu)?,
+ ),
+ (None, Some(key)) => Some(
+ ServiceAccountCredentials::from_key(&key).context(CredentialSnafu)?,
+ ),
+ (None, None) => None,
+ (Some(_), Some(_)) => {
+ return Err(Error::ServiceAccountPathAndKeyProvided.into())
+ }
+ };
+
+ // Then try to initialize from the application credentials file, or the environment.
+ let application_default_credentials = ApplicationDefaultCredentials::new(
+ self.application_credentials_path.as_deref(),
+ )
+ .context(CredentialSnafu)?;
+
+ let disable_oauth = service_account_credentials
+ .as_ref()
+ .map(|c| c.disable_oauth)
+ .unwrap_or(false);
+
+ let gcs_base_url = service_account_credentials
+ .as_ref()
+ .map(|c| c.gcs_base_url.clone())
+ .unwrap_or_else(default_gcs_base_url);
// TODO: https://cloud.google.com/storage/docs/authentication#oauth-scopes
let scope = "https://www.googleapis.com/auth/devstorage.full_control";
- let audience = "https://www.googleapis.com/oauth2/v4/token".to_string();
-
- let oauth_provider = (!credentials.disable_oauth)
- .then(|| {
- OAuthProvider::new(
- credentials.client_email,
- credentials.private_key,
- scope.to_string(),
- audience,
+ let audience = "https://www.googleapis.com/oauth2/v4/token";
+
+ let token_provider = if disable_oauth {
+ None
+ } else {
+ let best_provider = if let Some(credentials) = service_account_credentials {
+ Some(
+ credentials
+ .token_provider(scope, audience)
+ .context(CredentialSnafu)?,
)
- })
- .transpose()
- .context(CredentialSnafu)?;
+ } else if let Some(credentials) = application_default_credentials {
+ Some(Box::new(credentials) as Box<dyn TokenProvider>)
+ } else {
+ Some(Box::new(
+ InstanceCredentialProvider::new(
+ audience,
+ self.client_options.clone(),
+ )
+ .context(CredentialSnafu)?,
+ ) as Box<dyn TokenProvider>)
+ };
+
+ // A provider is required at this point, bail out if we don't have one.
+ Some(best_provider.ok_or(Error::MissingCredentials)?)
+ };
let encoded_bucket_name =
percent_encode(bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string();
- // The cloud storage crate currently only supports authentication via
- // environment variables. Set the environment variable explicitly so
- // that we can optionally accept command line arguments instead.
Ok(GoogleCloudStorage {
client: Arc::new(GoogleCloudStorageClient {
client,
- base_url: credentials.gcs_base_url,
- oauth_provider,
+ base_url: gcs_base_url,
+ token_provider: token_provider.map(Arc::new),
token_cache: Default::default(),
bucket_name,
bucket_name_encoded: encoded_bucket_name,