Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/03 16:34:27 UTC

[pulsar] branch master updated: [tiered-storage] Add support for AWS instance and role creds (#4433)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 176c901  [tiered-storage] Add support for AWS instance and role creds (#4433)
176c901 is described below

commit 176c901ace25c1039815ec7e81b80a1f27d2340c
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Mon Jun 3 10:34:22 2019 -0600

    [tiered-storage] Add support for AWS instance and role creds (#4433)
    
    * Add support for AWS instance and role creds
    
    This commit changes the tiered storage support for S3 to support
    EC2 instance metadata credentials, along with additional config
    options for assuming a role to obtain credentials.
    
    This works by changing the way we provide credentials to use the
    functional `Supplier` interface, and by using the AWS-specific
    `SessionCredentials` object when we detect that the
    `CredentialProvider` is supplying credentials that carry a session token.
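    
    In jclouds terms, a condensed sketch of that wrapping (the diff
    below does this inline; `toJcloudsSupplier` is a hypothetical
    helper name used here only for illustration):
    
    ```java
    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.AWSCredentialsProvider;
    import com.amazonaws.auth.AWSSessionCredentials;
    import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
    import org.jclouds.aws.domain.SessionCredentials;
    import org.jclouds.domain.Credentials;
    
    class CredentialsSupplierSketch {
        // Wrap an AWS provider chain so jclouds re-resolves credentials
        // on every request instead of capturing them once at boot.
        static Supplier<Credentials> toJcloudsSupplier(AWSCredentialsProvider chain) {
            return () -> {
                AWSCredentials creds = chain.getCredentials();
                if (creds instanceof AWSSessionCredentials) {
                    // Session-scoped creds (EC2 metadata, assumed roles)
                    // must carry the session token through to jclouds.
                    return SessionCredentials.builder()
                            .accessKeyId(creds.getAWSAccessKeyId())
                            .secretAccessKey(creds.getAWSSecretKey())
                            .sessionToken(((AWSSessionCredentials) creds).getSessionToken())
                            .build();
                }
                return new Credentials(creds.getAWSAccessKeyId(), creds.getAWSSecretKey());
            };
        }
    }
    ```
    
    The resulting supplier is handed to jclouds via
    `ContextBuilder#credentialsSupplier`, as shown in the diff below.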
    
    * [tiered_storage] Tweak s3 credential handling to check on boot
    
    This changes the s3 handling slightly: instead of falling back to
    static credentials, we now fail if no s3 credentials can be found,
    and the unit tests are changed to start a broker with s3 credentials.
    
    With the new Supplier API, we now fetch credentials on every request.
    Because of this, a failure and the subsequent try/catch on each
    request is costly; the integration tests were exercising this path,
    which made them significantly slower.
    
    Instead, we just check whether we can fetch creds at startup and, if
    we can't, treat it as an error condition and exit the app, since a
    production scenario is unlikely to have no credentials at all.
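    
    A minimal sketch of that boot-time check (the diff below does the
    equivalent inline in `getCredentials`, rethrowing the original
    exception; the helper name here is hypothetical):
    
    ```java
    import com.amazonaws.auth.AWSCredentialsProvider;
    
    class BootCredentialCheckSketch {
        // Probe the provider chain once at startup; a failure aborts
        // offloader construction instead of surfacing per request.
        static void checkCredentialsOnBoot(AWSCredentialsProvider credsChain) {
            try {
                credsChain.getCredentials();
            } catch (Exception e) {
                throw new IllegalStateException(
                        "unable to fetch S3 credentials for offloading, failing", e);
            }
        }
    }
    ```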
    
    * Fix s3 test for missing creds
---
 site2/docs/cookbooks-tiered-storage.md             | 28 ++++++--
 site2/docs/reference-configuration.md              | 12 ++--
 .../integration/topologies/PulsarCluster.java      |  3 +
 tiered-storage/jcloud/pom.xml                      |  4 ++
 .../jcloud/TieredStorageConfigurationData.java     | 24 +++++++
 .../impl/BlobStoreManagedLedgerOffloader.java      | 57 ++++++++++------
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  | 77 +++++++++++++++++++++-
 7 files changed, 172 insertions(+), 33 deletions(-)

diff --git a/site2/docs/cookbooks-tiered-storage.md b/site2/docs/cookbooks-tiered-storage.md
index 5d75679..7f8bc58 100644
--- a/site2/docs/cookbooks-tiered-storage.md
+++ b/site2/docs/cookbooks-tiered-storage.md
@@ -32,7 +32,7 @@ getting charged for incomplete uploads.
 
 ## Configuring the offload driver
 
-Offloading is configured in ```broker.conf```. 
+Offloading is configured in ```broker.conf```.
 
 At a minimum, the administrator must configure the driver, the bucket and the authenticating credentials.
 There are also some other knobs to configure, such as the bucket region, the max block size in backed storage, etc.
@@ -82,7 +82,12 @@ but relies on the mechanisms supported by the
 
 Once you have created a set of credentials in the AWS IAM console, they can be configured in a number of ways.
 
-1. Set the environment variables **AWS_ACCESS_KEY_ID** and **AWS_SECRET_ACCESS_KEY** in ```conf/pulsar_env.sh```.
+1. Use EC2 instance metadata credentials
+
+If you are on an AWS instance with an instance profile that provides credentials, Pulsar will use these
+credentials if no other mechanism is provided.
+
+2. Set the environment variables **AWS_ACCESS_KEY_ID** and **AWS_SECRET_ACCESS_KEY** in ```conf/pulsar_env.sh```.
 
 ```bash
 export AWS_ACCESS_KEY_ID=ABC123456789
@@ -92,13 +97,13 @@ export AWS_SECRET_ACCESS_KEY=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c
 > \"export\" is important so that the variables are made available in the environment of spawned processes.
 
 
-2. Add the Java system properties *aws.accessKeyId* and *aws.secretKey* to **PULSAR_EXTRA_OPTS** in `conf/pulsar_env.sh`.
+3. Add the Java system properties *aws.accessKeyId* and *aws.secretKey* to **PULSAR_EXTRA_OPTS** in `conf/pulsar_env.sh`.
 
 ```bash
 PULSAR_EXTRA_OPTS="${PULSAR_EXTRA_OPTS} ${PULSAR_MEM} ${PULSAR_GC} -Daws.accessKeyId=ABC123456789 -Daws.secretKey=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c -Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024"
 ```
 
-3. Set the access credentials in ```~/.aws/credentials```.
+4. Set the access credentials in ```~/.aws/credentials```.
 
 ```conf
 [default]
@@ -106,7 +111,16 @@ aws_access_key_id=ABC123456789
 aws_secret_access_key=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c
 ```
 
-If you are running in EC2 you can also use instance profile credentials, provided through the EC2 metadata service, but that is out of scope for this cookbook.
+5. Assume an IAM role
+
+To assume an IAM role, specify the following:
+
+```conf
+s3ManagedLedgerOffloadRole=<aws role arn>
+s3ManagedLedgerOffloadRoleSessionName=pulsar-s3-offload
+```
+
+This will use the `DefaultAWSCredentialsProviderChain` to assume this role.
 
 > The broker must be rebooted for credentials specified in pulsar_env to take effect.
 
@@ -134,7 +148,7 @@ gcsManagedLedgerOffloadBucket=pulsar-topic-offload
 Bucket Region is the region where the bucket is located. Bucket Region is not a required but
 a recommended configuration. If it is not configured, it will use the default region.
 
-Regarding GCS, buckets are default created in the `us multi-regional location`, 
+Regarding GCS, buckets are created by default in the `us multi-regional location`;
 the [Bucket Locations](https://cloud.google.com/storage/docs/bucket-locations) page contains more information.
 
 ```conf
@@ -211,7 +225,7 @@ Offload was a success
 If there is an error offloading, the error will be propagated to the offload-status command.
 
 ```bash
-$ bin/pulsar-admin topics offload-status persistent://public/default/topic1                                                                                                       
+$ bin/pulsar-admin topics offload-status persistent://public/default/topic1
 Error in offload
 null
 
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 4f040f2..4a85995 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -159,7 +159,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 |athenzDomainNames| Supported Athenz provider domain names (comma separated) for authentication  ||
 |bookkeeperClientAuthenticationPlugin|  Authentication plugin to use when connecting to bookies ||
 |bookkeeperClientAuthenticationParametersName|  BookKeeper auth plugin implementation-specific parameter names and values  ||
-|bookkeeperClientAuthenticationParameters|||   
+|bookkeeperClientAuthenticationParameters|||
 |bookkeeperClientTimeoutInSeconds|  Timeout for BK add / read operations  |30|
 |bookkeeperClientSpeculativeReadTimeoutInMillis|  Speculative reads are initiated if a read request doesn’t complete within a certain time. A value of 0 disables speculative reads |0|
 |bookkeeperClientHealthCheckEnabled|  Enable bookies health check. Bookies that have more than the configured number of failures within the interval will be quarantined for some time. During this period, new ledgers won’t be created on these bookies  |true|
@@ -225,6 +225,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 |s3ManagedLedgerOffloadServiceEndpoint| For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing) ||
 |s3ManagedLedgerOffloadMaxBlockSizeInBytes| For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) |67108864|
 |s3ManagedLedgerOffloadReadBufferSizeInBytes| For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)  |1048576|
+|s3ManagedLedgerOffloadRole| For Amazon S3 ledger offload, provide a role to assume before writing to s3 ||
+|s3ManagedLedgerOffloadRoleSessionName| For Amazon S3 ledger offload, provide a role session name when using a role |pulsar-s3-offload|
 
 
 
@@ -240,7 +242,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
 |authPlugin|  The authentication plugin.  ||
 |authParams|  The authentication parameters for the cluster, as a comma-separated string. ||
 |useTls|  Whether or not TLS authentication will be enforced in the cluster.  |false|
-|tlsAllowInsecureConnection|||    
+|tlsAllowInsecureConnection|||
 |tlsTrustCertsFilePath|||
 
 
@@ -335,7 +337,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
 |authenticationEnabled| Enable authentication for the broker. |false|
 |authenticationProviders| A comma-separated list of class names for authentication providers. |false|
 |authorizationEnabled|  Enforce authorization in brokers. |false|
-|superUserRoles|  Role names that are treated as “superusers.” Superusers are authorized to perform all admin tasks. ||  
+|superUserRoles|  Role names that are treated as “superusers.” Superusers are authorized to perform all admin tasks. ||
 |brokerClientAuthenticationPlugin|  The authentication settings of the broker itself. Used when the broker connects to other brokers either in the same cluster or from other clusters. ||
 |brokerClientAuthenticationParameters|  The parameters that go along with the plugin specified using brokerClientAuthenticationPlugin.  ||
 |athenzDomainNames| Supported Athenz authentication provider domain names as a comma-separated list.  ||
@@ -351,7 +353,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
 |bookkeeperClientRackawarePolicyEnabled|    |true|
 |bookkeeperClientRegionawarePolicyEnabled|    |false|
 |bookkeeperClientReorderReadSequenceEnabled|    |false|
-|bookkeeperClientIsolationGroups|||   
+|bookkeeperClientIsolationGroups|||
 |managedLedgerDefaultEnsembleSize|    |1|
 |managedLedgerDefaultWriteQuorum|   |1|
 |managedLedgerDefaultAckQuorum|   |1|
@@ -409,7 +411,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
 |bindAddress||0.0.0.0|
 |clusterName |||
 |authenticationEnabled||false|
-|authenticationProviders|||   
+|authenticationProviders|||
 |authorizationEnabled||false|
 |superUserRoles |||
 |brokerClientAuthenticationPlugin|||
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 4a12346..d824880 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -157,6 +157,9 @@ public class PulsarCluster {
                         .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
                         .withEnv("clusterName", clusterName)
                         .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
+                        // used in s3 tests
+                        .withEnv("AWS_ACCESS_KEY_ID", "accesskey")
+                        .withEnv("AWS_SECRET_KEY", "secretkey")
                 )
         );
 
diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml
index 57fc36c..d635337 100644
--- a/tiered-storage/jcloud/pom.xml
+++ b/tiered-storage/jcloud/pom.xml
@@ -74,6 +74,10 @@
       <artifactId>aws-java-sdk-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-sts</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.jamesmurty.utils</groupId>
       <artifactId>java-xmlbuilder</artifactId>
       <version>1.1</version>
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java
index 52fedfd..a4c5cf4 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java
@@ -20,6 +20,10 @@ package org.apache.bookkeeper.mledger.offload.jcloud;
 
 import static org.apache.pulsar.common.util.FieldParser.value;
 
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
+import com.google.common.base.Strings;
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.Arrays;
@@ -54,6 +58,12 @@ public class TieredStorageConfigurationData implements Serializable, Cloneable {
     // For Amazon S3 ledger offload, Read buffer size in bytes.
     private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB
 
+    // For Amazon S3 ledger offload, provide a role to assume before writing to s3
+    private String s3ManagedLedgerOffloadRole = null;
+
+    // For Amazon S3 ledger offload, provide a role session name when using a role
+    private String s3ManagedLedgerOffloadRoleSessionName = "pulsar-s3-offload";
+
     // For Google Cloud Storage ledger offload, region where offload bucket is located.
     // reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations
     private String gcsManagedLedgerOffloadRegion = null;
@@ -72,6 +82,20 @@ public class TieredStorageConfigurationData implements Serializable, Cloneable {
     private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;
 
     /**
+     * Builds an AWS credential provider based on the offload options
+     * @return aws credential provider
+     */
+    public AWSCredentialsProvider getAWSCredentialProvider() {
+        if (Strings.isNullOrEmpty(this.getS3ManagedLedgerOffloadRole())) {
+            return DefaultAWSCredentialsProviderChain.getInstance();
+        } else {
+            String roleName = this.getS3ManagedLedgerOffloadRole();
+            String roleSessionName = this.getS3ManagedLedgerOffloadRoleSessionName();
+            return new STSAssumeRoleSessionCredentialsProvider.Builder(roleName, roleSessionName).build();
+        }
+    }
+
+    /**
      * Create a tiered storage configuration from the provided <tt>properties</tt>.
      *
      * @param properties the configuration properties
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 96409b2..38fe880 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -19,7 +19,8 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSSessionCredentials;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
@@ -47,8 +48,10 @@ import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 import org.jclouds.Constants;
 import org.jclouds.ContextBuilder;
+import org.jclouds.aws.domain.SessionCredentials;
 import org.jclouds.aws.s3.AWSS3ProviderMetadata;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
@@ -113,7 +116,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
     private static Pair<BlobStoreLocation, BlobStore> createBlobStore(String driver,
                                                                       String region,
                                                                       String endpoint,
-                                                                      Credentials credentials,
+                                                                      Supplier<Credentials> credentials,
                                                                       int maxBlockSize) {
         Properties overrides = new Properties();
         // This property controls the number of parts being uploaded in parallel.
@@ -127,7 +130,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
         ProviderRegistry.registerProvider(new GoogleCloudStorageProviderMetadata());
 
         ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
-        contextBuilder.credentials(credentials.identity, credentials.credential);
+        contextBuilder.credentialsSupplier(credentials);
 
         if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) {
             contextBuilder.endpoint(endpoint);
@@ -162,7 +165,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
     // the endpoint
     private final String writeEndpoint;
     // credentials
-    private final Credentials credentials;
+    private final Supplier<Credentials> credentials;
 
     // max block size for each data block.
     private int maxBlockSize;
@@ -223,13 +226,13 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
                 "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for s3 and gcs offload");
         }
 
-        Credentials credentials = getCredentials(driver, conf);
+        Supplier<Credentials> credentials = getCredentials(driver, conf);
 
         return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler,
             maxBlockSize, readBufferSize, endpoint, region, credentials, userMetadata);
     }
 
-    public static Credentials getCredentials(String driver, TieredStorageConfigurationData conf) throws IOException {
+    public static Supplier<Credentials> getCredentials(String driver, TieredStorageConfigurationData conf) throws IOException {
         // credentials:
         //   for s3, get by DefaultAWSCredentialsProviderChain.
         //   for gcs, use downloaded file 'google_creds.json', which contains service account key by
@@ -243,28 +246,42 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
             }
             try {
                 String gcsKeyContent = Files.toString(new File(gcsKeyPath), Charset.defaultCharset());
-                return new GoogleCredentialsFromJson(gcsKeyContent).get();
+                return () -> new GoogleCredentialsFromJson(gcsKeyContent).get();
             } catch (IOException ioe) {
                 log.error("Cannot read GCS service account credentials file: {}", gcsKeyPath);
                 throw new IOException(ioe);
             }
         } else if (isS3Driver(driver)) {
-            AWSCredentials credentials = null;
+            AWSCredentialsProvider credsChain = conf.getAWSCredentialProvider();
+            // try and get creds before starting... if we can't fetch
+            // creds on boot, we want to fail
             try {
-                DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance();
-                credentials = creds.getCredentials();
+                credsChain.getCredentials();
             } catch (Exception e) {
                 // allowed, some mock s3 service not need credential
-                log.warn("Exception when get credentials for s3 ", e);
+                log.error("unable to fetch S3 credentials for offloading, failing", e);
+                throw e;
             }
 
-            String id = "accesskey";
-            String key = "secretkey";
-            if (credentials != null) {
-                id = credentials.getAWSAccessKeyId();
-                key = credentials.getAWSSecretKey();
-            }
-            return new Credentials(id, key);
+            return () -> {
+                AWSCredentials creds = credsChain.getCredentials();
+                if (creds == null) {
+                    // we don't expect this to happen, as we
+                    // successfully fetched creds on boot
+                    throw new RuntimeException("Unable to fetch S3 credentials after start, unexpected!");
+                }
+                // if we have session credentials, we need to send the session token
+                // this allows us to support EC2 metadata credentials
+                if (creds instanceof AWSSessionCredentials) {
+                    return SessionCredentials.builder()
+                            .accessKeyId(creds.getAWSAccessKeyId())
+                            .secretAccessKey(creds.getAWSSecretKey())
+                            .sessionToken(((AWSSessionCredentials) creds).getSessionToken())
+                            .build();
+                } else {
+                    return new Credentials(creds.getAWSAccessKeyId(), creds.getAWSSecretKey());
+                }
+            };
         } else {
             throw new IOException(
                 "Not support this kind of driver: " + driver);
@@ -274,13 +291,13 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
 
     // build context for jclouds BlobStoreContext
     BlobStoreManagedLedgerOffloader(String driver, String container, OrderedScheduler scheduler,
-                                    int maxBlockSize, int readBufferSize, String endpoint, String region, Credentials credentials) {
+                                    int maxBlockSize, int readBufferSize, String endpoint, String region, Supplier<Credentials> credentials) {
         this(driver, container, scheduler, maxBlockSize, readBufferSize, endpoint, region, credentials, Maps.newHashMap());
     }
 
     BlobStoreManagedLedgerOffloader(String driver, String container, OrderedScheduler scheduler,
                                     int maxBlockSize, int readBufferSize,
-                                    String endpoint, String region, Credentials credentials,
+                                    String endpoint, String region, Supplier<Credentials> credentials,
                                     Map<String, String> userMetadata) {
         this.offloadDriverName = driver;
         this.scheduler = scheduler;
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 7f80397..07308ff 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -25,6 +25,9 @@ import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
 
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSSessionCredentials;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.io.File;
 import java.io.IOException;
@@ -52,11 +55,14 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.offload.jcloud.BlobStoreTestBase;
 import org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.data.ACL;
+import org.jclouds.aws.domain.SessionCredentials;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.options.CopyOptions;
+import org.jclouds.domain.Credentials;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -241,7 +247,37 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase {
 
     @Test
     public void testS3DriverConfiguredWell() throws Exception {
-        TieredStorageConfigurationData conf = new TieredStorageConfigurationData();
+        TieredStorageConfigurationData conf = new TieredStorageConfigurationData() {
+            @Override
+            public AWSCredentialsProvider getAWSCredentialProvider() {
+                return new AWSCredentialsProvider() {
+                    @Override
+                    public AWSCredentials getCredentials() {
+                        return new AWSSessionCredentials() {
+                            @Override
+                            public String getSessionToken() {
+                                return "token";
+                            }
+
+                            @Override
+                            public String getAWSAccessKeyId() {
+                                return "access";
+                            }
+
+                            @Override
+                            public String getAWSSecretKey() {
+                                return "secret";
+                            }
+                        };
+                    }
+
+                    @Override
+                    public void refresh() {
+
+                    }
+                };
+            }
+        };
         conf.setManagedLedgerOffloadDriver("s3");
         conf.setS3ManagedLedgerOffloadBucket(BUCKET);
         conf.setS3ManagedLedgerOffloadServiceEndpoint("http://fake.s3.end.point");
@@ -605,5 +641,44 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase {
             Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
         }
     }
+
+    @Test
+    public void testSessionCredentialSupplier() throws Exception {
+        TieredStorageConfigurationData mock = mock(TieredStorageConfigurationData.class);
+        Mockito.when(mock.getAWSCredentialProvider()).thenReturn(new AWSCredentialsProvider() {
+            @Override
+            public AWSCredentials getCredentials() {
+                return new AWSSessionCredentials() {
+                    @Override
+                    public String getSessionToken() {
+                        return "token";
+                    }
+
+                    @Override
+                    public String getAWSAccessKeyId() {
+                        return "access";
+                    }
+
+                    @Override
+                    public String getAWSSecretKey() {
+                        return "secret";
+                    }
+                };
+            }
+
+            @Override
+            public void refresh() {
+
+            }
+        });
+
+        Supplier<Credentials> creds = BlobStoreManagedLedgerOffloader.getCredentials("aws-s3", mock);
+
+        Assert.assertTrue(creds.get() instanceof SessionCredentials);
+        SessionCredentials sessCreds = (SessionCredentials) creds.get();
+        Assert.assertEquals(sessCreds.getAccessKeyId(), "access");
+        Assert.assertEquals(sessCreds.getSecretAccessKey(), "secret");
+        Assert.assertEquals(sessCreds.getSessionToken(), "token");
+    }
 }