You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/21 03:13:07 UTC

[pulsar] 02/02: [Tiered Storage] Fix merge conflicts introduced by PR #6335 (#8630)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8793963ccdd8ce778a681c139b5ab4fab7b820a2
Author: ran <ga...@126.com>
AuthorDate: Sat Nov 21 09:22:18 2020 +0800

    [Tiered Storage] Fix merge conflicts introduced by PR #6335 (#8630)
    
    # Motivation
    
    The PR #6335 lost some PR changes, related PRs as below.
    
    1. PR 4196 (2019/5/29 Merli)
    Configure static PulsarByteBufAllocator to handle OOM errors (#4196)
    
    2. PR 5356 (2019/10/30 Kelly)
    [TIEREDSTORAGE] Only seek when reading unexpected entry (#5356)
    
    3. PR 4433 (2019/6/4 Higham)
    [tiered-storage] Add support for AWS instance and role creds (#4433)
    
    (cherry picked from commit 68759ff405bca60343ddb8318cc10f3727981a5d)
---
 .../impl/BlobStoreBackedInputStreamImpl.java       |  7 +--
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 20 +++++----
 .../impl/BlobStoreManagedLedgerOffloader.java      |  7 ---
 .../impl/BlockAwareSegmentInputStreamImpl.java     |  6 +--
 .../offload/jcloud/impl/DataBlockHeaderImpl.java   |  9 +++-
 .../offload/jcloud/impl/OffloadIndexBlockImpl.java |  4 +-
 .../offload/jcloud/impl/OffloadIndexEntryImpl.java |  6 +++
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 50 ++++++++++++++++------
 .../provider/TieredStorageConfiguration.java       | 11 +++--
 .../impl/BlobStoreManagedLedgerOffloaderBase.java  |  7 +--
 10 files changed, 83 insertions(+), 44 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index 386196c..6a204d5 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -19,11 +19,11 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.IOException;
 import java.io.InputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
 import org.jclouds.blobstore.options.GetOptions;
@@ -52,7 +52,7 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
         this.bucket = bucket;
         this.key = key;
         this.versionCheck = versionCheck;
-        this.buffer = PooledByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
+        this.buffer = PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
         this.objectLen = objectLen;
         this.bufferSize = bufferSize;
         this.cursor = 0;
@@ -116,7 +116,8 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
 
     @Override
     public void seek(long position) {
-        log.debug("Seeking to {} on {}/{}, current position {}", position, bucket, key, cursor);
+        log.debug("Seeking to {} on {}/{}, current position {} (bufStart:{}, bufEnd:{})",
+                position, bucket, key, cursor, bufferOffsetStart, bufferOffsetEnd);
         if (position >= bufferOffsetStart && position <= bufferOffsetEnd) {
             long newIndex = position - bufferOffsetStart;
             buffer.readerIndex((int) newIndex);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 92f2514..b48751e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -19,7 +19,6 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,8 +37,8 @@ import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
-import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
 import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
 import org.slf4j.Logger;
@@ -104,19 +103,16 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                 List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
                 long nextExpectedId = firstEntry;
                 try {
-                    OffloadIndexEntry entry = index.getIndexEntryForEntry(firstEntry);
-                    inputStream.seek(entry.getDataOffset());
-
                     while (entriesToRead > 0) {
                         int length = dataStream.readInt();
                         if (length < 0) { // hit padding or new block
-                            inputStream.seekForward(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            length = dataStream.readInt();
+                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                            continue;
                         }
                         long entryId = dataStream.readLong();
 
                         if (entryId == nextExpectedId) {
-                            ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+                            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
                             entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
                             int toWrite = length;
                             while (toWrite > 0) {
@@ -124,6 +120,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                             }
                             entriesToRead--;
                             nextExpectedId++;
+                        } else if (entryId > nextExpectedId) {
+                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                            continue;
+                        } else if (entryId < nextExpectedId
+                                && !index.getIndexEntryForEntry(nextExpectedId).equals(
+                                index.getIndexEntryForEntry(entryId)))  {
+                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                            continue;
                         } else if (entryId > lastEntry) {
                             log.info("Expected to read {}, but read {}, which is greater than last entry {}",
                                      nextExpectedId, entryId, lastEntry);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 6643a83..4f51713 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -67,7 +67,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
 
     private final OrderedScheduler scheduler;
     private final TieredStorageConfiguration config;
-//    private final BlobStore writeBlobStore;
     private final Location writeLocation;
 
     // metadata to be stored as part of the offloaded ledger metadata
@@ -116,11 +115,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
         return config.getOffloadDriverMetadata();
     }
 
-//    @VisibleForTesting
-//    public ConcurrentMap<BlobStoreLocation, BlobStore> getBlobStores() {
-//        return blobStores;
-//    }
-
     /**
      * Upload the DataBlocks associated with the given ReadHandle using MultiPartUpload,
      * Creating indexBlocks for each corresponding DataBlock that is uploaded.
@@ -303,7 +297,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
     
     @Override
     public OffloadPolicies getOffloadPolicies() {
-        // TODO Auto-generated method stub
         Properties properties = new Properties();
         properties.putAll(config.getConfigProperties());
         return OffloadPolicies.create(properties);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
index dc7a68a..dcc693a 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
@@ -33,6 +32,7 @@ import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,8 +126,8 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
                 int entryLength = buf.readableBytes();
                 long entryId = entry.getEntryId();
 
-                CompositeByteBuf entryBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer(2);
-                ByteBuf entryHeaderBuf = PooledByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
+                CompositeByteBuf entryBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(2);
+                ByteBuf entryHeaderBuf = PulsarByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
 
                 entryHeaderBuf.writeInt(entryLength).writeLong(entryId);
                 entryBuf.addComponents(true, entryHeaderBuf, buf);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
index 9e3fe90..9239ec2 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
@@ -28,6 +28,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 
 /**
  * The data block header in code storage for each data block.
@@ -110,7 +111,7 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
      */
     @Override
     public InputStream toStream() {
-        ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
+        ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
         out.writeInt(MAGIC_WORD)
             .writeLong(headerLength)
             .writeLong(blockLength)
@@ -120,5 +121,11 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
         // true means the input stream will release the ByteBuf on close
         return new ByteBufInputStream(out, true);
     }
+
+    @Override
+    public String toString() {
+        return String.format("DataBlockHeader(len:%d,hlen:%d,firstEntry:%d)",
+                blockLength, headerLength, firstEntryId);
+    }
 }
 
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
index 0a4e90b..b34dce5 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Maps;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
@@ -42,6 +41,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -156,7 +156,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
             + segmentMetadataLength
             + indexEntryCount * (8 + 4 + 8); /* messageEntryId + blockPartId + blockOffset */
 
-        ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
+        ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
 
         out.writeInt(INDEX_MAGIC_WORD)
             .writeInt(indexBlockLength)
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
index 5dce79d..10408fe 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
@@ -56,5 +56,11 @@ public class OffloadIndexEntryImpl implements OffloadIndexEntry {
         this.offset = offset;
         this.blockHeaderSize = blockHeaderSize;
     }
+
+    @Override
+    public String toString() {
+        return String.format("[eid:%d, part:%d, offset:%d, doffset:%d]",
+                entryId, partId, offset, getDataOffset());
+    }
 }
 
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index ba03f64..df4b0d3 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -18,8 +18,14 @@
  */
 package org.apache.bookkeeper.mledger.offload.jcloud.provider;
 
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.GCS_ACCOUNT_KEY_FILE_FIELD;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_FIELD;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_SESSION_NAME_FIELD;
+
 import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSSessionCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
 import com.google.common.base.Strings;
 import com.google.common.io.Files;
 
@@ -36,8 +42,8 @@ import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfig
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.CredentialBuilder;
 
 import org.apache.commons.lang3.StringUtils;
-import org.jclouds.Constants;
 import org.jclouds.ContextBuilder;
+import org.jclouds.aws.domain.SessionCredentials;
 import org.jclouds.aws.s3.AWSS3ProviderMetadata;
 import org.jclouds.azureblob.AzureBlobProviderMetadata;
 import org.jclouds.blobstore.BlobStore;
@@ -106,9 +112,8 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
             if (config.getCredentials() == null) {
                 try {
                     String gcsKeyContent = Files.toString(
-                            new File(config.getConfigProperty("gcsManagedLedgerOffloadServiceAccountKeyFile")),
-                                     Charset.defaultCharset());
-                    config.setProviderCredentials(new GoogleCredentialsFromJson(gcsKeyContent).get());
+                            new File(config.getConfigProperty(GCS_ACCOUNT_KEY_FILE_FIELD)), Charset.defaultCharset());
+                    config.setProviderCredentials(() -> new GoogleCredentialsFromJson(gcsKeyContent).get());
                 } catch (IOException ioe) {
                     log.error("Cannot read GCS service account credentials file: {}",
                             config.getConfigProperty("gcsManagedLedgerOffloadServiceAccountKeyFile"));
@@ -139,7 +144,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
             if (StringUtils.isEmpty(accountKey)) {
                 throw new IllegalArgumentException("Couldn't get the azure storage access key.");
             }
-            config.setProviderCredentials(new Credentials(accountName, accountKey));
+            config.setProviderCredentials(() -> new Credentials(accountName, accountKey));
         }
     },
 
@@ -246,8 +251,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
 
         if (config.getProviderCredentials() != null) {
                 return contextBuilder
-                        .credentials(config.getProviderCredentials().identity,
-                                     config.getProviderCredentials().credential)
+                        .credentialsSupplier(config.getCredentials())
                         .buildView(BlobStoreContext.class)
                         .getBlobStore();
             } else {
@@ -262,17 +266,35 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         if (config.getCredentials() == null) {
             AWSCredentials awsCredentials = null;
             try {
-                DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance();
-                awsCredentials = creds.getCredentials();
+                if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) {
+                    awsCredentials = DefaultAWSCredentialsProviderChain.getInstance().getCredentials();
+                } else {
+                    awsCredentials =
+                            new STSAssumeRoleSessionCredentialsProvider.Builder(
+                                    config.getConfigProperty(S3_ROLE_FIELD),
+                                    config.getConfigProperty(S3_ROLE_SESSION_NAME_FIELD)
+                            ).build().getCredentials();
+                }
+
+                if (awsCredentials instanceof AWSSessionCredentials) {
+                    // if we have session credentials, we need to send the session token
+                    // this allows us to support EC2 metadata credentials
+                    SessionCredentials sessionCredentials =  SessionCredentials.builder()
+                            .accessKeyId(awsCredentials.getAWSAccessKeyId())
+                            .secretAccessKey(awsCredentials.getAWSSecretKey())
+                            .sessionToken(((AWSSessionCredentials) awsCredentials).getSessionToken())
+                            .build();
+                    config.setProviderCredentials(() -> sessionCredentials);
+                } else {
+                    Credentials credentials = new Credentials(
+                            awsCredentials.getAWSAccessKeyId(), awsCredentials.getAWSSecretKey());
+                    config.setProviderCredentials(() -> credentials);
+                }
+
             } catch (Exception e) {
                 // allowed, some mock s3 service do not need credential
                 log.warn("Exception when get credentials for s3 ", e);
             }
-            if (awsCredentials != null) {
-                config.setProviderCredentials(
-                        new Credentials(awsCredentials.getAWSAccessKeyId(),
-                                        awsCredentials.getAWSSecretKey()));
-            }
         }
     };
 }
\ No newline at end of file
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index 5abd867..ac2bed9 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 import org.jclouds.Constants;
 import org.jclouds.aws.s3.AWSS3ProviderMetadata;
 import org.jclouds.blobstore.BlobStore;
@@ -69,6 +70,10 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
 
     protected static final int MB = 1024 * 1024;
 
+    public static final String GCS_ACCOUNT_KEY_FILE_FIELD = "gcsManagedLedgerOffloadServiceAccountKeyFile";
+    public static final String S3_ROLE_FIELD = "s3ManagedLedgerOffloadRole";
+    public static final String S3_ROLE_SESSION_NAME_FIELD = "s3ManagedLedgerOffloadRoleSessionName";
+
     public static TieredStorageConfiguration create(Properties props) throws IOException {
         Map<String, String> map = new HashMap<String, String>();
         map.putAll(props.entrySet()
@@ -86,7 +91,7 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
     @Getter
     private final Map<String, String> configProperties;
     @Getter
-    private Credentials credentials;
+    private Supplier<Credentials> credentials;
     private JCloudBlobStoreProvider provider;
 
     public TieredStorageConfiguration(Map<String, String> configProperties) {
@@ -221,14 +226,14 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
         return new Integer(MB);
     }
 
-    public Credentials getProviderCredentials() {
+    public Supplier<Credentials> getProviderCredentials() {
         if (credentials == null) {
             getProvider().buildCredentials(this);
         }
         return credentials;
     }
 
-    public void setProviderCredentials(Credentials credentials) {
+    public void setProviderCredentials(Supplier<Credentials> credentials) {
         this.credentials = credentials;
     }
 
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
index ea613e7..a2944f5 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProv
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.data.ACL;
@@ -99,14 +100,14 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
      * Get the credentials to use for the JCloud provider
      * based on the System properties.
      */
-    protected static Credentials getBlobStoreCredentials() {
+    protected static Supplier<Credentials> getBlobStoreCredentials() {
         if (Boolean.parseBoolean(System.getProperty("testRealAWS", "false"))) {
             /* To use this, must config credentials using "aws_access_key_id" as S3ID,
              *  and "aws_secret_access_key" as S3Key. And bucket should exist in default region. e.g.
              *      props.setProperty("S3ID", "AXXXXXXQ");
              *      props.setProperty("S3Key", "HXXXXXß");
              */
-            return new Credentials(System.getProperty("S3ID"), System.getProperty("S3Key"));
+            return () -> new Credentials(System.getProperty("S3ID"), System.getProperty("S3Key"));
                     
         } else if (Boolean.parseBoolean(System.getProperty("testRealGCS", "false"))) {
             /*
@@ -115,7 +116,7 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
              *        props.setProperty("GCSID", "5XXXXXXXXXX6-compute@developer.gserviceaccount.com");
              *        props.setProperty("GCSKey", "XXXXXX");
              */
-            return new Credentials(System.getProperty("GCSID"), System.getProperty("GCSKey"));
+            return () -> new Credentials(System.getProperty("GCSID"), System.getProperty("GCSKey"));
         } else {
             return null;
         }