Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/07 05:59:52 UTC

[pulsar] branch branch-2.6 updated (5e21470 -> e72b924)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 5e21470  fix the closed ledger did not delete after expired (#9136)
     new ce191fb  [tiered-storage] Allow AWS credentials to be refreshed (#9387)
     new 76cd748  Fix the partition number not equals expected error (#9446)
     new 600f137  Fix the batch index ack persistent issue. (#9504)
     new e72b924  Compression must be applied during deferred schema preparation and enableBatching is enabled (#9396)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  1 +
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 16 ++++++-
 .../client/api/SimpleProducerConsumerTest.java     | 48 +++++++++++++++++++
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 45 ++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  8 +++-
 .../apache/pulsar/client/impl/ProducerImpl.java    | 27 +++++++++--
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 50 ++++++++++++--------
 .../provider/TieredStorageConfigurationTests.java  | 54 +++++++++++++---------
 8 files changed, 199 insertions(+), 50 deletions(-)


[pulsar] 04/04: Compression must be applied during deferred schema preparation and enableBatching is enabled (#9396)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e72b9244c21c68d776d5f4215b117e8b8a0532e6
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Feb 4 00:44:07 2021 +0100

    Compression must be applied during deferred schema preparation and enableBatching is enabled (#9396)
    
    * Compression must be applied during deferred schema preparation when enableBatching is enabled.
    If you do not set an initial schema on the Producer, the schema must be prepared at the first message sent with a Schema.
    There is a bug where compression is not applied in this case, so the consumer receives an uncompressed message and fails.
    
    * address Matteo's comments
    
    Co-authored-by: Enrico Olivelli <eo...@apache.org>
    (cherry picked from commit 014385061ec788c981c9dc78a207ed7797103221)
---
 .../client/api/SimpleProducerConsumerTest.java     | 48 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 27 +++++++++---
 2 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index fa57f2d..e103129 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -70,12 +70,15 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import lombok.Cleanup;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -3606,4 +3609,49 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
             Assert.assertEquals(size, 0);
         });
     }
+
+
+    @Data
+    @EqualsAndHashCode
+    public static class MyBean {
+        private String field;
+    }
+
+    @DataProvider(name = "enableBatching")
+    public static Object[] isEnableBatching() {
+        return new Object[]{false, true};
+    }
+
+    @Test(dataProvider = "enableBatching")
+    public void testSendCompressedWithDeferredSchemaSetup(boolean enableBatching) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String topic = "persistent://my-property/my-ns/deferredSchemaCompressed";
+        Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionName("testsub")
+                .subscribe();
+
+        // initially we are not setting a Schema in the producer
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+                .enableBatching(enableBatching)
+                .compressionType(CompressionType.LZ4)
+                .create();
+        MyBean payload = new MyBean();
+        payload.setField("aaaaaaaaaaaaaaaaaaaaaaaaa");
+
+        // now we send with a schema, but we have enabled compression and batching
+        // the producer will have to setup the schema and resume the send
+        producer.newMessage(Schema.AVRO(MyBean.class)).value(payload).send();
+        producer.close();
+
+        GenericRecord res = consumer.receive().getValue();
+        consumer.close();
+        for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
+            log.info("field {} {}", f.getName(), res.getField(f));
+            assertEquals("field", f.getName());
+            assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
+        }
+        assertEquals(1, res.getFields().size());
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 2367665..0e099c6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -345,6 +345,17 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         return future;
     }
 
+    /**
+     * Compress the payload if compression is configured
+     * @param payload
+     * @return a new payload
+     */
+    private ByteBuf applyCompression(ByteBuf payload) {
+        ByteBuf compressedPayload = compressor.encode(payload);
+        payload.release();
+        return compressedPayload;
+    }
+
     public void sendAsync(Message<?> message, SendCallback callback) {
         checkArgument(message instanceof MessageImpl);
 
@@ -363,11 +374,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         // If compression is enabled, we are compressing, otherwise it will simply use the same buffer
         int uncompressedSize = payload.readableBytes();
         ByteBuf compressedPayload = payload;
+        boolean compressed = false;
         // Batch will be compressed when closed
         // If a message has a delayed delivery time, we'll always send it individually
         if (!isBatchMessagingEnabled() || msgMetadataBuilder.hasDeliverAtTime()) {
-            compressedPayload = compressor.encode(payload);
-            payload.release();
+            compressedPayload = applyCompression(payload);
+            compressed = true;
 
             // validate msg-size (For batching this will be check at the batch completion size)
             int compressedSize = compressedPayload.readableBytes();
@@ -415,7 +427,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 String uuid = UUID.randomUUID().toString();
                 for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
                     serializeAndSendMessage(msg, msgMetadataBuilder, payload, uuid, chunkId, totalChunks,
-                            readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload,
+                            readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
                             compressedPayload.readableBytes(), uncompressedSize, callback);
                     readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());
                 }
@@ -430,7 +442,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
     private void serializeAndSendMessage(MessageImpl<?> msg, Builder msgMetadataBuilder, ByteBuf payload,
             String uuid, int chunkId, int totalChunks, int readStartIndex, int chunkMaxSizeInBytes, ByteBuf compressedPayload,
-            int compressedPayloadSize,
+            boolean compressed, int compressedPayloadSize,
             int uncompressedSize, SendCallback callback) throws IOException, InterruptedException {
         ByteBuf chunkPayload = compressedPayload;
         Builder chunkMsgMetadataBuilder = msgMetadataBuilder;
@@ -503,7 +515,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 doBatchSendAndAdd(msg, callback, payload);
             }
         } else {
-            ByteBuf encryptedPayload = encryptMessage(chunkMsgMetadataBuilder, chunkPayload);
+            // in this case compression has not been applied by the caller
+            // but we have to compress the payload if compression is configured
+            if (!compressed) {
+                chunkPayload = applyCompression(chunkPayload);
+            }
+            ByteBuf encryptedPayload = encryptMessage(msgMetadataBuilder, chunkPayload);
 
             MessageMetadata msgMetadata = chunkMsgMetadataBuilder.build();
             // When publishing during replication, we need to set the correct number of message in batch
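
Outside the test harness, the scenario this commit fixes looks roughly like the following client-side sketch, which mirrors the new SimpleProducerConsumerTest case above (the broker URL, topic, subscription name and MyBean POJO are illustrative):

    import org.apache.pulsar.client.api.CompressionType;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.schema.GenericRecord;

    public class DeferredSchemaCompressionExample {

        public static class MyBean {
            private String field;
            public String getField() { return field; }
            public void setField(String field) { this.field = field; }
        }

        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();

            String topic = "persistent://public/default/deferred-schema-compressed";

            // AUTO_CONSUME lets the consumer decode whatever schema the producer registers later.
            Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
                    .topic(topic)
                    .subscriptionName("example-sub")
                    .subscribe();

            // The producer is created WITHOUT a schema, but with batching and LZ4 compression;
            // the schema is only prepared when the first typed message is sent.
            Producer<byte[]> producer = client.newProducer()
                    .topic(topic)
                    .enableBatching(true)
                    .compressionType(CompressionType.LZ4)
                    .create();

            MyBean payload = new MyBean();
            payload.setField("some value");

            // This send triggers the deferred schema preparation; before the fix, this path
            // could publish the payload uncompressed even though LZ4 was configured.
            producer.newMessage(Schema.AVRO(MyBean.class)).value(payload).send();

            GenericRecord received = consumer.receive().getValue();
            System.out.println(received.getField("field"));

            producer.close();
            consumer.close();
            client.close();
        }
    }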


[pulsar] 02/04: Fix the partition number not equals expected error (#9446)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 76cd7482e2dee3e34e27302032b420b6482052fd
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Feb 5 14:46:47 2021 +0800

    Fix the partition number not equals expected error (#9446)
    
    Fixes #8000
    
    ### Motivation
    
    Fix the partition number not equals expected error
    
    ### Verifying this change
    
    New tests added. Without this fix, you can see errors like
    `topics consumer java.lang.IllegalStateException: allTopicPartitionsNumber 2 not equals expected: 5`
    
    (cherry picked from commit bbce00a2245cf05b829182a9a75a86d4e1139492)
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 45 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  8 +++-
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 3c58908..0996f63 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -1170,4 +1171,48 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = testTimeout)
+    public void testPartitionsUpdatesForMultipleTopics() throws Exception {
+        final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0";
+        final String subName = "my-sub";
+        admin.topics().createPartitionedTopic(topicName0, 2);
+        assertEquals(admin.topics().getPartitionedTopicMetadata(topicName0).partitions, 2);
+
+        PatternMultiTopicsConsumerImpl<String> consumer = (PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
+                .topicsPattern("persistent://public/default/test.*")
+                .subscriptionType(SubscriptionType.Failover)
+                .subscriptionName(subName)
+                .subscribe();
+
+        Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 2);
+        Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 2);
+
+        admin.topics().updatePartitionedTopic(topicName0, 5);
+        consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 5);
+            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 5);
+        });
+
+        final String topicName1 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-1";
+        admin.topics().createPartitionedTopic(topicName1, 3);
+        assertEquals(admin.topics().getPartitionedTopicMetadata(topicName1).partitions, 3);
+
+        consumer.getRecheckPatternTimeout().task().run(consumer.getRecheckPatternTimeout());
+
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 8);
+            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 8);
+        });
+
+        admin.topics().updatePartitionedTopic(topicName1, 5);
+        consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 10);
+            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 10);
+        });
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 549eb23..af3c33a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1061,6 +1061,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         return consumers.values().stream().collect(Collectors.toList());
     }
 
+    // get all partitions that in the topics map
+    int getPartitionsOfTheTopicMap() {
+        return topics.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
     @Override
     public void pause() {
         consumers.forEach((name, consumer) -> consumer.pause());
@@ -1129,7 +1134,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 future.complete(null);
                 return future;
             } else if (oldPartitionNumber < currentPartitionNumber) {
-                allTopicPartitionsNumber.compareAndSet(oldPartitionNumber, currentPartitionNumber);
+                allTopicPartitionsNumber.addAndGet(currentPartitionNumber - oldPartitionNumber);
+                topics.put(topicName, currentPartitionNumber);
                 List<String> newPartitions = list.subList(oldPartitionNumber, currentPartitionNumber);
                 // subscribe new added partitions
                 List<CompletableFuture<Consumer<T>>> futureList = newPartitions
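
The key change above is switching the shared counter from compareAndSet to addAndGet of the per-topic delta and recording the new per-topic count. A standalone sketch of that bookkeeping (hypothetical class and method names, not Pulsar code):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class PartitionCounter {
        private final Map<String, Integer> topics = new ConcurrentHashMap<>();
        private final AtomicInteger allTopicPartitionsNumber = new AtomicInteger();

        void onTopicSubscribed(String topic, int partitions) {
            topics.put(topic, partitions);
            allTopicPartitionsNumber.addAndGet(partitions);
        }

        void onPartitionsExtended(String topic, int currentPartitionNumber) {
            int oldPartitionNumber = topics.getOrDefault(topic, 0);
            if (oldPartitionNumber < currentPartitionNumber) {
                // Add only this topic's delta. A compareAndSet(oldPartitionNumber, currentPartitionNumber)
                // on the shared total fails once the counter also includes other topics' partitions,
                // which is the "allTopicPartitionsNumber ... not equals expected" error.
                allTopicPartitionsNumber.addAndGet(currentPartitionNumber - oldPartitionNumber);
                topics.put(topic, currentPartitionNumber);
            }
        }

        int total() {
            return allTopicPartitionsNumber.get();
        }

        public static void main(String[] args) {
            PartitionCounter counter = new PartitionCounter();
            counter.onTopicSubscribed("topic-0", 2);
            counter.onTopicSubscribed("topic-1", 3);
            counter.onPartitionsExtended("topic-0", 5);   // 2 -> 5 partitions
            System.out.println(counter.total());          // 8
        }
    }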


[pulsar] 03/04: Fix the batch index ack persistent issue. (#9504)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 600f13782c6caf3545317fb762162e96fcea4492
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Feb 7 11:18:28 2021 +0800

    Fix the batch index ack persistent issue. (#9504)
    
    Fix the batch index ack persistence issue. Currently, the `batchDeletedIndexInfoBuilder` is reused for generating the batch index ack data, but the delete set is not cleared before the new delete set is added.
    
    Clear the delete set before adding the new delete set.
    
    Test added. Without this fix, the test fails.
    
    (cherry picked from commit 7916666cd579438a5bc11e749e73d6e549d7e756)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java       |  1 +
 .../bookkeeper/mledger/impl/ManagedCursorTest.java       | 16 ++++++++++++++--
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index df66e91..d752466 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2467,6 +2467,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             for (long l : array) {
                 deleteSet.add(l);
             }
+            batchDeletedIndexInfoBuilder.clearDeleteSet();
             batchDeletedIndexInfoBuilder.addAllDeleteSet(deleteSet);
             result.add(batchDeletedIndexInfoBuilder.build());
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index e53bcae..f0d37ad 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3115,7 +3115,10 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
 
     @Test
     public void testBatchIndexesDeletionPersistAndRecover() throws ManagedLedgerException, InterruptedException {
-        ManagedLedger ledger = factory.open("test_batch_indexes_deletion_persistent");
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        // Make sure the cursor metadata updated by the cursor ledger ID.
+        managedLedgerConfig.setMaxUnackedRangesToPersistInZk(-1);
+        ManagedLedger ledger = factory.open("test_batch_indexes_deletion_persistent", managedLedgerConfig);
         ManagedCursor cursor = ledger.openCursor("c1");
 
         final int totalEntries = 100;
@@ -3125,6 +3128,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
             positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
         }
         assertEquals(cursor.getNumberOfEntries(), totalEntries);
+        deleteBatchIndex(cursor, positions[6], 10, Lists.newArrayList(IntRange.newBuilder().setStart(1).setEnd(3).build()));
         deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(IntRange.newBuilder().setStart(3).setEnd(6).build()));
         deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
         deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
@@ -3132,8 +3136,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
         deleteBatchIndex(cursor, positions[4], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
 
-        ledger = factory.open("test_batch_indexes_deletion_persistent");
+        cursor.close();
+        ledger.close();
+        ledger = factory.open("test_batch_indexes_deletion_persistent", managedLedgerConfig);
         cursor = ledger.openCursor("c1");
+
         List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10);
         Assert.assertEquals(deletedIndexes.size(), 1);
         Assert.assertEquals(deletedIndexes.get(0).getStart(), 3);
@@ -3143,6 +3150,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10);
         Assert.assertNull(deletedIndexes);
         Assert.assertEquals(cursor.getMarkDeletedPosition(), positions[5]);
+
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[6]), 10);
+        Assert.assertEquals(deletedIndexes.size(), 1);
+        Assert.assertEquals(deletedIndexes.get(0).getStart(), 1);
+        Assert.assertEquals(deletedIndexes.get(0).getEnd(), 3);
     }
 
     private void deleteBatchIndex(ManagedCursor cursor, Position position, int batchSize,
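
The added clearDeleteSet() call guards against a general pitfall: a builder with a repeated field that is reused across loop iterations keeps accumulating entries from earlier iterations. A self-contained sketch of the same pattern (hypothetical names, not the real protobuf-generated builder):

    import java.util.ArrayList;
    import java.util.List;

    public class ReusedBuilderPitfall {

        // Stand-in for a protobuf builder with a repeated field.
        static class RangeInfoBuilder {
            private final List<Long> deleteSet = new ArrayList<>();

            RangeInfoBuilder clearDeleteSet() { deleteSet.clear(); return this; }
            RangeInfoBuilder addAllDeleteSet(List<Long> values) { deleteSet.addAll(values); return this; }
            List<Long> build() { return new ArrayList<>(deleteSet); }
        }

        public static void main(String[] args) {
            RangeInfoBuilder builder = new RangeInfoBuilder();   // reused, like batchDeletedIndexInfoBuilder
            List<List<Long>> result = new ArrayList<>();
            long[][] ackedBatchIndexes = { {1L, 2L}, {3L} };

            for (long[] array : ackedBatchIndexes) {
                List<Long> deleteSet = new ArrayList<>();
                for (long l : array) {
                    deleteSet.add(l);
                }
                builder.clearDeleteSet();          // without this, the second entry would also carry 1 and 2
                builder.addAllDeleteSet(deleteSet);
                result.add(builder.build());
            }

            System.out.println(result);            // [[1, 2], [3]]
        }
    }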


[pulsar] 01/04: [tiered-storage] Allow AWS credentials to be refreshed (#9387)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ce191fbc43575348ad52fbadf80d0308d77772f0
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Mon Feb 1 11:41:49 2021 -0700

    [tiered-storage] Allow AWS credentials to be refreshed (#9387)
    
    With the refactor to support Azure, a regression occurred where the AWS
    credentials were fetched once and then used for the entire process.
    
    This is a problem in AWS, where it is commonplace to use credentials
    that expire.
    
    The AWS credential provider chain takes care of this problem, but when
    integrating with JClouds, that means we need the credential Supplier to
    return a new set of credentials each time.
    
    Luckily, AWS should intelligently cache this so we aren't thrashing the
    underlying credential mechanisms.
    
    This also adds a test to ensure this isn't broken in the future; it does a simple validation to ensure that the underlying credentials can change via the AWS SystemPropertyCredentialProvider.
    
    (cherry picked from commit 562b2e76df37113edb41bb4393afd4b6aaa2dc21)
---
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 50 ++++++++++++--------
 .../provider/TieredStorageConfigurationTests.java  | 54 +++++++++++++---------
 2 files changed, 62 insertions(+), 42 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 9d0871e..5c5f621 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -23,6 +23,7 @@ import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorag
 import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_SESSION_NAME_FIELD;
 
 import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSSessionCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
@@ -279,37 +280,46 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
 
     static final CredentialBuilder AWS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
         if (config.getCredentials() == null) {
-            AWSCredentials awsCredentials = null;
+            final AWSCredentialsProvider authChain;
             try {
                 if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) {
-                    awsCredentials = DefaultAWSCredentialsProviderChain.getInstance().getCredentials();
+                    authChain = DefaultAWSCredentialsProviderChain.getInstance();
                 } else {
-                    awsCredentials =
+                    authChain =
                             new STSAssumeRoleSessionCredentialsProvider.Builder(
                                     config.getConfigProperty(S3_ROLE_FIELD),
                                     config.getConfigProperty(S3_ROLE_SESSION_NAME_FIELD)
-                            ).build().getCredentials();
-                }
-
-                if (awsCredentials instanceof AWSSessionCredentials) {
-                    // if we have session credentials, we need to send the session token
-                    // this allows us to support EC2 metadata credentials
-                    SessionCredentials sessionCredentials =  SessionCredentials.builder()
-                            .accessKeyId(awsCredentials.getAWSAccessKeyId())
-                            .secretAccessKey(awsCredentials.getAWSSecretKey())
-                            .sessionToken(((AWSSessionCredentials) awsCredentials).getSessionToken())
-                            .build();
-                    config.setProviderCredentials(() -> sessionCredentials);
-                } else {
-                    Credentials credentials = new Credentials(
-                            awsCredentials.getAWSAccessKeyId(), awsCredentials.getAWSSecretKey());
-                    config.setProviderCredentials(() -> credentials);
+                            ).build();
                 }
 
+                // Important! Delay the building of actual credentials
+                // until later to support tokens that may be refreshed
+                // such as all session tokens
+                config.setProviderCredentials(() -> {
+                    AWSCredentials newCreds = authChain.getCredentials();
+                    Credentials jcloudCred = null;
+
+                    if (newCreds instanceof AWSSessionCredentials) {
+                        // if we have session credentials, we need to send the session token
+                        // this allows us to support EC2 metadata credentials
+                        jcloudCred = SessionCredentials.builder()
+                                .accessKeyId(newCreds.getAWSAccessKeyId())
+                                .secretAccessKey(newCreds.getAWSSecretKey())
+                                .sessionToken(((AWSSessionCredentials) newCreds).getSessionToken())
+                                .build();
+                    } else {
+                        // in the event we hit this branch, we likely don't have expiring
+                        // credentials, however, this still allows for the user to update
+                        // profiles creds or some other mechanism
+                        jcloudCred = new Credentials(
+                                newCreds.getAWSAccessKeyId(), newCreds.getAWSSecretKey());
+                    }
+                    return jcloudCred;
+                });
             } catch (Exception e) {
                 // allowed, some mock s3 service do not need credential
                 log.warn("Exception when get credentials for s3 ", e);
             }
         }
     };
-}
\ No newline at end of file
+}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
index 41e6255..3a0c38c 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
@@ -23,7 +23,9 @@ import static org.testng.Assert.assertEquals;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 
+import org.jclouds.domain.Credentials;
 import org.testng.annotations.Test;
 
 public class TieredStorageConfigurationTests {
@@ -113,7 +115,36 @@ public class TieredStorageConfigurationTests {
         assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
         assertEquals(config.getServiceEndpoint(), "http://some-url:9093");
     }
-    
+
+    /**
+     * Confirm that with AWS we create different instances of the credentials
+     * object each time we call the supplier, this ensure that we get fresh credentials
+     * if the aws credential provider changes
+     */
+    @Test
+    public final void awsS3CredsProviderTest() {
+        Map<String, String> map = new HashMap<>();
+        map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
+        TieredStorageConfiguration config = new TieredStorageConfiguration(map);
+
+        // set the aws properties with fake creds so the defaultProviderChain works
+        System.setProperty("aws.accessKeyId", "fakeid1");
+        System.setProperty("aws.secretKey", "fakekey1");
+        Credentials creds1 = config.getProviderCredentials().get();
+        assertEquals(creds1.identity, "fakeid1");
+        assertEquals(creds1.credential, "fakekey1");
+
+        // reset the properties and ensure we get different values by re-evaluating the chain
+        System.setProperty("aws.accessKeyId", "fakeid2");
+        System.setProperty("aws.secretKey", "fakekey2");
+        Credentials creds2 = config.getProviderCredentials().get();
+        assertEquals(creds2.identity, "fakeid2");
+        assertEquals(creds2.credential, "fakekey2");
+
+        System.clearProperty("aws.accessKeyId");
+        System.clearProperty("aws.secretKey");
+    }
+
     /**
      * Confirm that both property options are available for GCS
      */
@@ -177,25 +208,4 @@ public class TieredStorageConfigurationTests {
         assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
         assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
     }
-    
-    /**
-     * Confirm that we can configure AWS using the old properties
-     */
-    @Test
-    public final void s3BackwardCompatiblePropertiesTest() {
-        Map<String, String> map = new HashMap<String,String>(); 
-        map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put(BC_S3_BUCKET, "test bucket");
-        map.put(BC_S3_ENDPOINT, "http://some-url:9093");
-        map.put(BC_S3_MAX_BLOCK_SIZE, "12");
-        map.put(BC_S3_READ_BUFFER_SIZE, "500");
-        map.put(BC_S3_REGION, "test region");
-        TieredStorageConfiguration config = new TieredStorageConfiguration(map);
-        
-        assertEquals(config.getRegion(), "test region");
-        assertEquals(config.getBucket(), "test bucket");
-        assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
-        assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
-        assertEquals(config.getServiceEndpoint(), "http://some-url:9093");
-    }
 }
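
The pattern the diff introduces can be summarized as: keep the AWSCredentialsProvider chain and resolve credentials inside the supplier, so every call can pick up rotated or refreshed credentials. A minimal standalone sketch (the wrapper class and method names are hypothetical; java.util.function.Supplier stands in for the shaded Guava Supplier used by TieredStorageConfiguration, and the org.jclouds.aws.domain package for SessionCredentials is assumed):

    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.AWSCredentialsProvider;
    import com.amazonaws.auth.AWSSessionCredentials;
    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
    import org.jclouds.aws.domain.SessionCredentials;
    import org.jclouds.domain.Credentials;

    import java.util.function.Supplier;

    public class RefreshableJCloudCredentials {

        // Keep the provider chain and resolve inside the supplier: every get()
        // re-asks the chain, so rotated or expiring credentials are picked up.
        static Supplier<Credentials> supplierFrom(AWSCredentialsProvider authChain) {
            return () -> {
                AWSCredentials newCreds = authChain.getCredentials();
                if (newCreds instanceof AWSSessionCredentials) {
                    // Session credentials must also carry the session token to jclouds.
                    return SessionCredentials.builder()
                            .accessKeyId(newCreds.getAWSAccessKeyId())
                            .secretAccessKey(newCreds.getAWSSecretKey())
                            .sessionToken(((AWSSessionCredentials) newCreds).getSessionToken())
                            .build();
                }
                // Long-lived keys still go through the supplier, so updated profile
                // or system-property credentials are reflected on the next call.
                return new Credentials(newCreds.getAWSAccessKeyId(), newCreds.getAWSSecretKey());
            };
        }

        public static void main(String[] args) {
            // Mirrors the new test: set fake keys, resolve, change them, resolve again.
            System.setProperty("aws.accessKeyId", "fakeid1");
            System.setProperty("aws.secretKey", "fakekey1");
            Supplier<Credentials> creds = supplierFrom(DefaultAWSCredentialsProviderChain.getInstance());
            System.out.println(creds.get().identity);   // fakeid1

            System.setProperty("aws.accessKeyId", "fakeid2");
            System.setProperty("aws.secretKey", "fakekey2");
            System.out.println(creds.get().identity);   // fakeid2
        }
    }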