You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/06 21:13:23 UTC
[pulsar] branch master updated: [Issue 8894][Offloader] Fix AWS
credentials usages (#8950)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0355a63 [Issue 8894][Offloader] Fix AWS credentials usages (#8950)
0355a63 is described below
commit 0355a63731299655acb4dce4f123074b608bdd81
Author: Alexandre DUVAL <ka...@gmail.com>
AuthorDate: Thu May 6 23:12:56 2021 +0200
[Issue 8894][Offloader] Fix AWS credentials usages (#8950)
* support both STS and default aws creds
* checkstyle
* use provider
* pass creds to properties
* add tests
---
.../mledger/impl/OffloadPrefixReadTest.java | 1 +
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 1 +
.../pulsar/broker/admin/AdminApiOffloadTest.java | 2 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 4 +++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 30 ++++++++++++++--------
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 24 +++++++++++++----
.../org/apache/pulsar/admin/cli/CmdTopics.java | 29 +++++++++++++++------
.../common/policies/data/OffloadPolicies.java | 23 ++++++++++++++---
.../common/policies/data/OffloadPoliciesTest.java | 17 ++++++++++++
.../pulsar/sql/presto/TestPulsarSplitManager.java | 2 ++
site2/docs/reference-pulsar-admin.md | 3 ++-
.../jcloud/provider/JCloudBlobStoreProvider.java | 23 ++++++++++++++++-
.../provider/TieredStorageConfiguration.java | 2 ++
13 files changed, 131 insertions(+), 30 deletions(-)
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index a3d2283..f3c1583 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -198,6 +198,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
null, null,
+ null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 740a4ab..d61da5f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -994,6 +994,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
null, null,
+ null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 9bbb27c..480153d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -170,7 +170,7 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
OffloadPolicies.OffloadedReadPriority priority = OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST;
OffloadPolicies offload1 = OffloadPolicies.create(
- driver, region, bucket, endpoint, null, null,
+ driver, region, bucket, endpoint, null, null, null, null,
100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis, priority);
admin.namespaces().setOffloadPolicies(namespaceName, offload1);
OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 1d04934..bc6b2de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1335,6 +1335,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
ManagedLedgerConfig ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
MockLedgerOffloader offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
null, null,
+ null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
@@ -1351,6 +1352,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
admin.namespaces().getOffloadPolicies(namespace);
offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
null, null,
+ null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
@@ -1366,6 +1368,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
null, null,
+ null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
@@ -1381,6 +1384,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
null, null,
+ null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 2b1edd8..d368167 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -230,7 +230,8 @@ public class PulsarAdminToolTest {
verify(mockClusters).deleteCluster("my-cluster");
clusters.run(split("update-peer-clusters my-cluster --peer-clusters c1,c2"));
- verify(mockClusters).updatePeerClusterNames("my-cluster", Sets.newLinkedHashSet(Lists.newArrayList("c1", "c2")));
+ verify(mockClusters).updatePeerClusterNames("my-cluster",
+ Sets.newLinkedHashSet(Lists.newArrayList("c1", "c2")));
clusters.run(split("get-peer-clusters my-cluster"));
verify(mockClusters).getPeerClusterNames("my-cluster");
@@ -390,10 +391,11 @@ public class PulsarAdminToolTest {
namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
- new BacklogQuota(10l * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
+ new BacklogQuota(10L * 1024 * 1024 * 1024, RetentionPolicy.producer_exception));
namespaces.run(split("set-persistence myprop/clust/ns1 -e 2 -w 1 -a 1 -r 100.0"));
- verify(mockNamespaces).setPersistence("myprop/clust/ns1", new PersistencePolicies(2, 1, 1, 100.0d));
+ verify(mockNamespaces).setPersistence("myprop/clust/ns1",
+ new PersistencePolicies(2, 1, 1, 100.0d));
namespaces.run(split("get-persistence myprop/clust/ns1"));
verify(mockNamespaces).getPersistence("myprop/clust/ns1");
@@ -455,7 +457,8 @@ public class PulsarAdminToolTest {
namespaces.run(split("set-retention myprop/clust/ns1 -t 1h -s 1M"));
- verify(mockNamespaces).setRetention("myprop/clust/ns1", new RetentionPolicies(60, 1));
+ verify(mockNamespaces).setRetention("myprop/clust/ns1",
+ new RetentionPolicies(60, 1));
namespaces.run(split("get-retention myprop/clust/ns1"));
verify(mockNamespaces).getRetention("myprop/clust/ns1");
@@ -464,7 +467,8 @@ public class PulsarAdminToolTest {
verify(mockNamespaces).removeRetention("myprop/clust/ns1");
namespaces.run(split("set-delayed-delivery myprop/clust/ns1 -e -t 1s"));
- verify(mockNamespaces).setDelayedDeliveryMessages("myprop/clust/ns1", new DelayedDeliveryPolicies(1000, true));
+ verify(mockNamespaces).setDelayedDeliveryMessages("myprop/clust/ns1",
+ new DelayedDeliveryPolicies(1000, true));
namespaces.run(split("get-delayed-delivery myprop/clust/ns1"));
verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1");
@@ -473,8 +477,9 @@ public class PulsarAdminToolTest {
verify(mockNamespaces).removeDelayedDeliveryMessages("myprop/clust/ns1");
namespaces.run(split("set-inactive-topic-policies myprop/clust/ns1 -e -t 1s -m delete_when_no_subscriptions"));
- verify(mockNamespaces).setInactiveTopicPolicies("myprop/clust/ns1"
- , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,true));
+ verify(mockNamespaces).setInactiveTopicPolicies("myprop/clust/ns1",
+ new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,
+ true));
namespaces.run(split("get-inactive-topic-policies myprop/clust/ns1"));
verify(mockNamespaces).getInactiveTopicPolicies("myprop/clust/ns1");
@@ -631,7 +636,7 @@ public class PulsarAdminToolTest {
"set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s -orp tiered-storage-first"));
verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
- "http://test.endpoint", null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
+ "http://test.endpoint",null, null, null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
10 * 1024 * 1024L, 10000L, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST));
namespaces.run(split("remove-offload-policies myprop/clust/ns1"));
@@ -672,7 +677,8 @@ public class PulsarAdminToolTest {
namespaces.run(split("create my-prop/my-cluster/my-namespace --bundles 5 --clusters a,b,c"));
verify(mockNamespaces).createNamespace("my-prop/my-cluster/my-namespace", 5);
- verify(mockNamespaces).setNamespaceReplicationClusters("my-prop/my-cluster/my-namespace", Sets.newHashSet("a", "b", "c"));
+ verify(mockNamespaces).setNamespaceReplicationClusters("my-prop/my-cluster/my-namespace",
+ Sets.newHashSet("a", "b", "c"));
}
@Test
@@ -944,7 +950,8 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first"));
OffloadPolicies offloadPolicies = OffloadPolicies.create("s3", "region", "bucket"
- , "endpoint", null, null, 8, 9, 10L, null, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST);
+ , "endpoint", null, null, null, null,
+ 8, 9, 10L, null, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST);
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", offloadPolicies);
cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
@@ -1048,7 +1055,8 @@ public class PulsarAdminToolTest {
verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
- cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -e -t 1s -m delete_when_no_subscriptions"));
+ cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1"
+ + " -e -t 1s -m delete_when_no_subscriptions"));
verify(mockTopics).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
, new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 99c0033..42bc136 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1863,8 +1863,8 @@ public class CmdNamespaces extends CmdBase {
@Parameter(
names = {"--region", "-r"},
- description = "The long term storage region, " +
- "default is s3ManagedLedgerOffloadRegion or gcsManagedLedgerOffloadRegion in broker.conf",
+ description = "The long term storage region, "
+ + "default is s3ManagedLedgerOffloadRegion or gcsManagedLedgerOffloadRegion in broker.conf",
required = false)
private String region;
@@ -1876,8 +1876,8 @@ public class CmdNamespaces extends CmdBase {
@Parameter(
names = {"--endpoint", "-e"},
- description = "Alternative endpoint to connect to, " +
- "s3 default is s3ManagedLedgerOffloadServiceEndpoint in broker.conf",
+ description = "Alternative endpoint to connect to, "
+ + "s3 default is s3ManagedLedgerOffloadServiceEndpoint in broker.conf",
required = false)
private String endpoint;
@@ -1894,6 +1894,18 @@ public class CmdNamespaces extends CmdBase {
private String awsSecret;
@Parameter(
+ names = {"--s3-role", "-ro"},
+ description = "S3 Role used for STSAssumeRoleSessionCredentialsProvider",
+ required = false)
+ private String s3Role;
+
+ @Parameter(
+ names = {"--s3-role-session-name", "-rsn"},
+ description = "S3 role session name used for STSAssumeRoleSessionCredentialsProvider",
+ required = false)
+ private String s3RoleSessionName;
+
+ @Parameter(
names = {"--maxBlockSize", "-mbs"},
description = "Max block size (eg: 32M, 64M), default is 64MB",
required = false)
@@ -2016,7 +2028,9 @@ public class CmdNamespaces extends CmdBase {
}
}
- OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret,
+ OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint,
+ s3Role, s3RoleSessionName,
+ awsId, awsSecret,
maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes,
offloadAfterElapsedInMillis, offloadedReadPriority);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 92c3955..1b97de9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1470,12 +1470,22 @@ public class CmdTopics extends CmdBase {
, description = "AWS Credential Secret to use when using driver S3 or aws-s3")
private String awsSecret;
- @Parameter(names = {"-m", "--maxBlockSizeInBytes"}
- , description = "ManagedLedger offload max block Size in bytes, s3 and google-cloud-storage requires this parameter")
+ @Parameter(names = {"--ro", "--s3-role"}
+ , description = "S3 Role used for STSAssumeRoleSessionCredentialsProvider")
+ private String s3Role;
+
+ @Parameter(names = {"--s3-role-session-name", "-rsn"}
+ , description = "S3 role session name used for STSAssumeRoleSessionCredentialsProvider")
+ private String s3RoleSessionName;
+
+ @Parameter(names = {"-m", "--maxBlockSizeInBytes"},
+ description = "ManagedLedger offload max block Size in bytes,"
+ + "s3 and google-cloud-storage requires this parameter")
private int maxBlockSizeInBytes;
- @Parameter(names = {"-rb", "--readBufferSizeInBytes"}
- , description = "ManagedLedger offload read buffer size in bytes, s3 and google-cloud-storage requires this parameter")
+ @Parameter(names = {"-rb", "--readBufferSizeInBytes"},
+ description = "ManagedLedger offload read buffer size in bytes,"
+ + "s3 and google-cloud-storage requires this parameter")
private int readBufferSizeInBytes;
@Parameter(names = {"-t", "--offloadThresholdInBytes"}
@@ -1503,16 +1513,19 @@ public class CmdTopics extends CmdBase {
try {
offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
} catch (Exception e) {
- throw new ParameterException("--offloadedReadPriority parameter must be one of " +
- Arrays.stream(OffloadedReadPriority.values())
+ throw new ParameterException("--offloadedReadPriority parameter must be one of "
+ + Arrays.stream(OffloadedReadPriority.values())
.map(OffloadedReadPriority::toString)
.collect(Collectors.joining(","))
+ " but got: " + this.offloadReadPriorityStr, e);
}
}
- OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret, maxBlockSizeInBytes
- , readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority);
+ OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint,
+ s3Role, s3RoleSessionName,
+ awsId, awsSecret,
+ maxBlockSizeInBytes,
+ readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority);
getTopics().setOffloadPolicies(persistentTopic, offloadPolicies);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 4417f21..12172f1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -168,6 +168,12 @@ public class OffloadPolicies implements Serializable {
// s3 config, set by service configuration
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
+ private String s3ManagedLedgerOffloadCredentialId = null;
+ @Configuration
+ @JsonProperty(access = JsonProperty.Access.READ_WRITE)
+ private String s3ManagedLedgerOffloadCredentialSecret = null;
+ @Configuration
+ @JsonProperty(access = JsonProperty.Access.READ_WRITE)
private String s3ManagedLedgerOffloadRole = null;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
@@ -218,6 +224,7 @@ public class OffloadPolicies implements Serializable {
private Integer managedLedgerOffloadReadBufferSizeInBytes;
public static OffloadPolicies create(String driver, String region, String bucket, String endpoint,
+ String role, String roleSessionName,
String credentialId, String credentialSecret,
Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes,
Long offloadThresholdInBytes, Long offloadDeletionLagInMillis,
@@ -234,12 +241,18 @@ public class OffloadPolicies implements Serializable {
offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);
offloadPolicies.setManagedLedgerOffloadedReadPriority(readPriority);
- if (driver.equalsIgnoreCase(DRIVER_NAMES.get(0)) || driver.equalsIgnoreCase(DRIVER_NAMES.get(1))) {
+ if (driver.equalsIgnoreCase(DRIVER_NAMES.get(0)) || driver.equalsIgnoreCase(DRIVER_NAMES.get(1))) {
+ if (role != null) {
+ offloadPolicies.setS3ManagedLedgerOffloadRole(role);
+ }
+ if (roleSessionName != null) {
+ offloadPolicies.setS3ManagedLedgerOffloadRoleSessionName(roleSessionName);
+ }
if (credentialId != null) {
- offloadPolicies.setS3ManagedLedgerOffloadRole(credentialId);
+ offloadPolicies.setS3ManagedLedgerOffloadCredentialId(credentialId);
}
if (credentialSecret != null) {
- offloadPolicies.setS3ManagedLedgerOffloadRoleSessionName(credentialSecret);
+ offloadPolicies.setS3ManagedLedgerOffloadCredentialSecret(credentialSecret);
}
offloadPolicies.setS3ManagedLedgerOffloadRegion(region);
offloadPolicies.setS3ManagedLedgerOffloadBucket(bucket);
@@ -467,6 +480,10 @@ public class OffloadPolicies implements Serializable {
this.getS3ManagedLedgerOffloadServiceEndpoint());
setProperty(properties, "s3ManagedLedgerOffloadMaxBlockSizeInBytes",
this.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
+ setProperty(properties, "s3ManagedLedgerOffloadCredentialId",
+ this.getS3ManagedLedgerOffloadCredentialId());
+ setProperty(properties, "s3ManagedLedgerOffloadCredentialSecret",
+ this.getS3ManagedLedgerOffloadCredentialSecret());
setProperty(properties, "s3ManagedLedgerOffloadRole",
this.getS3ManagedLedgerOffloadRole());
setProperty(properties, "s3ManagedLedgerOffloadRoleSessionName",
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index 96b1f48..c4aacc1 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -42,6 +42,8 @@ public class OffloadPoliciesTest {
final String driver = "aws-s3";
final String region = "test-region";
final String bucket = "test-bucket";
+ final String role = "test-role";
+ final String roleSessionName = "test-role-session-name";
final String credentialId = "test-credential-id";
final String credentialSecret = "test-credential-secret";
final String endPoint = "test-endpoint";
@@ -55,6 +57,8 @@ public class OffloadPoliciesTest {
region,
bucket,
endPoint,
+ role,
+ roleSessionName,
credentialId,
credentialSecret,
maxBlockSizeInBytes,
@@ -82,6 +86,8 @@ public class OffloadPoliciesTest {
final String region = "test-region";
final String bucket = "test-bucket";
final String endPoint = "test-endpoint";
+ final String role = "test-role";
+ final String roleSessionName = "test-role-session-name";
final String credentialId = "test-credential-id";
final String credentialSecret = "test-credential-secret";
final Integer maxBlockSizeInBytes = 5 * M;
@@ -95,6 +101,8 @@ public class OffloadPoliciesTest {
region,
bucket,
endPoint,
+ role,
+ roleSessionName,
credentialId,
credentialSecret,
maxBlockSizeInBytes,
@@ -123,6 +131,8 @@ public class OffloadPoliciesTest {
final Integer s3ManagedLedgerOffloadReadBufferSizeInBytes = 2 * M;
final String s3ManagedLedgerOffloadRole = "test-s3-role";
final String s3ManagedLedgerOffloadRoleSessionName = "test-s3-role-session-name";
+ final String s3ManagedLedgerOffloadCredentialId = "test-s3-credential-id";
+ final String s3ManagedLedgerOffloadCredentialSecret = "test-s3-credential-secret";
final String gcsManagedLedgerOffloadRegion = "test-gcs-region";
final String gcsManagedLedgerOffloadBucket = "test-s3-bucket";
@@ -153,6 +163,9 @@ public class OffloadPoliciesTest {
properties.setProperty("s3ManagedLedgerOffloadRole", s3ManagedLedgerOffloadRole);
properties.setProperty("s3ManagedLedgerOffloadRoleSessionName", s3ManagedLedgerOffloadRoleSessionName);
+ properties.setProperty("s3ManagedLedgerOffloadCredentialId", s3ManagedLedgerOffloadCredentialId);
+ properties.setProperty("s3ManagedLedgerOffloadCredentialSecret", s3ManagedLedgerOffloadCredentialSecret);
+
properties.setProperty("gcsManagedLedgerOffloadRegion", gcsManagedLedgerOffloadRegion);
properties.setProperty("gcsManagedLedgerOffloadBucket", gcsManagedLedgerOffloadBucket);
properties.setProperty("gcsManagedLedgerOffloadMaxBlockSizeInBytes",
@@ -185,6 +198,10 @@ public class OffloadPoliciesTest {
Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadRole(), s3ManagedLedgerOffloadRole);
Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadRoleSessionName(),
s3ManagedLedgerOffloadRoleSessionName);
+ Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadCredentialId(),
+ s3ManagedLedgerOffloadCredentialId);
+ Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadCredentialSecret(),
+ s3ManagedLedgerOffloadCredentialSecret);
Assert.assertEquals(offloadPolicies.getGcsManagedLedgerOffloadRegion(), gcsManagedLedgerOffloadRegion);
Assert.assertEquals(offloadPolicies.getGcsManagedLedgerOffloadBucket(), gcsManagedLedgerOffloadBucket);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index 82bff48..6a81eb6 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -418,6 +418,8 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
"test-region",
"test-bucket",
"test-endpoint",
+ "role-",
+ "role-session-name",
"test-credential-id",
"test-credential-secret",
5000,
diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md
index 11bc87f..e15104a 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1707,6 +1707,8 @@ Options
|`-e`, `--endpoint`|Alternative endpoint to connect to||
|`-i`, `--aws-id`|AWS Credential Id to use when using driver S3 or aws-s3||
|`-s`, `--aws-secret`|AWS Credential Secret to use when using driver S3 or aws-s3||
+|`-ro`, `--s3-role`|S3 Role used for STSAssumeRoleSessionCredentialsProvider when using driver S3 or aws-s3||
+|`-rsn`, `--s3-role-session-name`|S3 role session name used for STSAssumeRoleSessionCredentialsProvider when using driver S3 or aws-s3||
|`-mbs`, `--maxBlockSize`|Max block size|64MB|
|`-rbs`, `--readBufferSize`|Read buffer size|1MB|
|`-oat`, `--offloadAfterThreshold`|Offload after threshold size (eg: 1M, 5M)||
@@ -2656,4 +2658,3 @@ Options
|`-c`, `--classname`|The Java class name||
|`-j`, `--jar`|A path to the JAR file which contains the above Java class||
|`-t`, `--type`|The type of the schema (avro or json)||
-
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 3df1e04..49dabb2 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -19,12 +19,16 @@
package org.apache.bookkeeper.mledger.offload.jcloud.provider;
import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.GCS_ACCOUNT_KEY_FILE_FIELD;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ID_FIELD;
import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_FIELD;
import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_SESSION_NAME_FIELD;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_SECRET_FIELD;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSSessionCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.google.common.base.Strings;
@@ -307,7 +311,24 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
if (config.getCredentials() == null) {
final AWSCredentialsProvider authChain;
try {
- if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) {
+ if (!Strings.isNullOrEmpty(config.getConfigProperty(S3_ID_FIELD))
+ && !Strings.isNullOrEmpty(config.getConfigProperty(S3_SECRET_FIELD))) {
+ AWSCredentials awsCredentials = new AWSCredentials() {
+ @Override
+ public String getAWSAccessKeyId() {
+ return config.getConfigProperty(S3_ID_FIELD);
+ }
+
+ @Override
+ public String getAWSSecretKey() {
+ return config.getConfigProperty(S3_SECRET_FIELD);
+ }
+ };
+ authChain = new AWSStaticCredentialsProvider(
+ new BasicAWSCredentials(
+ config.getConfigProperty(S3_ID_FIELD),
+ config.getConfigProperty(S3_SECRET_FIELD)));
+ } else if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) {
authChain = DefaultAWSCredentialsProviderChain.getInstance();
} else {
authChain =
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index afe561d..442980a 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -75,6 +75,8 @@ public class TieredStorageConfiguration {
protected static final int MB = 1024 * 1024;
public static final String GCS_ACCOUNT_KEY_FILE_FIELD = "gcsManagedLedgerOffloadServiceAccountKeyFile";
+ public static final String S3_ID_FIELD = "s3ManagedLedgerOffloadCredentialId";
+ public static final String S3_SECRET_FIELD = "s3ManagedLedgerOffloadCredentialSecret";
public static final String S3_ROLE_FIELD = "s3ManagedLedgerOffloadRole";
public static final String S3_ROLE_SESSION_NAME_FIELD = "s3ManagedLedgerOffloadRoleSessionName";