You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/10/25 20:04:38 UTC

[arrow-rs] branch master updated: Add experimental AWS_PROFILE support (#2178) (#2891)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c315ce2e Add experimental AWS_PROFILE support (#2178) (#2891)
9c315ce2e is described below

commit 9c315ce2eacb1d8c6591c5186747c04a045a53a7
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Oct 26 09:04:33 2022 +1300

    Add experimental AWS_PROFILE support (#2178) (#2891)
    
    * Add experimental AWS_PROFILE support (#2178)
    
    * Add docs
    
    * Include region
---
 .github/workflows/object_store.yml |   2 +
 object_store/Cargo.toml            |   7 ++
 object_store/src/aws/client.rs     |   2 +-
 object_store/src/aws/credential.rs | 152 +++++++++++++++++++++++++------------
 object_store/src/aws/mod.rs        | 103 ++++++++++++++++++-------
 5 files changed, 191 insertions(+), 75 deletions(-)

diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml
index 6996aa706..2afcb4344 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -50,6 +50,8 @@ jobs:
         run: cargo clippy -p object_store -- -D warnings
       - name: Run clippy with aws feature
         run: cargo clippy -p object_store --features aws -- -D warnings
+      - name: Run clippy with aws_profile feature
+        run: cargo clippy -p object_store --features aws_profile -- -D warnings
       - name: Run clippy with gcp feature
         run: cargo clippy -p object_store --features gcp -- -D warnings
       - name: Run clippy with azure feature
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index e52137383..fc2af7e51 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -52,12 +52,19 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"
 ring = { version = "0.16", default-features = false, features = ["std"], optional = true }
 rustls-pemfile = { version = "1.0", default-features = false, optional = true }
 
+# AWS Profile support
+aws-types = { version = "0.49", optional = true }
+aws-config = { version = "0.49", optional = true }
+
 [features]
 cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
 azure = ["cloud"]
 gcp = ["cloud", "rustls-pemfile"]
 aws = ["cloud"]
 
+# Experimental support for AWS_PROFILE
+aws_profile = ["aws", "aws-config", "aws-types"]
+
 [dev-dependencies] # In alphabetical order
 dotenv = "0.15.0"
 tempfile = "3.1.0"
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 29621626c..a07cdb3c6 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -198,7 +198,7 @@ pub struct S3Config {
     pub endpoint: String,
     pub bucket: String,
     pub bucket_endpoint: String,
-    pub credentials: CredentialProvider,
+    pub credentials: Box<dyn CredentialProvider>,
     pub retry_config: RetryConfig,
     pub allow_http: bool,
 }
diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs
index ada855b48..32430d7f9 100644
--- a/object_store/src/aws/credential.rs
+++ b/object_store/src/aws/credential.rs
@@ -22,6 +22,7 @@ use crate::util::hmac_sha256;
 use crate::{Result, RetryConfig};
 use bytes::Buf;
 use chrono::{DateTime, Utc};
+use futures::future::BoxFuture;
 use futures::TryFutureExt;
 use percent_encoding::utf8_percent_encode;
 use reqwest::header::{HeaderMap, HeaderValue};
@@ -289,21 +290,8 @@ fn canonicalize_headers(header_map: &HeaderMap) -> (String, String) {
 }
 
 /// Provides credentials for use when signing requests
-#[derive(Debug)]
-pub enum CredentialProvider {
-    Static(StaticCredentialProvider),
-    Instance(InstanceCredentialProvider),
-    WebIdentity(WebIdentityProvider),
-}
-
-impl CredentialProvider {
-    pub async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
-        match self {
-            Self::Static(s) => Ok(Arc::clone(&s.credential)),
-            Self::Instance(c) => c.get_credential().await,
-            Self::WebIdentity(c) => c.get_credential().await,
-        }
-    }
+pub trait CredentialProvider: std::fmt::Debug + Send + Sync {
+    fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>>;
 }
 
 /// A static set of credentials
@@ -312,6 +300,12 @@ pub struct StaticCredentialProvider {
     pub credential: Arc<AwsCredential>,
 }
 
+impl CredentialProvider for StaticCredentialProvider {
+    fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
+        Box::pin(futures::future::ready(Ok(Arc::clone(&self.credential))))
+    }
+}
+
 /// Credentials sourced from the instance metadata service
 ///
 /// <https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html>
@@ -324,22 +318,20 @@ pub struct InstanceCredentialProvider {
     pub metadata_endpoint: String,
 }
 
-impl InstanceCredentialProvider {
-    async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
-        self.cache
-            .get_or_insert_with(|| {
-                instance_creds(
-                    &self.client,
-                    &self.retry_config,
-                    &self.metadata_endpoint,
-                    self.imdsv1_fallback,
-                )
-                .map_err(|source| crate::Error::Generic {
-                    store: "S3",
-                    source,
-                })
+impl CredentialProvider for InstanceCredentialProvider {
+    fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
+        Box::pin(self.cache.get_or_insert_with(|| {
+            instance_creds(
+                &self.client,
+                &self.retry_config,
+                &self.metadata_endpoint,
+                self.imdsv1_fallback,
+            )
+            .map_err(|source| crate::Error::Generic {
+                store: "S3",
+                source,
             })
-            .await
+        }))
     }
 }
 
@@ -357,24 +349,22 @@ pub struct WebIdentityProvider {
     pub retry_config: RetryConfig,
 }
 
-impl WebIdentityProvider {
-    async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
-        self.cache
-            .get_or_insert_with(|| {
-                web_identity(
-                    &self.client,
-                    &self.retry_config,
-                    &self.token,
-                    &self.role_arn,
-                    &self.session_name,
-                    &self.endpoint,
-                )
-                .map_err(|source| crate::Error::Generic {
-                    store: "S3",
-                    source,
-                })
+impl CredentialProvider for WebIdentityProvider {
+    fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
+        Box::pin(self.cache.get_or_insert_with(|| {
+            web_identity(
+                &self.client,
+                &self.retry_config,
+                &self.token,
+                &self.role_arn,
+                &self.session_name,
+                &self.endpoint,
+            )
+            .map_err(|source| crate::Error::Generic {
+                store: "S3",
+                source,
             })
-            .await
+        }))
     }
 }
 
@@ -520,6 +510,74 @@ async fn web_identity(
     })
 }
 
+#[cfg(feature = "aws_profile")]
+mod profile {
+    use super::*;
+    use aws_config::profile::ProfileFileCredentialsProvider;
+    use aws_config::provider_config::ProviderConfig;
+    use aws_types::credentials::ProvideCredentials;
+    use aws_types::region::Region;
+    use std::time::SystemTime;
+
+    #[derive(Debug)]
+    pub struct ProfileProvider {
+        cache: TokenCache<Arc<AwsCredential>>,
+        credentials: ProfileFileCredentialsProvider,
+    }
+
+    impl ProfileProvider {
+        pub fn new(name: String, region: String) -> Self {
+            let config = ProviderConfig::default().with_region(Some(Region::new(region)));
+
+            Self {
+                cache: Default::default(),
+                credentials: ProfileFileCredentialsProvider::builder()
+                    .configure(&config)
+                    .profile_name(name)
+                    .build(),
+            }
+        }
+    }
+
+    impl CredentialProvider for ProfileProvider {
+        fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
+            Box::pin(self.cache.get_or_insert_with(move || async move {
+                let c =
+                    self.credentials
+                        .provide_credentials()
+                        .await
+                        .map_err(|source| crate::Error::Generic {
+                            store: "S3",
+                            source: Box::new(source),
+                        })?;
+
+                let t_now = SystemTime::now();
+                let expiry = match c.expiry().and_then(|e| e.duration_since(t_now).ok()) {
+                    Some(ttl) => Instant::now() + ttl,
+                    None => {
+                        return Err(crate::Error::Generic {
+                            store: "S3",
+                            source: "Invalid expiry".into(),
+                        })
+                    }
+                };
+
+                Ok(TemporaryToken {
+                    token: Arc::new(AwsCredential {
+                        key_id: c.access_key_id().to_string(),
+                        secret_key: c.secret_access_key().to_string(),
+                        token: c.session_token().map(ToString::to_string),
+                    }),
+                    expiry,
+                })
+            }))
+        }
+    }
+}
+
+#[cfg(feature = "aws_profile")]
+pub use profile::ProfileProvider;
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index c08a6353f..4a810658c 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -109,6 +109,9 @@ enum Error {
     #[snafu(display("Missing SecretAccessKey"))]
     MissingSecretAccessKey,
 
+    #[snafu(display("Profile support requires aws_profile feature"))]
+    MissingProfileFeature,
+
     #[snafu(display("ETag Header missing from response"))]
     MissingEtag,
 
@@ -359,6 +362,7 @@ pub struct AmazonS3Builder {
     imdsv1_fallback: bool,
     virtual_hosted_style_request: bool,
     metadata_endpoint: Option<String>,
+    profile: Option<String>,
 }
 
 impl AmazonS3Builder {
@@ -370,13 +374,14 @@ impl AmazonS3Builder {
     /// Fill the [`AmazonS3Builder`] with regular AWS environment variables
     ///
     /// Variables extracted from environment:
-    /// * AWS_ACCESS_KEY_ID -> access_key_id
-    /// * AWS_SECRET_ACCESS_KEY -> secret_access_key
-    /// * AWS_DEFAULT_REGION -> region
-    /// * AWS_ENDPOINT -> endpoint
-    /// * AWS_SESSION_TOKEN -> token
-    /// * AWS_CONTAINER_CREDENTIALS_RELATIVE_URI -> <https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
-    /// * AWS_ALLOW_HTTP -> set to "true" to permit HTTP connections without TLS
+    /// * `AWS_ACCESS_KEY_ID` -> access_key_id
+    /// * `AWS_SECRET_ACCESS_KEY` -> secret_access_key
+    /// * `AWS_DEFAULT_REGION` -> region
+    /// * `AWS_ENDPOINT` -> endpoint
+    /// * `AWS_SESSION_TOKEN` -> token
+    /// * `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI` -> <https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
+    /// * `AWS_ALLOW_HTTP` -> set to "true" to permit HTTP connections without TLS
+    /// * `AWS_PROFILE` -> set profile name, requires the `aws_profile` feature to be enabled
     /// # Example
     /// ```
     /// use object_store::aws::AmazonS3Builder;
@@ -408,6 +413,10 @@ impl AmazonS3Builder {
             builder.token = Some(token);
         }
 
+        if let Ok(profile) = std::env::var("AWS_PROFILE") {
+            builder.profile = Some(profile);
+        }
+
         // This env var is set in ECS
         // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html
         if let Ok(metadata_relative_uri) =
@@ -528,6 +537,24 @@ impl AmazonS3Builder {
         self
     }
 
+    /// Set the AWS profile name, see <https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html>
+    ///
+    /// This makes use of [aws-config] to provide credentials and therefore requires
+    /// the `aws_profile` feature to be enabled.
+    ///
+    /// Users are strongly encouraged to instead use a credential manager such as
+    /// [aws-vault], both to avoid the significant additional dependencies and to
+    /// avoid storing credentials in [plain text on disk].
+    ///
+    /// [aws-config]: https://docs.rs/aws-config
+    /// [aws-vault]: https://github.com/99designs/aws-vault
+    /// [plain text on disk]: https://99designs.com.au/blog/engineering/aws-vault/
+    #[cfg(feature = "aws_profile")]
+    pub fn with_profile(mut self, profile: impl Into<String>) -> Self {
+        self.profile = Some(profile.into());
+        self
+    }
+
     /// Create a [`AmazonS3`] instance from the provided values,
     /// consuming `self`.
     pub fn build(self) -> Result<AmazonS3> {
@@ -537,13 +564,13 @@ impl AmazonS3Builder {
         let credentials = match (self.access_key_id, self.secret_access_key, self.token) {
             (Some(key_id), Some(secret_key), token) => {
                 info!("Using Static credential provider");
-                CredentialProvider::Static(StaticCredentialProvider {
+                Box::new(StaticCredentialProvider {
                     credential: Arc::new(AwsCredential {
                         key_id,
                         secret_key,
                         token,
                     }),
-                })
+                }) as _
             }
             (None, Some(_), _) => return Err(Error::MissingAccessKeyId.into()),
             (Some(_), None, _) => return Err(Error::MissingSecretAccessKey.into()),
@@ -565,7 +592,7 @@ impl AmazonS3Builder {
                     // Disallow non-HTTPs requests
                     let client = Client::builder().https_only(true).build().unwrap();
 
-                    CredentialProvider::WebIdentity(WebIdentityProvider {
+                    Box::new(WebIdentityProvider {
                         cache: Default::default(),
                         token,
                         session_name,
@@ -573,24 +600,30 @@ impl AmazonS3Builder {
                         endpoint,
                         client,
                         retry_config: self.retry_config.clone(),
-                    })
-                }
-                _ => {
-                    info!("Using Instance credential provider");
-
-                    // The instance metadata endpoint is access over HTTP
-                    let client = Client::builder().https_only(false).build().unwrap();
-
-                    CredentialProvider::Instance(InstanceCredentialProvider {
-                        cache: Default::default(),
-                        client,
-                        retry_config: self.retry_config.clone(),
-                        imdsv1_fallback: self.imdsv1_fallback,
-                        metadata_endpoint: self
-                            .metadata_endpoint
-                            .unwrap_or_else(|| METADATA_ENDPOINT.into()),
-                    })
+                    }) as _
                 }
+                _ => match self.profile {
+                    Some(profile) => {
+                        info!("Using profile \"{}\" credential provider", profile);
+                        profile_credentials(profile, region.clone())?
+                    }
+                    None => {
+                        info!("Using Instance credential provider");
+
+                        // The instance metadata endpoint is accessed over HTTP
+                        let client = Client::builder().https_only(false).build().unwrap();
+
+                        Box::new(InstanceCredentialProvider {
+                            cache: Default::default(),
+                            client,
+                            retry_config: self.retry_config.clone(),
+                            imdsv1_fallback: self.imdsv1_fallback,
+                            metadata_endpoint: self
+                                .metadata_endpoint
+                                .unwrap_or_else(|| METADATA_ENDPOINT.into()),
+                        }) as _
+                    }
+                },
             },
         };
 
@@ -628,6 +661,22 @@ impl AmazonS3Builder {
     }
 }
 
+#[cfg(feature = "aws_profile")]
+fn profile_credentials(
+    profile: String,
+    region: String,
+) -> Result<Box<dyn CredentialProvider>> {
+    Ok(Box::new(credential::ProfileProvider::new(profile, region)))
+}
+
+#[cfg(not(feature = "aws_profile"))]
+fn profile_credentials(
+    _profile: String,
+    _region: String,
+) -> Result<Box<dyn CredentialProvider>> {
+    Err(Error::MissingProfileFeature.into())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;