You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/04 02:44:43 UTC

[pulsar] 05/09: [tiered-storage] Allow AWS credentials to be refreshed (#9387)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 732c8393ecc92a0ea5d8eb7d4870116132f29441
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Mon Feb 1 11:41:49 2021 -0700

    [tiered-storage] Allow AWS credentials to be refreshed (#9387)
    
    With the refactor to support Azure, a regression occurred where the AWS
    credentials were fetched once and then used through the entire process.
    
    This is a problem in AWS, where it is commonplace to use credentials
    that expire.
    
    The AWS credential provider chain takes care of this
    problem, but when integrating with JClouds, that means we need the
    credential Supplier to return a new set of credentials each time.
    
    Luckily, AWS should intelligently cache this so we aren't thrashing the
    underlying credential mechanisms.
    
    This also adds a test to ensure this isn't broken in the future; it does a simple validation that the underlying credentials can change via the AWS SystemPropertiesCredentialsProvider.
    
    (cherry picked from commit 562b2e76df37113edb41bb4393afd4b6aaa2dc21)
---
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 50 ++++++++++++--------
 .../provider/TieredStorageConfigurationTests.java  | 54 +++++++++++++---------
 2 files changed, 62 insertions(+), 42 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 9d0871e..5c5f621 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -23,6 +23,7 @@ import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorag
 import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_SESSION_NAME_FIELD;
 
 import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSSessionCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
@@ -279,37 +280,46 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
 
     static final CredentialBuilder AWS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
         if (config.getCredentials() == null) {
-            AWSCredentials awsCredentials = null;
+            final AWSCredentialsProvider authChain;
             try {
                 if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) {
-                    awsCredentials = DefaultAWSCredentialsProviderChain.getInstance().getCredentials();
+                    authChain = DefaultAWSCredentialsProviderChain.getInstance();
                 } else {
-                    awsCredentials =
+                    authChain =
                             new STSAssumeRoleSessionCredentialsProvider.Builder(
                                     config.getConfigProperty(S3_ROLE_FIELD),
                                     config.getConfigProperty(S3_ROLE_SESSION_NAME_FIELD)
-                            ).build().getCredentials();
-                }
-
-                if (awsCredentials instanceof AWSSessionCredentials) {
-                    // if we have session credentials, we need to send the session token
-                    // this allows us to support EC2 metadata credentials
-                    SessionCredentials sessionCredentials =  SessionCredentials.builder()
-                            .accessKeyId(awsCredentials.getAWSAccessKeyId())
-                            .secretAccessKey(awsCredentials.getAWSSecretKey())
-                            .sessionToken(((AWSSessionCredentials) awsCredentials).getSessionToken())
-                            .build();
-                    config.setProviderCredentials(() -> sessionCredentials);
-                } else {
-                    Credentials credentials = new Credentials(
-                            awsCredentials.getAWSAccessKeyId(), awsCredentials.getAWSSecretKey());
-                    config.setProviderCredentials(() -> credentials);
+                            ).build();
                 }
 
+                // Important! Delay the building of actual credentials
+                // until later to support tokens that may be refreshed
+                // such as all session tokens
+                config.setProviderCredentials(() -> {
+                    AWSCredentials newCreds = authChain.getCredentials();
+                    Credentials jcloudCred = null;
+
+                    if (newCreds instanceof AWSSessionCredentials) {
+                        // if we have session credentials, we need to send the session token
+                        // this allows us to support EC2 metadata credentials
+                        jcloudCred = SessionCredentials.builder()
+                                .accessKeyId(newCreds.getAWSAccessKeyId())
+                                .secretAccessKey(newCreds.getAWSSecretKey())
+                                .sessionToken(((AWSSessionCredentials) newCreds).getSessionToken())
+                                .build();
+                    } else {
+                        // in the event we hit this branch, we likely don't have expiring
+                        // credentials, however, this still allows for the user to update
+                        // profiles creds or some other mechanism
+                        jcloudCred = new Credentials(
+                                newCreds.getAWSAccessKeyId(), newCreds.getAWSSecretKey());
+                    }
+                    return jcloudCred;
+                });
             } catch (Exception e) {
                 // allowed, some mock s3 service do not need credential
                 log.warn("Exception when get credentials for s3 ", e);
             }
         }
     };
-}
\ No newline at end of file
+}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
index 41e6255..3a0c38c 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
@@ -23,7 +23,9 @@ import static org.testng.Assert.assertEquals;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 
+import org.jclouds.domain.Credentials;
 import org.testng.annotations.Test;
 
 public class TieredStorageConfigurationTests {
@@ -113,7 +115,36 @@ public class TieredStorageConfigurationTests {
         assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
         assertEquals(config.getServiceEndpoint(), "http://some-url:9093");
     }
-    
+
+    /**
+     * Confirm that with AWS we create a different instance of the credentials
+     * object each time we call the supplier. This ensures that we get fresh credentials
+     * if the AWS credential provider changes.
+     */
+    @Test
+    public final void awsS3CredsProviderTest() {
+        Map<String, String> map = new HashMap<>();
+        map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
+        TieredStorageConfiguration config = new TieredStorageConfiguration(map);
+
+        // set the aws properties with fake creds so the defaultProviderChain works
+        System.setProperty("aws.accessKeyId", "fakeid1");
+        System.setProperty("aws.secretKey", "fakekey1");
+        Credentials creds1 = config.getProviderCredentials().get();
+        assertEquals(creds1.identity, "fakeid1");
+        assertEquals(creds1.credential, "fakekey1");
+
+        // reset the properties and ensure we get different values by re-evaluating the chain
+        System.setProperty("aws.accessKeyId", "fakeid2");
+        System.setProperty("aws.secretKey", "fakekey2");
+        Credentials creds2 = config.getProviderCredentials().get();
+        assertEquals(creds2.identity, "fakeid2");
+        assertEquals(creds2.credential, "fakekey2");
+
+        System.clearProperty("aws.accessKeyId");
+        System.clearProperty("aws.secretKey");
+    }
+
     /**
      * Confirm that both property options are available for GCS
      */
@@ -177,25 +208,4 @@ public class TieredStorageConfigurationTests {
         assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
         assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
     }
-    
-    /**
-     * Confirm that we can configure AWS using the old properties
-     */
-    @Test
-    public final void s3BackwardCompatiblePropertiesTest() {
-        Map<String, String> map = new HashMap<String,String>(); 
-        map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put(BC_S3_BUCKET, "test bucket");
-        map.put(BC_S3_ENDPOINT, "http://some-url:9093");
-        map.put(BC_S3_MAX_BLOCK_SIZE, "12");
-        map.put(BC_S3_READ_BUFFER_SIZE, "500");
-        map.put(BC_S3_REGION, "test region");
-        TieredStorageConfiguration config = new TieredStorageConfiguration(map);
-        
-        assertEquals(config.getRegion(), "test region");
-        assertEquals(config.getBucket(), "test bucket");
-        assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
-        assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
-        assertEquals(config.getServiceEndpoint(), "http://some-url:9093");
-    }
 }