You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/06 21:13:23 UTC

[pulsar] branch master updated: [Issue 8894][Offloader] Fix AWS credentials usages (#8950)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0355a63  [Issue 8894][Offloader] Fix AWS credentials usages (#8950)
0355a63 is described below

commit 0355a63731299655acb4dce4f123074b608bdd81
Author: Alexandre DUVAL <ka...@gmail.com>
AuthorDate: Thu May 6 23:12:56 2021 +0200

    [Issue 8894][Offloader] Fix AWS credentials usages (#8950)
    
    * support both STS and default aws creds
    
    * checkstyle
    
    * use provider
    
    * pass creds to properties
    
    * add tests
---
 .../mledger/impl/OffloadPrefixReadTest.java        |  1 +
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java |  1 +
 .../pulsar/broker/admin/AdminApiOffloadTest.java   |  2 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java |  4 +++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 30 ++++++++++++++--------
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 24 +++++++++++++----
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 29 +++++++++++++++------
 .../common/policies/data/OffloadPolicies.java      | 23 ++++++++++++++---
 .../common/policies/data/OffloadPoliciesTest.java  | 17 ++++++++++++
 .../pulsar/sql/presto/TestPulsarSplitManager.java  |  2 ++
 site2/docs/reference-pulsar-admin.md               |  3 ++-
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 23 ++++++++++++++++-
 .../provider/TieredStorageConfiguration.java       |  2 ++
 13 files changed, 131 insertions(+), 30 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index a3d2283..f3c1583 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -198,6 +198,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
 
         OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
                 null, null,
+                null, null,
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 740a4ab..d61da5f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -994,6 +994,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
 
         OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
                 null, null,
+                null, null,
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 9bbb27c..480153d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -170,7 +170,7 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
         OffloadPolicies.OffloadedReadPriority priority = OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST;
 
         OffloadPolicies offload1 = OffloadPolicies.create(
-                driver, region, bucket, endpoint, null, null,
+                driver, region, bucket, endpoint, null, null, null, null,
                 100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis, priority);
         admin.namespaces().setOffloadPolicies(namespaceName, offload1);
         OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 1d04934..bc6b2de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1335,6 +1335,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         ManagedLedgerConfig ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
         MockLedgerOffloader offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
                 null, null,
+                null, null,
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 admin.namespaces().getOffloadThreshold(namespace),
@@ -1351,6 +1352,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         admin.namespaces().getOffloadPolicies(namespace);
         offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
                 null, null,
+                null, null,
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 admin.namespaces().getOffloadThreshold(namespace),
@@ -1366,6 +1368,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
         offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
                 null, null,
+                null, null,
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 admin.namespaces().getOffloadThreshold(namespace),
@@ -1381,6 +1384,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
         offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
                 null, null,
+                null, null,
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 admin.namespaces().getOffloadThreshold(namespace),
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 2b1edd8..d368167 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -230,7 +230,8 @@ public class PulsarAdminToolTest {
         verify(mockClusters).deleteCluster("my-cluster");
 
         clusters.run(split("update-peer-clusters my-cluster --peer-clusters c1,c2"));
-        verify(mockClusters).updatePeerClusterNames("my-cluster", Sets.newLinkedHashSet(Lists.newArrayList("c1", "c2")));
+        verify(mockClusters).updatePeerClusterNames("my-cluster",
+                Sets.newLinkedHashSet(Lists.newArrayList("c1", "c2")));
 
         clusters.run(split("get-peer-clusters my-cluster"));
         verify(mockClusters).getPeerClusterNames("my-cluster");
@@ -390,10 +391,11 @@ public class PulsarAdminToolTest {
 
         namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G"));
         verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
-                new BacklogQuota(10l * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
+                new BacklogQuota(10L * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
 
         namespaces.run(split("set-persistence myprop/clust/ns1 -e 2 -w 1 -a 1 -r 100.0"));
-        verify(mockNamespaces).setPersistence("myprop/clust/ns1", new PersistencePolicies(2, 1, 1, 100.0d));
+        verify(mockNamespaces).setPersistence("myprop/clust/ns1",
+                new PersistencePolicies(2, 1, 1, 100.0d));
 
         namespaces.run(split("get-persistence myprop/clust/ns1"));
         verify(mockNamespaces).getPersistence("myprop/clust/ns1");
@@ -455,7 +457,8 @@ public class PulsarAdminToolTest {
 
 
         namespaces.run(split("set-retention myprop/clust/ns1 -t 1h -s 1M"));
-        verify(mockNamespaces).setRetention("myprop/clust/ns1", new RetentionPolicies(60, 1));
+        verify(mockNamespaces).setRetention("myprop/clust/ns1",
+                new RetentionPolicies(60, 1));
 
         namespaces.run(split("get-retention myprop/clust/ns1"));
         verify(mockNamespaces).getRetention("myprop/clust/ns1");
@@ -464,7 +467,8 @@ public class PulsarAdminToolTest {
         verify(mockNamespaces).removeRetention("myprop/clust/ns1");
 
         namespaces.run(split("set-delayed-delivery myprop/clust/ns1 -e -t 1s"));
-        verify(mockNamespaces).setDelayedDeliveryMessages("myprop/clust/ns1", new DelayedDeliveryPolicies(1000, true));
+        verify(mockNamespaces).setDelayedDeliveryMessages("myprop/clust/ns1",
+                new DelayedDeliveryPolicies(1000, true));
 
         namespaces.run(split("get-delayed-delivery myprop/clust/ns1"));
         verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1");
@@ -473,8 +477,9 @@ public class PulsarAdminToolTest {
         verify(mockNamespaces).removeDelayedDeliveryMessages("myprop/clust/ns1");
 
         namespaces.run(split("set-inactive-topic-policies myprop/clust/ns1 -e -t 1s -m delete_when_no_subscriptions"));
-        verify(mockNamespaces).setInactiveTopicPolicies("myprop/clust/ns1"
-                , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,true));
+        verify(mockNamespaces).setInactiveTopicPolicies("myprop/clust/ns1",
+                new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,
+                        true));
 
         namespaces.run(split("get-inactive-topic-policies myprop/clust/ns1"));
         verify(mockNamespaces).getInactiveTopicPolicies("myprop/clust/ns1");
@@ -631,7 +636,7 @@ public class PulsarAdminToolTest {
                 "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s -orp tiered-storage-first"));
         verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
                 OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
-                        "http://test.endpoint", null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
+                        "http://test.endpoint",null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
                         10 * 1024 * 1024L, 10000L, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST));
 
         namespaces.run(split("remove-offload-policies myprop/clust/ns1"));
@@ -672,7 +677,8 @@ public class PulsarAdminToolTest {
 
         namespaces.run(split("create my-prop/my-cluster/my-namespace --bundles 5 --clusters a,b,c"));
         verify(mockNamespaces).createNamespace("my-prop/my-cluster/my-namespace", 5);
-        verify(mockNamespaces).setNamespaceReplicationClusters("my-prop/my-cluster/my-namespace", Sets.newHashSet("a", "b", "c"));
+        verify(mockNamespaces).setNamespaceReplicationClusters("my-prop/my-cluster/my-namespace",
+                Sets.newHashSet("a", "b", "c"));
     }
 
     @Test
@@ -944,7 +950,8 @@ public class PulsarAdminToolTest {
 
         cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first"));
         OffloadPolicies offloadPolicies = OffloadPolicies.create("s3", "region", "bucket"
-                , "endpoint", null, null, 8, 9, 10L, null, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST);
+                , "endpoint", null, null, null, null,
+                8, 9, 10L, null, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST);
         verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", offloadPolicies);
 
         cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
@@ -1048,7 +1055,8 @@ public class PulsarAdminToolTest {
         verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1", false);
         cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
-        cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -e -t 1s -m delete_when_no_subscriptions"));
+        cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1"
+                        + " -e -t 1s -m delete_when_no_subscriptions"));
         verify(mockTopics).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
                 , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 99c0033..42bc136 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1863,8 +1863,8 @@ public class CmdNamespaces extends CmdBase {
 
         @Parameter(
                 names = {"--region", "-r"},
-                description = "The long term storage region, " +
-                        "default is s3ManagedLedgerOffloadRegion or gcsManagedLedgerOffloadRegion in broker.conf",
+                description = "The long term storage region, "
+                         + "default is s3ManagedLedgerOffloadRegion or gcsManagedLedgerOffloadRegion in broker.conf",
                 required = false)
         private String region;
 
@@ -1876,8 +1876,8 @@ public class CmdNamespaces extends CmdBase {
 
         @Parameter(
                 names = {"--endpoint", "-e"},
-                description = "Alternative endpoint to connect to, " +
-                        "s3 default is s3ManagedLedgerOffloadServiceEndpoint in broker.conf",
+                description = "Alternative endpoint to connect to, "
+                        + "s3 default is s3ManagedLedgerOffloadServiceEndpoint in broker.conf",
                 required = false)
         private String endpoint;
 
@@ -1894,6 +1894,18 @@ public class CmdNamespaces extends CmdBase {
         private String awsSecret;
 
         @Parameter(
+                names = {"--s3-role", "-ro"},
+                description = "S3 Role used for STSAssumeRoleSessionCredentialsProvider",
+                required = false)
+        private String s3Role;
+
+        @Parameter(
+                names = {"--s3-role-session-name", "-rsn"},
+                description = "S3 role session name used for STSAssumeRoleSessionCredentialsProvider",
+                required = false)
+        private String s3RoleSessionName;
+
+        @Parameter(
                 names = {"--maxBlockSize", "-mbs"},
                 description = "Max block size (eg: 32M, 64M), default is 64MB",
                 required = false)
@@ -2016,7 +2028,9 @@ public class CmdNamespaces extends CmdBase {
                 }
             }
 
-            OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret,
+            OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint,
+                    s3Role, s3RoleSessionName,
+                    awsId, awsSecret,
                     maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes,
                     offloadAfterElapsedInMillis, offloadedReadPriority);
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 92c3955..1b97de9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1470,12 +1470,22 @@ public class CmdTopics extends CmdBase {
                 , description = "AWS Credential Secret to use when using driver S3 or aws-s3")
         private String awsSecret;
 
-        @Parameter(names = {"-m", "--maxBlockSizeInBytes"}
-                , description = "ManagedLedger offload max block Size in bytes, s3 and google-cloud-storage requires this parameter")
+        @Parameter(names = {"--ro", "--s3-role"}
+                , description = "S3 Role used for STSAssumeRoleSessionCredentialsProvider")
+        private String s3Role;
+
+        @Parameter(names = {"--s3-role-session-name", "-rsn"}
+                , description = "S3 role session name used for STSAssumeRoleSessionCredentialsProvider")
+        private String s3RoleSessionName;
+
+        @Parameter(names = {"-m", "--maxBlockSizeInBytes"},
+                description = "ManagedLedger offload max block Size in bytes,"
+                + "s3 and google-cloud-storage requires this parameter")
         private int maxBlockSizeInBytes;
 
-        @Parameter(names = {"-rb", "--readBufferSizeInBytes"}
-                , description = "ManagedLedger offload read buffer size in bytes, s3 and google-cloud-storage requires this parameter")
+        @Parameter(names = {"-rb", "--readBufferSizeInBytes"},
+                description = "ManagedLedger offload read buffer size in bytes,"
+                + "s3 and google-cloud-storage requires this parameter")
         private int readBufferSizeInBytes;
 
         @Parameter(names = {"-t", "--offloadThresholdInBytes"}
@@ -1503,16 +1513,19 @@ public class CmdTopics extends CmdBase {
                 try {
                     offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
                 } catch (Exception e) {
-                    throw new ParameterException("--offloadedReadPriority parameter must be one of " +
-                            Arrays.stream(OffloadedReadPriority.values())
+                    throw new ParameterException("--offloadedReadPriority parameter must be one of "
+                            + Arrays.stream(OffloadedReadPriority.values())
                                     .map(OffloadedReadPriority::toString)
                                     .collect(Collectors.joining(","))
                             + " but got: " + this.offloadReadPriorityStr, e);
                 }
             }
 
-            OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret, maxBlockSizeInBytes
-                    , readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority);
+            OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint,
+                    s3Role, s3RoleSessionName,
+                    awsId, awsSecret,
+                    maxBlockSizeInBytes,
+                    readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority);
 
             getTopics().setOffloadPolicies(persistentTopic, offloadPolicies);
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 4417f21..12172f1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -168,6 +168,12 @@ public class OffloadPolicies implements Serializable {
     // s3 config, set by service configuration
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
+    private String s3ManagedLedgerOffloadCredentialId = null;
+    @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
+    private String s3ManagedLedgerOffloadCredentialSecret = null;
+    @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String s3ManagedLedgerOffloadRole = null;
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
@@ -218,6 +224,7 @@ public class OffloadPolicies implements Serializable {
     private Integer managedLedgerOffloadReadBufferSizeInBytes;
 
     public static OffloadPolicies create(String driver, String region, String bucket, String endpoint,
+                                         String role, String roleSessionName,
                                          String credentialId, String credentialSecret,
                                          Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes,
                                          Long offloadThresholdInBytes, Long offloadDeletionLagInMillis,
@@ -234,12 +241,18 @@ public class OffloadPolicies implements Serializable {
         offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);
         offloadPolicies.setManagedLedgerOffloadedReadPriority(readPriority);
 
-        if (driver.equalsIgnoreCase(DRIVER_NAMES.get(0)) || driver.equalsIgnoreCase(DRIVER_NAMES.get(1))) {
+            if (driver.equalsIgnoreCase(DRIVER_NAMES.get(0)) || driver.equalsIgnoreCase(DRIVER_NAMES.get(1))) {
+            if (role != null) {
+                offloadPolicies.setS3ManagedLedgerOffloadRole(role);
+            }
+            if (roleSessionName != null) {
+                offloadPolicies.setS3ManagedLedgerOffloadRoleSessionName(roleSessionName);
+            }
             if (credentialId != null) {
-                offloadPolicies.setS3ManagedLedgerOffloadRole(credentialId);
+                offloadPolicies.setS3ManagedLedgerOffloadCredentialId(credentialId);
             }
             if (credentialSecret != null) {
-                offloadPolicies.setS3ManagedLedgerOffloadRoleSessionName(credentialSecret);
+                offloadPolicies.setS3ManagedLedgerOffloadCredentialSecret(credentialSecret);
             }
             offloadPolicies.setS3ManagedLedgerOffloadRegion(region);
             offloadPolicies.setS3ManagedLedgerOffloadBucket(bucket);
@@ -467,6 +480,10 @@ public class OffloadPolicies implements Serializable {
                     this.getS3ManagedLedgerOffloadServiceEndpoint());
             setProperty(properties, "s3ManagedLedgerOffloadMaxBlockSizeInBytes",
                     this.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
+            setProperty(properties, "s3ManagedLedgerOffloadCredentialId",
+                    this.getS3ManagedLedgerOffloadCredentialId());
+            setProperty(properties, "s3ManagedLedgerOffloadCredentialSecret",
+                    this.getS3ManagedLedgerOffloadCredentialSecret());
             setProperty(properties, "s3ManagedLedgerOffloadRole",
                     this.getS3ManagedLedgerOffloadRole());
             setProperty(properties, "s3ManagedLedgerOffloadRoleSessionName",
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index 96b1f48..c4aacc1 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -42,6 +42,8 @@ public class OffloadPoliciesTest {
         final String driver = "aws-s3";
         final String region = "test-region";
         final String bucket = "test-bucket";
+        final String role = "test-role";
+        final String roleSessionName = "test-role-session-name";
         final String credentialId = "test-credential-id";
         final String credentialSecret = "test-credential-secret";
         final String endPoint = "test-endpoint";
@@ -55,6 +57,8 @@ public class OffloadPoliciesTest {
                 region,
                 bucket,
                 endPoint,
+                role,
+                roleSessionName,
                 credentialId,
                 credentialSecret,
                 maxBlockSizeInBytes,
@@ -82,6 +86,8 @@ public class OffloadPoliciesTest {
         final String region = "test-region";
         final String bucket = "test-bucket";
         final String endPoint = "test-endpoint";
+        final String role = "test-role";
+        final String roleSessionName = "test-role-session-name";
         final String credentialId = "test-credential-id";
         final String credentialSecret = "test-credential-secret";
         final Integer maxBlockSizeInBytes = 5 * M;
@@ -95,6 +101,8 @@ public class OffloadPoliciesTest {
                 region,
                 bucket,
                 endPoint,
+                role,
+                roleSessionName,
                 credentialId,
                 credentialSecret,
                 maxBlockSizeInBytes,
@@ -123,6 +131,8 @@ public class OffloadPoliciesTest {
         final Integer s3ManagedLedgerOffloadReadBufferSizeInBytes = 2 * M;
         final String s3ManagedLedgerOffloadRole = "test-s3-role";
         final String s3ManagedLedgerOffloadRoleSessionName = "test-s3-role-session-name";
+        final String s3ManagedLedgerOffloadCredentialId = "test-s3-credential-id";
+        final String s3ManagedLedgerOffloadCredentialSecret = "test-s3-credential-secret";
 
         final String gcsManagedLedgerOffloadRegion = "test-gcs-region";
         final String gcsManagedLedgerOffloadBucket = "test-s3-bucket";
@@ -153,6 +163,9 @@ public class OffloadPoliciesTest {
         properties.setProperty("s3ManagedLedgerOffloadRole", s3ManagedLedgerOffloadRole);
         properties.setProperty("s3ManagedLedgerOffloadRoleSessionName", s3ManagedLedgerOffloadRoleSessionName);
 
+        properties.setProperty("s3ManagedLedgerOffloadCredentialId", s3ManagedLedgerOffloadCredentialId);
+        properties.setProperty("s3ManagedLedgerOffloadCredentialSecret", s3ManagedLedgerOffloadCredentialSecret);
+
         properties.setProperty("gcsManagedLedgerOffloadRegion", gcsManagedLedgerOffloadRegion);
         properties.setProperty("gcsManagedLedgerOffloadBucket", gcsManagedLedgerOffloadBucket);
         properties.setProperty("gcsManagedLedgerOffloadMaxBlockSizeInBytes",
@@ -185,6 +198,10 @@ public class OffloadPoliciesTest {
         Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadRole(), s3ManagedLedgerOffloadRole);
         Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadRoleSessionName(),
                 s3ManagedLedgerOffloadRoleSessionName);
+        Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadCredentialId(),
+                s3ManagedLedgerOffloadCredentialId);
+        Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadCredentialSecret(),
+                s3ManagedLedgerOffloadCredentialSecret);
 
         Assert.assertEquals(offloadPolicies.getGcsManagedLedgerOffloadRegion(), gcsManagedLedgerOffloadRegion);
         Assert.assertEquals(offloadPolicies.getGcsManagedLedgerOffloadBucket(), gcsManagedLedgerOffloadBucket);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index 82bff48..6a81eb6 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -418,6 +418,8 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
                 "test-region",
                 "test-bucket",
                 "test-endpoint",
+                "role-",
+                "role-session-name",
                 "test-credential-id",
                 "test-credential-secret",
                 5000,
diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md
index 11bc87f..e15104a 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1707,6 +1707,8 @@ Options
 |`-e`, `--endpoint`|Alternative endpoint to connect to||
 |`-i`, `--aws-id`|AWS Credential Id to use when using driver S3 or aws-s3||
 |`-s`, `--aws-secret`|AWS Credential Secret to use when using driver S3 or aws-s3||
+|`-ro`, `--s3-role`|S3 role used for STSAssumeRoleSessionCredentialsProvider when using driver S3 or aws-s3||
+|`-rsn`, `--s3-role-session-name`|S3 role session name used for STSAssumeRoleSessionCredentialsProvider when using driver S3 or aws-s3||
 |`-mbs`, `--maxBlockSize`|Max block size|64MB|
 |`-rbs`, `--readBufferSize`|Read buffer size|1MB|
 |`-oat`, `--offloadAfterThreshold`|Offload after threshold size (eg: 1M, 5M)||
@@ -2656,4 +2658,3 @@ Options
 |`-c`, `--classname`|The Java class name||
 |`-j`, `--jar`|A path to the JAR file which contains the above Java class||
 |`-t`, `--type`|The type of the schema (avro or json)||
-
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 3df1e04..49dabb2 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -19,12 +19,16 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.provider;
 
 import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.GCS_ACCOUNT_KEY_FILE_FIELD;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ID_FIELD;
 import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_FIELD;
 import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_SESSION_NAME_FIELD;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_SECRET_FIELD;
 
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSSessionCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
 import com.google.common.base.Strings;
@@ -307,7 +311,24 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         if (config.getCredentials() == null) {
             final AWSCredentialsProvider authChain;
             try {
-                if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) {
+                if (!Strings.isNullOrEmpty(config.getConfigProperty(S3_ID_FIELD))
+                    && !Strings.isNullOrEmpty(config.getConfigProperty(S3_SECRET_FIELD))) {
+                    AWSCredentials awsCredentials = new AWSCredentials() {
+                        @Override
+                        public String getAWSAccessKeyId() {
+                            return config.getConfigProperty(S3_ID_FIELD);
+                        }
+
+                        @Override
+                        public String getAWSSecretKey() {
+                            return config.getConfigProperty(S3_SECRET_FIELD);
+                        }
+                    };
+                    authChain = new AWSStaticCredentialsProvider(
+                            new BasicAWSCredentials(
+                                config.getConfigProperty(S3_ID_FIELD),
+                                config.getConfigProperty(S3_SECRET_FIELD)));
+                } else if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) {
                     authChain = DefaultAWSCredentialsProviderChain.getInstance();
                 } else {
                     authChain =
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index afe561d..442980a 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -75,6 +75,8 @@ public class TieredStorageConfiguration {
     protected static final int MB = 1024 * 1024;
 
     public static final String GCS_ACCOUNT_KEY_FILE_FIELD = "gcsManagedLedgerOffloadServiceAccountKeyFile";
+    public static final String S3_ID_FIELD = "s3ManagedLedgerOffloadCredentialId";
+    public static final String S3_SECRET_FIELD = "s3ManagedLedgerOffloadCredentialSecret";
     public static final String S3_ROLE_FIELD = "s3ManagedLedgerOffloadRole";
     public static final String S3_ROLE_SESSION_NAME_FIELD = "s3ManagedLedgerOffloadRoleSessionName";