You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/10/25 20:04:38 UTC
[arrow-rs] branch master updated: Add experimental AWS_PROFILE support (#2178) (#2891)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 9c315ce2e Add experimental AWS_PROFILE support (#2178) (#2891)
9c315ce2e is described below
commit 9c315ce2eacb1d8c6591c5186747c04a045a53a7
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Oct 26 09:04:33 2022 +1300
Add experimental AWS_PROFILE support (#2178) (#2891)
* Add experimental AWS_PROFILE support (#2178)
* Add docs
* Include region
---
.github/workflows/object_store.yml | 2 +
object_store/Cargo.toml | 7 ++
object_store/src/aws/client.rs | 2 +-
object_store/src/aws/credential.rs | 152 +++++++++++++++++++++++++------------
object_store/src/aws/mod.rs | 103 ++++++++++++++++++-------
5 files changed, 191 insertions(+), 75 deletions(-)
diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml
index 6996aa706..2afcb4344 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -50,6 +50,8 @@ jobs:
run: cargo clippy -p object_store -- -D warnings
- name: Run clippy with aws feature
run: cargo clippy -p object_store --features aws -- -D warnings
+ - name: Run clippy with aws_profile feature
+ run: cargo clippy -p object_store --features aws_profile -- -D warnings
- name: Run clippy with gcp feature
run: cargo clippy -p object_store --features gcp -- -D warnings
- name: Run clippy with azure feature
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index e52137383..fc2af7e51 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -52,12 +52,19 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"
ring = { version = "0.16", default-features = false, features = ["std"], optional = true }
rustls-pemfile = { version = "1.0", default-features = false, optional = true }
+# AWS Profile support
+aws-types = { version = "0.49", optional = true }
+aws-config = { version = "0.49", optional = true }
+
[features]
cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud"]
gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud"]
+# Experimental support for AWS_PROFILE
+aws_profile = ["aws", "aws-config", "aws-types"]
+
[dev-dependencies] # In alphabetical order
dotenv = "0.15.0"
tempfile = "3.1.0"
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 29621626c..a07cdb3c6 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -198,7 +198,7 @@ pub struct S3Config {
pub endpoint: String,
pub bucket: String,
pub bucket_endpoint: String,
- pub credentials: CredentialProvider,
+ pub credentials: Box<dyn CredentialProvider>,
pub retry_config: RetryConfig,
pub allow_http: bool,
}
diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs
index ada855b48..32430d7f9 100644
--- a/object_store/src/aws/credential.rs
+++ b/object_store/src/aws/credential.rs
@@ -22,6 +22,7 @@ use crate::util::hmac_sha256;
use crate::{Result, RetryConfig};
use bytes::Buf;
use chrono::{DateTime, Utc};
+use futures::future::BoxFuture;
use futures::TryFutureExt;
use percent_encoding::utf8_percent_encode;
use reqwest::header::{HeaderMap, HeaderValue};
@@ -289,21 +290,8 @@ fn canonicalize_headers(header_map: &HeaderMap) -> (String, String) {
}
/// Provides credentials for use when signing requests
-#[derive(Debug)]
-pub enum CredentialProvider {
- Static(StaticCredentialProvider),
- Instance(InstanceCredentialProvider),
- WebIdentity(WebIdentityProvider),
-}
-
-impl CredentialProvider {
- pub async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
- match self {
- Self::Static(s) => Ok(Arc::clone(&s.credential)),
- Self::Instance(c) => c.get_credential().await,
- Self::WebIdentity(c) => c.get_credential().await,
- }
- }
+pub trait CredentialProvider: std::fmt::Debug + Send + Sync {
+ fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>>;
}
/// A static set of credentials
@@ -312,6 +300,12 @@ pub struct StaticCredentialProvider {
pub credential: Arc<AwsCredential>,
}
+impl CredentialProvider for StaticCredentialProvider {
+ fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
+ Box::pin(futures::future::ready(Ok(Arc::clone(&self.credential))))
+ }
+}
+
/// Credentials sourced from the instance metadata service
///
/// <https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html>
@@ -324,22 +318,20 @@ pub struct InstanceCredentialProvider {
pub metadata_endpoint: String,
}
-impl InstanceCredentialProvider {
- async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
- self.cache
- .get_or_insert_with(|| {
- instance_creds(
- &self.client,
- &self.retry_config,
- &self.metadata_endpoint,
- self.imdsv1_fallback,
- )
- .map_err(|source| crate::Error::Generic {
- store: "S3",
- source,
- })
+impl CredentialProvider for InstanceCredentialProvider {
+ fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
+ Box::pin(self.cache.get_or_insert_with(|| {
+ instance_creds(
+ &self.client,
+ &self.retry_config,
+ &self.metadata_endpoint,
+ self.imdsv1_fallback,
+ )
+ .map_err(|source| crate::Error::Generic {
+ store: "S3",
+ source,
})
- .await
+ }))
}
}
@@ -357,24 +349,22 @@ pub struct WebIdentityProvider {
pub retry_config: RetryConfig,
}
-impl WebIdentityProvider {
- async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
- self.cache
- .get_or_insert_with(|| {
- web_identity(
- &self.client,
- &self.retry_config,
- &self.token,
- &self.role_arn,
- &self.session_name,
- &self.endpoint,
- )
- .map_err(|source| crate::Error::Generic {
- store: "S3",
- source,
- })
+impl CredentialProvider for WebIdentityProvider {
+ fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
+ Box::pin(self.cache.get_or_insert_with(|| {
+ web_identity(
+ &self.client,
+ &self.retry_config,
+ &self.token,
+ &self.role_arn,
+ &self.session_name,
+ &self.endpoint,
+ )
+ .map_err(|source| crate::Error::Generic {
+ store: "S3",
+ source,
})
- .await
+ }))
}
}
@@ -520,6 +510,74 @@ async fn web_identity(
})
}
+#[cfg(feature = "aws_profile")]
+mod profile {
+ use super::*;
+ use aws_config::profile::ProfileFileCredentialsProvider;
+ use aws_config::provider_config::ProviderConfig;
+ use aws_types::credentials::ProvideCredentials;
+ use aws_types::region::Region;
+ use std::time::SystemTime;
+
+ #[derive(Debug)]
+ pub struct ProfileProvider {
+ cache: TokenCache<Arc<AwsCredential>>,
+ credentials: ProfileFileCredentialsProvider,
+ }
+
+ impl ProfileProvider {
+ pub fn new(name: String, region: String) -> Self {
+ let config = ProviderConfig::default().with_region(Some(Region::new(region)));
+
+ Self {
+ cache: Default::default(),
+ credentials: ProfileFileCredentialsProvider::builder()
+ .configure(&config)
+ .profile_name(name)
+ .build(),
+ }
+ }
+ }
+
+ impl CredentialProvider for ProfileProvider {
+ fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
+ Box::pin(self.cache.get_or_insert_with(move || async move {
+ let c =
+ self.credentials
+ .provide_credentials()
+ .await
+ .map_err(|source| crate::Error::Generic {
+ store: "S3",
+ source: Box::new(source),
+ })?;
+
+ let t_now = SystemTime::now();
+ let expiry = match c.expiry().and_then(|e| e.duration_since(t_now).ok()) {
+ Some(ttl) => Instant::now() + ttl,
+ None => {
+ return Err(crate::Error::Generic {
+ store: "S3",
+ source: "Invalid expiry".into(),
+ })
+ }
+ };
+
+ Ok(TemporaryToken {
+ token: Arc::new(AwsCredential {
+ key_id: c.access_key_id().to_string(),
+ secret_key: c.secret_access_key().to_string(),
+ token: c.session_token().map(ToString::to_string),
+ }),
+ expiry,
+ })
+ }))
+ }
+ }
+}
+
+#[cfg(feature = "aws_profile")]
+pub use profile::ProfileProvider;
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index c08a6353f..4a810658c 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -109,6 +109,9 @@ enum Error {
#[snafu(display("Missing SecretAccessKey"))]
MissingSecretAccessKey,
+ #[snafu(display("Profile support requires aws_profile feature"))]
+ MissingProfileFeature,
+
#[snafu(display("ETag Header missing from response"))]
MissingEtag,
@@ -359,6 +362,7 @@ pub struct AmazonS3Builder {
imdsv1_fallback: bool,
virtual_hosted_style_request: bool,
metadata_endpoint: Option<String>,
+ profile: Option<String>,
}
impl AmazonS3Builder {
@@ -370,13 +374,14 @@ impl AmazonS3Builder {
/// Fill the [`AmazonS3Builder`] with regular AWS environment variables
///
/// Variables extracted from environment:
- /// * AWS_ACCESS_KEY_ID -> access_key_id
- /// * AWS_SECRET_ACCESS_KEY -> secret_access_key
- /// * AWS_DEFAULT_REGION -> region
- /// * AWS_ENDPOINT -> endpoint
- /// * AWS_SESSION_TOKEN -> token
- /// * AWS_CONTAINER_CREDENTIALS_RELATIVE_URI -> <https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
- /// * AWS_ALLOW_HTTP -> set to "true" to permit HTTP connections without TLS
+ /// * `AWS_ACCESS_KEY_ID` -> access_key_id
+ /// * `AWS_SECRET_ACCESS_KEY` -> secret_access_key
+ /// * `AWS_DEFAULT_REGION` -> region
+ /// * `AWS_ENDPOINT` -> endpoint
+ /// * `AWS_SESSION_TOKEN` -> token
+ /// * `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI` -> <https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
+ /// * `AWS_ALLOW_HTTP` -> set to "true" to permit HTTP connections without TLS
+ /// * `AWS_PROFILE` -> set profile name, requires `aws_profile` feature enabled
/// # Example
/// ```
/// use object_store::aws::AmazonS3Builder;
@@ -408,6 +413,10 @@ impl AmazonS3Builder {
builder.token = Some(token);
}
+ if let Ok(profile) = std::env::var("AWS_PROFILE") {
+ builder.profile = Some(profile);
+ }
+
// This env var is set in ECS
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html
if let Ok(metadata_relative_uri) =
@@ -528,6 +537,24 @@ impl AmazonS3Builder {
self
}
+ /// Set the AWS profile name, see <https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html>
+ ///
+ /// This makes use of [aws-config] to provide credentials and therefore requires
+ /// the `aws_profile` feature to be enabled.
+ ///
+ /// Users are strongly encouraged to instead use a credential manager
+ /// such as [aws-vault], not only to avoid the significant additional dependencies,
+ /// but also to avoid storing credentials in [plain text on disk].
+ ///
+ /// [aws-config]: https://docs.rs/aws-config
+ /// [aws-vault]: https://github.com/99designs/aws-vault
+ /// [plain text on disk]: https://99designs.com.au/blog/engineering/aws-vault/
+ #[cfg(feature = "aws_profile")]
+ pub fn with_profile(mut self, profile: impl Into<String>) -> Self {
+ self.profile = Some(profile.into());
+ self
+ }
+
/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(self) -> Result<AmazonS3> {
@@ -537,13 +564,13 @@ impl AmazonS3Builder {
let credentials = match (self.access_key_id, self.secret_access_key, self.token) {
(Some(key_id), Some(secret_key), token) => {
info!("Using Static credential provider");
- CredentialProvider::Static(StaticCredentialProvider {
+ Box::new(StaticCredentialProvider {
credential: Arc::new(AwsCredential {
key_id,
secret_key,
token,
}),
- })
+ }) as _
}
(None, Some(_), _) => return Err(Error::MissingAccessKeyId.into()),
(Some(_), None, _) => return Err(Error::MissingSecretAccessKey.into()),
@@ -565,7 +592,7 @@ impl AmazonS3Builder {
// Disallow non-HTTPs requests
let client = Client::builder().https_only(true).build().unwrap();
- CredentialProvider::WebIdentity(WebIdentityProvider {
+ Box::new(WebIdentityProvider {
cache: Default::default(),
token,
session_name,
@@ -573,24 +600,30 @@ impl AmazonS3Builder {
endpoint,
client,
retry_config: self.retry_config.clone(),
- })
- }
- _ => {
- info!("Using Instance credential provider");
-
- // The instance metadata endpoint is access over HTTP
- let client = Client::builder().https_only(false).build().unwrap();
-
- CredentialProvider::Instance(InstanceCredentialProvider {
- cache: Default::default(),
- client,
- retry_config: self.retry_config.clone(),
- imdsv1_fallback: self.imdsv1_fallback,
- metadata_endpoint: self
- .metadata_endpoint
- .unwrap_or_else(|| METADATA_ENDPOINT.into()),
- })
+ }) as _
}
+ _ => match self.profile {
+ Some(profile) => {
+ info!("Using profile \"{}\" credential provider", profile);
+ profile_credentials(profile, region.clone())?
+ }
+ None => {
+ info!("Using Instance credential provider");
+
+ // The instance metadata endpoint is accessed over HTTP
+ let client = Client::builder().https_only(false).build().unwrap();
+
+ Box::new(InstanceCredentialProvider {
+ cache: Default::default(),
+ client,
+ retry_config: self.retry_config.clone(),
+ imdsv1_fallback: self.imdsv1_fallback,
+ metadata_endpoint: self
+ .metadata_endpoint
+ .unwrap_or_else(|| METADATA_ENDPOINT.into()),
+ }) as _
+ }
+ },
},
};
@@ -628,6 +661,22 @@ impl AmazonS3Builder {
}
}
+#[cfg(feature = "aws_profile")]
+fn profile_credentials(
+ profile: String,
+ region: String,
+) -> Result<Box<dyn CredentialProvider>> {
+ Ok(Box::new(credential::ProfileProvider::new(profile, region)))
+}
+
+#[cfg(not(feature = "aws_profile"))]
+fn profile_credentials(
+ _profile: String,
+ _region: String,
+) -> Result<Box<dyn CredentialProvider>> {
+ Err(Error::MissingProfileFeature.into())
+}
+
#[cfg(test)]
mod tests {
use super::*;