You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/21 03:13:05 UTC

[pulsar] branch branch-2.6 updated (45125a1 -> 8793963)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 45125a1  [Branch-2.6] Fix cherry-pick issue between #8302 and #8304 (#8652)
     new 89d1cb8  [Tiered Storage] Support Azure BlobStore offload configuration (#8436)
     new 8793963  [Tiered Storage] Fix merge conflicts introduced by PR #6335 (#8630)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../common/policies/data/OffloadPolicies.java      |  64 +++++-
 site2/docs/tiered-storage-azure.md                 | 224 +++++++++++++++++++++
 ...fload.java => TestUniversalConfigurations.java} |  15 +-
 .../impl/BlobStoreBackedInputStreamImpl.java       |   7 +-
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java |  20 +-
 .../impl/BlobStoreManagedLedgerOffloader.java      |   7 -
 .../impl/BlockAwareSegmentInputStreamImpl.java     |   6 +-
 .../offload/jcloud/impl/DataBlockHeaderImpl.java   |   9 +-
 .../offload/jcloud/impl/OffloadIndexBlockImpl.java |   4 +-
 .../offload/jcloud/impl/OffloadIndexEntryImpl.java |   6 +
 .../jcloud/provider/JCloudBlobStoreProvider.java   |  74 +++++--
 .../provider/TieredStorageConfiguration.java       |  15 +-
 .../impl/BlobStoreManagedLedgerOffloaderBase.java  |  18 +-
 .../provider/JCloudBlobStoreProviderTests.java     |  28 +--
 .../provider/TieredStorageConfigurationTests.java  |  38 ++--
 .../provider/TransientBlobStoreFactoryTests.java   |   2 +-
 16 files changed, 435 insertions(+), 102 deletions(-)
 create mode 100644 site2/docs/tiered-storage-azure.md
 copy tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/{TestS3Offload.java => TestUniversalConfigurations.java} (91%)


[pulsar] 01/02: [Tiered Storage] Support Azure BlobStore offload configuration (#8436)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 89d1cb878a1429911ec396dadbb04826f7518f97
Author: ran <ga...@126.com>
AuthorDate: Wed Nov 4 08:46:36 2020 +0800

    [Tiered Storage] Support Azure BlobStore offload configuration (#8436)
    
    Fix https://github.com/streamnative/pulsar/issues/1606
    
    ### Motivation
    
    Support Azure BlobStore offload configuration.
    
    ### Modifications
    
    The new tiered-storage configuration was introduced in the offload refinement.
    Add new universal configurations in `OffloadPolicies`
    
    (cherry picked from commit 4d98dc678e6a22cfa18da476641b8e314676a96a)
---
 .../common/policies/data/OffloadPolicies.java      |  64 +++++-
 site2/docs/tiered-storage-azure.md                 | 224 +++++++++++++++++++++
 .../offload/TestUniversalConfigurations.java       |  83 ++++++++
 .../jcloud/provider/JCloudBlobStoreProvider.java   |  26 +++
 .../provider/TieredStorageConfiguration.java       |   4 +-
 .../impl/BlobStoreManagedLedgerOffloaderBase.java  |  11 +-
 .../provider/JCloudBlobStoreProviderTests.java     |  28 +--
 .../provider/TieredStorageConfigurationTests.java  |  38 ++--
 .../provider/TransientBlobStoreFactoryTests.java   |   2 +-
 9 files changed, 430 insertions(+), 50 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index bbc83e4..5d2103b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -61,7 +61,7 @@ public class OffloadPolicies implements Serializable {
     public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;      // 1MB
     public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
     public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
-    public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem"};
+    public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob"};
     public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
     public final static Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
     public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
@@ -120,6 +120,19 @@ public class OffloadPolicies implements Serializable {
     @Configuration
     private String fileSystemURI = null;
 
+    // --------- new offload configurations ---------
+    // they are universal configurations and can be used with `aws-s3`, `google-cloud-storage` or `azureblob`.
+    @Configuration
+    private String managedLedgerOffloadBucket;
+    @Configuration
+    private String managedLedgerOffloadRegion;
+    @Configuration
+    private String managedLedgerOffloadServiceEndpoint;
+    @Configuration
+    private Integer managedLedgerOffloadMaxBlockSizeInBytes;
+    @Configuration
+    private Integer managedLedgerOffloadReadBufferSizeInBytes;
+
     public static OffloadPolicies create(String driver, String region, String bucket, String endpoint,
                                          Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes,
                                          Long offloadThresholdInBytes, Long offloadDeletionLagInMillis) {
@@ -128,6 +141,12 @@ public class OffloadPolicies implements Serializable {
         offloadPolicies.setManagedLedgerOffloadThresholdInBytes(offloadThresholdInBytes);
         offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(offloadDeletionLagInMillis);
 
+        offloadPolicies.setManagedLedgerOffloadBucket(bucket);
+        offloadPolicies.setManagedLedgerOffloadRegion(region);
+        offloadPolicies.setManagedLedgerOffloadServiceEndpoint(endpoint);
+        offloadPolicies.setManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes);
+        offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);
+
         if (driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1])) {
             offloadPolicies.setS3ManagedLedgerOffloadRegion(region);
             offloadPolicies.setS3ManagedLedgerOffloadBucket(bucket);
@@ -210,6 +229,9 @@ public class OffloadPolicies implements Serializable {
         if (managedLedgerOffloadDriver == null) {
             return false;
         }
+        if (StringUtils.isNotEmpty(managedLedgerOffloadBucket)) {
+            return true;
+        }
         if (isS3Driver()) {
             return StringUtils.isNotEmpty(s3ManagedLedgerOffloadBucket);
         } else if (isGcsDriver()) {
@@ -241,7 +263,12 @@ public class OffloadPolicies implements Serializable {
                 gcsManagedLedgerOffloadReadBufferSizeInBytes,
                 gcsManagedLedgerOffloadServiceAccountKeyFile,
                 fileSystemProfilePath,
-                fileSystemURI);
+                fileSystemURI,
+                managedLedgerOffloadBucket,
+                managedLedgerOffloadRegion,
+                managedLedgerOffloadServiceEndpoint,
+                managedLedgerOffloadMaxBlockSizeInBytes,
+                managedLedgerOffloadReadBufferSizeInBytes);
     }
 
     @Override
@@ -280,7 +307,14 @@ public class OffloadPolicies implements Serializable {
                 && Objects.equals(gcsManagedLedgerOffloadServiceAccountKeyFile,
                     other.getGcsManagedLedgerOffloadServiceAccountKeyFile())
                 && Objects.equals(fileSystemProfilePath, other.getFileSystemProfilePath())
-                && Objects.equals(fileSystemURI, other.getFileSystemURI());
+                && Objects.equals(fileSystemURI, other.getFileSystemURI())
+                && Objects.equals(managedLedgerOffloadBucket, other.getManagedLedgerOffloadBucket())
+                && Objects.equals(managedLedgerOffloadRegion, other.getManagedLedgerOffloadRegion())
+                && Objects.equals(managedLedgerOffloadServiceEndpoint, other.getManagedLedgerOffloadServiceEndpoint())
+                && Objects.equals(managedLedgerOffloadMaxBlockSizeInBytes,
+                    other.getManagedLedgerOffloadMaxBlockSizeInBytes())
+                && Objects.equals(managedLedgerOffloadReadBufferSizeInBytes,
+                    other.getManagedLedgerOffloadReadBufferSizeInBytes());
     }
 
     @Override
@@ -306,16 +340,14 @@ public class OffloadPolicies implements Serializable {
                 .add("gcsManagedLedgerOffloadServiceAccountKeyFile", gcsManagedLedgerOffloadServiceAccountKeyFile)
                 .add("fileSystemProfilePath", fileSystemProfilePath)
                 .add("fileSystemURI", fileSystemURI)
+                .add("managedLedgerOffloadBucket", managedLedgerOffloadBucket)
+                .add("managedLedgerOffloadRegion", managedLedgerOffloadRegion)
+                .add("managedLedgerOffloadServiceEndpoint", managedLedgerOffloadServiceEndpoint)
+                .add("managedLedgerOffloadMaxBlockSizeInBytes", managedLedgerOffloadMaxBlockSizeInBytes)
+                .add("managedLedgerOffloadReadBufferSizeInBytes", managedLedgerOffloadReadBufferSizeInBytes)
                 .toString();
     }
 
-    public static final String METADATA_FIELD_BUCKET = "bucket";
-    public static final String METADATA_FIELD_REGION = "region";
-    public static final String METADATA_FIELD_ENDPOINT = "serviceEndpoint";
-    public static final String METADATA_FIELD_MAX_BLOCK_SIZE = "maxBlockSizeInBytes";
-    public static final String METADATA_FIELD_READ_BUFFER_SIZE = "readBufferSizeInBytes";
-    public static final String OFFLOADER_PROPERTY_PREFIX = "managedLedgerOffload.";
-
     public Properties toProperties() {
         Properties properties = new Properties();
 
@@ -360,6 +392,16 @@ public class OffloadPolicies implements Serializable {
             setProperty(properties, "fileSystemProfilePath", this.getFileSystemProfilePath());
             setProperty(properties, "fileSystemURI", this.getFileSystemURI());
         }
+
+        setProperty(properties, "managedLedgerOffloadBucket", this.getManagedLedgerOffloadBucket());
+        setProperty(properties, "managedLedgerOffloadRegion", this.getManagedLedgerOffloadRegion());
+        setProperty(properties, "managedLedgerOffloadServiceEndpoint",
+                this.getManagedLedgerOffloadServiceEndpoint());
+        setProperty(properties, "managedLedgerOffloadMaxBlockSizeInBytes",
+                this.getManagedLedgerOffloadMaxBlockSizeInBytes());
+        setProperty(properties, "managedLedgerOffloadReadBufferSizeInBytes",
+                this.getManagedLedgerOffloadReadBufferSizeInBytes());
+
         return properties;
     }
 
@@ -409,7 +451,7 @@ public class OffloadPolicies implements Serializable {
      * <p>policies level priority: topic > namespace > broker
      *
      * @param topicLevelPolicies topic level offload policies
-     * @param nsLevelPolicies namesapce level offload policies
+     * @param nsLevelPolicies namespace level offload policies
      * @param brokerProperties broker level offload configuration
      * @return offload policies
      */
diff --git a/site2/docs/tiered-storage-azure.md b/site2/docs/tiered-storage-azure.md
new file mode 100644
index 0000000..0a86080
--- /dev/null
+++ b/site2/docs/tiered-storage-azure.md
@@ -0,0 +1,224 @@
+---
+id: tiered-storage-azure
+title: Use Azure BlobStore offloader with Pulsar
+sidebar_label: Azure BlobStore offloader
+---
+
+This chapter guides you through every step of installing and configuring the Azure BlobStore offloader and using it with Pulsar.
+
+## Installation
+
+Follow the steps below to install the Azure BlobStore offloader.
+
+### Prerequisite
+
+- Pulsar: 2.6.2 or later versions
+  
+### Step
+
+This example uses Pulsar 2.6.2.
+
+1. Download the Pulsar tarball using one of the following ways:
+
+   * Download from the [Apache mirror](https://archive.apache.org/dist/pulsar/pulsar-2.6.2/apache-pulsar-2.6.2-bin.tar.gz)
+
+   * Download from the Pulsar [downloads page](https://pulsar.apache.org/download)
+
+   * Use [wget](https://www.gnu.org/software/wget):
+
+     ```shell
+     wget https://archive.apache.org/dist/pulsar/pulsar-2.6.2/apache-pulsar-2.6.2-bin.tar.gz
+     ```
+
+2. Download and untar the Pulsar offloaders package. 
+
+    ```bash
+    wget https://downloads.apache.org/pulsar/pulsar-2.6.2/apache-pulsar-offloaders-2.6.2-bin.tar.gz
+    tar xvfz apache-pulsar-offloaders-2.6.2-bin.tar.gz
+    ```
+
+3. Copy the Pulsar offloaders as `offloaders` in the Pulsar directory.
+
+    ```
+    mv apache-pulsar-offloaders-2.6.2/offloaders apache-pulsar-2.6.2/offloaders
+
+    ls offloaders
+    ```
+
+    **Output**
+
+    As shown in the output, Pulsar uses [Apache jclouds](https://jclouds.apache.org) to support [AWS S3](https://aws.amazon.com/s3/), [GCS](https://cloud.google.com/storage/) and [Azure](https://portal.azure.com/#home) for long-term storage.
+
+    ```
+    tiered-storage-file-system-2.6.2.nar
+    tiered-storage-jcloud-2.6.2.nar
+    ```
+
+    > #### Note
+    >
+    > * If you are running Pulsar in a bare metal cluster, make sure that the `offloaders` tarball is unzipped in every broker's Pulsar directory.
+    > 
+    > * If you are running Pulsar in Docker or deploying Pulsar using a Docker image (such as K8s and DCOS), you can use the `apachepulsar/pulsar-all` image instead of the `apachepulsar/pulsar` image. `apachepulsar/pulsar-all` image has already bundled tiered storage offloaders.
+
+## Configuration
+
+> #### Note
+> 
+> Before offloading data from BookKeeper to Azure BlobStore, you need to configure some properties of the Azure BlobStore offload driver.
+
+Besides, you can also configure the Azure BlobStore offloader to run it automatically or trigger it manually.
+
+### Configure Azure BlobStore offloader driver
+
+You can configure the Azure BlobStore offloader driver in the configuration file `broker.conf` or `standalone.conf`.
+
+- **Required** configurations are as below.
+  
+    Required configuration | Description | Example value
+    |---|---|---
+    `managedLedgerOffloadDriver` | Offloader driver name | azureblob
+    `offloadersDirectory` | Offloader directory | offloaders
+    `managedLedgerOffloadBucket` | Bucket | pulsar-topic-offload
+
+- **Optional** configurations are as below.
+
+    Optional | Description | Example value
+    |---|---|---
+    `managedLedgerOffloadReadBufferSizeInBytes`|Size of block read|1 MB
+    `managedLedgerOffloadMaxBlockSizeInBytes`|Size of block write|64 MB
+    `managedLedgerMinLedgerRolloverTimeMinutes`|Minimum time between ledger rollover for a topic<br><br>**Note**: it is not recommended that you set this configuration in the production environment.|2
+    `managedLedgerMaxEntriesPerLedger`|Maximum number of entries to append to a ledger before triggering a rollover.<br><br>**Note**: it is not recommended that you set this configuration in the production environment.|5000
+
+#### Bucket (required)
+
+A bucket is a basic container that holds your data. Everything you store in Azure BlobStore must be contained in a bucket. You can use a bucket to organize your data and control access to your data, but unlike directories and folders, buckets cannot be nested.
+
+##### Example
+
+This example names the bucket as _pulsar-topic-offload_.
+
+```conf
+managedLedgerOffloadBucket=pulsar-topic-offload
+```
+
+#### Authentication (required)
+
+To be able to access Azure BlobStore, you need to authenticate with Azure BlobStore.
+
+* Set the environment variables `AZURE_STORAGE_ACCOUNT` and `AZURE_STORAGE_ACCESS_KEY` in `conf/pulsar_env.sh`.
+
+    "export" is important so that the variables are made available in the environment of spawned processes.
+
+    ```bash
+    export AZURE_STORAGE_ACCOUNT=ABC123456789
+    export AZURE_STORAGE_ACCESS_KEY=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c
+    ```
+
+#### Size of block read/write
+
+You can configure the size of a request sent to or read from Azure BlobStore in the configuration file `broker.conf` or `standalone.conf`. 
+
+Configuration|Description|Default value
+|---|---|---
+`managedLedgerOffloadReadBufferSizeInBytes`|Block size for each individual read when reading back data from Azure BlobStore.|1 MB
+`managedLedgerOffloadMaxBlockSizeInBytes`|Maximum size of a "part" sent during a multipart upload to Azure BlobStore. It **cannot** be smaller than 5 MB. |64 MB
+
+### Configure Azure BlobStore offloader to run automatically
+
+Namespace policy can be configured to offload data automatically once a threshold is reached. The threshold is based on the size of data that a topic has stored on a Pulsar cluster. Once the topic reaches the threshold, an offloading operation is triggered automatically. 
+
+Threshold value|Action
+|---|---
+| > 0 | It triggers the offloading operation if the topic storage reaches its threshold.
+= 0|It causes a broker to offload data as soon as possible.
+< 0 |It disables automatic offloading operation.
+
+Automatic offloading runs when a new segment is added to a topic log. If you set the threshold on a namespace, but few messages are being produced to the topic, offloader does not work until the current segment is full.
+
+You can configure the threshold size using CLI tools, such as [pulsarctl](https://streamnative.io/docs/v1.0.0/manage-and-monitor/pulsarctl/overview/) or pulsar-admin.
+
+The offload configurations in `broker.conf` and `standalone.conf` are used for the namespaces that do not have namespace level offload policies. Each namespace can have its own offload policy. If you want to set offload policy for each namespace, use the [`pulsar-admin namespaces set-offload-policies options`](http://pulsar.apache.org/tools/pulsar-admin/2.6.0-SNAPSHOT/#-em-set-offload-policies-em-) command.
+ 
+#### Example
+
+This example sets the Azure BlobStore offloader threshold size to 10 MB using pulsar-admin.
+
+```bash
+bin/pulsar-admin namespaces set-offload-threshold --size 10M my-tenant/my-namespace
+```
+
+> #### Tip
+>
+> For more information about the `pulsar-admin namespaces set-offload-threshold options` command, including flags, descriptions, and default values, see [here](http://pulsar.apache.org/tools/pulsar-admin/2.6.0-SNAPSHOT/#-em-set-offload-threshold-em-). 
+
+### Configure Azure BlobStore offloader to run manually
+
+For individual topics, you can trigger Azure BlobStore offloader manually using one of the following methods:
+
+- Use REST endpoint 
+
+- Use CLI tools (such as [pulsarctl](https://streamnative.io/docs/v1.0.0/manage-and-monitor/pulsarctl/overview/) or pulsar-admin). 
+
+    To trigger it via CLI tools, you need to specify the maximum amount of data (threshold) that should be retained on a Pulsar cluster for a topic. If the size of the topic data on the Pulsar cluster exceeds this threshold, segments from the topic are moved to Azure BlobStore until the threshold is no longer exceeded. Older segments are moved first.
+
+#### Example
+
+- This example triggers the Azure BlobStore offloader to run manually using pulsar-admin.
+
+    ```bash
+    bin/pulsar-admin topics offload --size-threshold 10M my-tenant/my-namespace/topic1
+    ``` 
+
+    **Output**
+
+    ```bash
+    Offload triggered for persistent://my-tenant/my-namespace/topic1 for messages before 2:0:-1
+    ```
+
+    > #### Tip
+    >
+    > For more information about the `pulsar-admin topics offload options` command, including flags, descriptions, and default values, see [here](http://pulsar.apache.org/tools/pulsar-admin/2.6.0-SNAPSHOT/#-em-offload-em-). 
+
+- This example checks the Azure BlobStore offloader status using pulsar-admin.
+
+    ```bash
+    bin/pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1
+    ```
+
+    **Output**
+
+    ```bash
+    Offload is currently running
+    ```
+
+    To wait for the Azure BlobStore offloader to complete the job, add the `-w` flag.
+
+    ```bash
+    bin/pulsar-admin topics offload-status -w persistent://my-tenant/my-namespace/topic1
+    ```
+
+    **Output**
+    
+    ```
+    Offload was a success
+    ```
+
+
+    If there is an error in offloading, the error is propagated to the `pulsar-admin topics offload-status` command.
+
+    ```bash
+    bin/pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1
+    ```
+
+    **Output**
+
+    ```
+    Error in offload
+    null
+
+    Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: 
+    ```
+
+    > #### Tip
+    >
+    > For more information about the `pulsar-admin topics offload-status options` command, including flags, descriptions, and default values, see [here](http://pulsar.apache.org/tools/pulsar-admin/2.6.0-SNAPSHOT/#-em-offload-status-em-). 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java
new file mode 100644
index 0000000..2e8ef30
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.offload;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.S3Container;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class TestUniversalConfigurations extends TestBaseOffload {
+
+    private S3Container s3Container;
+
+    @Override
+    protected void beforeStartCluster() throws Exception {
+        super.beforeStartCluster();
+
+        log.info("s3 container init");
+        s3Container = new S3Container(
+                pulsarCluster.getClusterName(),
+                S3Container.NAME)
+                .withNetwork(pulsarCluster.getNetwork())
+                .withNetworkAliases(S3Container.NAME);
+        s3Container.start();
+        log.info("s3 container start finish.");
+    }
+
+    @AfterClass
+    public void teardownS3() {
+        if (null != s3Container) {
+            s3Container.stop();
+        }
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception {
+        super.testPublishOffloadAndConsumeViaCLI(serviceUrl, adminUrl);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception {
+        super.testPublishOffloadAndConsumeViaThreshold(serviceUrl, adminUrl);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception {
+        super.testPublishOffloadAndConsumeDeletionLag(serviceUrl, adminUrl);
+
+    }
+
+    @Override
+    protected Map<String, String> getEnv() {
+        Map<String, String> result = new HashMap<>();
+        result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER));
+        result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+        result.put("managedLedgerOffloadDriver", "aws-s3");
+        result.put("managedLedgerOffloadBucket", "pulsar-integtest");
+        result.put("managedLedgerOffloadServiceEndpoint", "http://" + S3Container.NAME + ":9090");
+
+        return result;
+    }
+
+}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 9fcebc2..ba03f64 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.jclouds.Constants;
 import org.jclouds.ContextBuilder;
 import org.jclouds.aws.s3.AWSS3ProviderMetadata;
+import org.jclouds.azureblob.AzureBlobProviderMetadata;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.TransientApiMetadata;
@@ -117,6 +118,31 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     },
 
+    AZURE_BLOB("azureblob", new AzureBlobProviderMetadata()) {
+        @Override
+        public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
+            VALIDATION.validate(config);
+        }
+
+        @Override
+        public BlobStore getBlobStore(TieredStorageConfiguration config) {
+            return BLOB_STORE_BUILDER.getBlobStore(config);
+        }
+
+        @Override
+        public void buildCredentials(TieredStorageConfiguration config) {
+            String accountName = System.getenv("AZURE_STORAGE_ACCOUNT");
+            if (StringUtils.isEmpty(accountName)) {
+                throw new IllegalArgumentException("Couldn't get the azure storage account.");
+            }
+            String accountKey = System.getenv("AZURE_STORAGE_ACCESS_KEY");
+            if (StringUtils.isEmpty(accountKey)) {
+                throw new IllegalArgumentException("Couldn't get the azure storage access key.");
+            }
+            config.setProviderCredentials(new Credentials(accountName, accountKey));
+        }
+    },
+
     TRANSIENT("transient", new AnonymousProviderMetadata(new TransientApiMetadata(), "")) {
         @Override
         public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index f122ec0..5abd867 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -65,7 +65,7 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
     public static final String METADATA_FIELD_ENDPOINT = "serviceEndpoint";
     public static final String METADATA_FIELD_MAX_BLOCK_SIZE = "maxBlockSizeInBytes";
     public static final String METADATA_FIELD_READ_BUFFER_SIZE = "readBufferSizeInBytes";
-    public static final String OFFLOADER_PROPERTY_PREFIX = "managedLedgerOffload.";
+    public static final String OFFLOADER_PROPERTY_PREFIX = "managedLedgerOffload";
 
     protected static final int MB = 1024 * 1024;
 
@@ -115,7 +115,7 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
     private String getKeyName(String property) {
         StringBuilder sb = new StringBuilder();
         sb.append(OFFLOADER_PROPERTY_PREFIX)
-          .append(property);
+          .append(StringUtils.capitalize(property));
 
         return sb.toString();
     }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
index 90e1caf..ea613e7 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockHeaderImpl;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.data.ACL;
@@ -123,9 +124,9 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
     protected TieredStorageConfiguration getConfiguration(String bucket) {
         Map<String, String> metaData = new HashMap<String, String> ();
         metaData.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, provider.getDriver());
-        metaData.put(TieredStorageConfiguration.OFFLOADER_PROPERTY_PREFIX + TieredStorageConfiguration.METADATA_FIELD_REGION, "");
-        metaData.put(TieredStorageConfiguration.OFFLOADER_PROPERTY_PREFIX + TieredStorageConfiguration.METADATA_FIELD_BUCKET, bucket);
-        metaData.put(TieredStorageConfiguration.OFFLOADER_PROPERTY_PREFIX + TieredStorageConfiguration.METADATA_FIELD_ENDPOINT, "");
+        metaData.put(getConfigKey(TieredStorageConfiguration.METADATA_FIELD_REGION), "");
+        metaData.put(getConfigKey(TieredStorageConfiguration.METADATA_FIELD_BUCKET), bucket);
+        metaData.put(getConfigKey(TieredStorageConfiguration.METADATA_FIELD_ENDPOINT), "");
         
         TieredStorageConfiguration config = TieredStorageConfiguration.create(metaData);
         config.setProviderCredentials(getBlobStoreCredentials());
@@ -133,6 +134,10 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
         return config;
     }
 
+    private String getConfigKey(String field) {
+        return TieredStorageConfiguration.OFFLOADER_PROPERTY_PREFIX + StringUtils.capitalize(field);
+    }
+
     protected ReadHandle buildReadHandle() throws Exception {
         return buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
     }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
index 4a0a56e..28e5829 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
@@ -35,9 +35,9 @@ public class JCloudBlobStoreProviderTests {
     public void awsValidationSuccessTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put("managedLedgerOffload.region", "us-east-1");
-        map.put("managedLedgerOffload.bucket", "test bucket");
-        map.put("managedLedgerOffload.maxBlockSizeInBytes", "99999999");
+        map.put("managedLedgerOffloadRegion", "us-east-1");
+        map.put("managedLedgerOffloadBucket", "test bucket");
+        map.put("managedLedgerOffloadMaxBlockSizeInBytes", "99999999");
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.AWS_S3.validate(config);
     }
@@ -46,8 +46,8 @@ public class JCloudBlobStoreProviderTests {
     public void awsValidationDefaultBlockSizeTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put("managedLedgerOffload.region", "us-east-1");
-        map.put("managedLedgerOffload.bucket", "test bucket");
+        map.put("managedLedgerOffloadRegion", "us-east-1");
+        map.put("managedLedgerOffloadBucket", "test bucket");
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.AWS_S3.validate(config);
     }
@@ -57,8 +57,8 @@ public class JCloudBlobStoreProviderTests {
     public void awsValidationMissingRegionTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put("managedLedgerOffload.bucket", "my-bucket");
-        map.put("managedLedgerOffload.maxBlockSizeInBytes", "999999");
+        map.put("managedLedgerOffloadBucket", "my-bucket");
+        map.put("managedLedgerOffloadMaxBlockSizeInBytes", "999999");
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.AWS_S3.validate(config);
     }
@@ -68,8 +68,8 @@ public class JCloudBlobStoreProviderTests {
     public void awsValidationMissingBucketTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put("managedLedgerOffload.region", "us-east-1");
-        map.put("managedLedgerOffload.maxBlockSizeInBytes", "99999999");
+        map.put("managedLedgerOffloadRegion", "us-east-1");
+        map.put("managedLedgerOffloadMaxBlockSizeInBytes", "99999999");
         config = new TieredStorageConfiguration(map);
         assertEquals(config.getRegion(), "us-east-1");
         JCloudBlobStoreProvider.AWS_S3.validate(config);
@@ -81,18 +81,18 @@ public class JCloudBlobStoreProviderTests {
     public void awsValidationBlockSizeTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put("managedLedgerOffload.region", "us-east-1");
-        map.put("managedLedgerOffload.bucket", "test bucket");
-        map.put("managedLedgerOffload.maxBlockSizeInBytes", "1");
+        map.put("managedLedgerOffloadRegion", "us-east-1");
+        map.put("managedLedgerOffloadBucket", "test bucket");
+        map.put("managedLedgerOffloadMaxBlockSizeInBytes", "1");
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.AWS_S3.validate(config);
     }
    
     @Test
     public void transientValidationSuccessTest() {
-        Map<String, String> map = new HashMap<String,String>(); 
+        Map<String, String> map = new HashMap<String,String>();
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, "transient");
-        map.put("managedLedgerOffload.bucket", "test bucket");
+        map.put("managedLedgerOffloadBucket", "test bucket");
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.TRANSIENT.validate(config);
     }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
index 6e84e8c..41e6255 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
@@ -53,23 +53,23 @@ public class TieredStorageConfigurationTests {
         TieredStorageConfiguration config = new TieredStorageConfiguration(map);
         List<String> keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_BUCKET);
         assertEquals(keys.get(0), BC_S3_BUCKET);
-        assertEquals(keys.get(1), "managedLedgerOffload.bucket");
+        assertEquals(keys.get(1), "managedLedgerOffloadBucket");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_REGION);
         assertEquals(keys.get(0), BC_S3_REGION);
-        assertEquals(keys.get(1), "managedLedgerOffload.region");
+        assertEquals(keys.get(1), "managedLedgerOffloadRegion");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_ENDPOINT);
         assertEquals(keys.get(0), BC_S3_ENDPOINT);
-        assertEquals(keys.get(1), "managedLedgerOffload.serviceEndpoint");
+        assertEquals(keys.get(1), "managedLedgerOffloadServiceEndpoint");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE);
         assertEquals(keys.get(0), BC_S3_MAX_BLOCK_SIZE);
-        assertEquals(keys.get(1), "managedLedgerOffload.maxBlockSizeInBytes");
+        assertEquals(keys.get(1), "managedLedgerOffloadMaxBlockSizeInBytes");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_READ_BUFFER_SIZE);
         assertEquals(keys.get(0), BC_S3_READ_BUFFER_SIZE);
-        assertEquals(keys.get(1), "managedLedgerOffload.readBufferSizeInBytes");
+        assertEquals(keys.get(1), "managedLedgerOffloadReadBufferSizeInBytes");
     }
     
     /**
@@ -79,11 +79,11 @@ public class TieredStorageConfigurationTests {
     public final void awsS3PropertiesTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put("managedLedgerOffload.region", "us-east-1");
-        map.put("managedLedgerOffload.bucket", "test bucket");
-        map.put("managedLedgerOffload.maxBlockSizeInBytes", "1");
-        map.put("managedLedgerOffload.readBufferSizeInBytes", "500");
-        map.put("managedLedgerOffload.serviceEndpoint", "http://some-url:9093");
+        map.put("managedLedgerOffloadRegion", "us-east-1");
+        map.put("managedLedgerOffloadBucket", "test bucket");
+        map.put("managedLedgerOffloadMaxBlockSizeInBytes", "1");
+        map.put("managedLedgerOffloadReadBufferSizeInBytes", "500");
+        map.put("managedLedgerOffloadServiceEndpoint", "http://some-url:9093");
         TieredStorageConfiguration config = new TieredStorageConfiguration(map);
         
         assertEquals(config.getRegion(), "us-east-1");
@@ -124,19 +124,19 @@ public class TieredStorageConfigurationTests {
         TieredStorageConfiguration config = new TieredStorageConfiguration(map);
         List<String> keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_BUCKET);
         assertEquals(keys.get(0), BC_GCS_BUCKET);
-        assertEquals(keys.get(1), "managedLedgerOffload.bucket");
+        assertEquals(keys.get(1), "managedLedgerOffloadBucket");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_REGION);
         assertEquals(keys.get(0), BC_GCS_REGION);
-        assertEquals(keys.get(1), "managedLedgerOffload.region");
+        assertEquals(keys.get(1), "managedLedgerOffloadRegion");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE);
         assertEquals(keys.get(0), BC_GCS_MAX_BLOCK_SIZE);
-        assertEquals(keys.get(1), "managedLedgerOffload.maxBlockSizeInBytes");
+        assertEquals(keys.get(1), "managedLedgerOffloadMaxBlockSizeInBytes");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_READ_BUFFER_SIZE);
         assertEquals(keys.get(0), BC_GCS_READ_BUFFER_SIZE);
-        assertEquals(keys.get(1), "managedLedgerOffload.readBufferSizeInBytes");
+        assertEquals(keys.get(1), "managedLedgerOffloadReadBufferSizeInBytes");
     }
     
     /**
@@ -146,11 +146,11 @@ public class TieredStorageConfigurationTests {
     public final void gcsPropertiesTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.GOOGLE_CLOUD_STORAGE.getDriver());
-        map.put("managedLedgerOffload.region", "us-east-1");
-        map.put("managedLedgerOffload.bucket", "test bucket");
-        map.put("managedLedgerOffload.maxBlockSizeInBytes", "1");
-        map.put("managedLedgerOffload.readBufferSizeInBytes", "500");
-        map.put("managedLedgerOffload.serviceEndpoint", "http://some-url:9093");
+        map.put("managedLedgerOffloadRegion", "us-east-1");
+        map.put("managedLedgerOffloadBucket", "test bucket");
+        map.put("managedLedgerOffloadMaxBlockSizeInBytes", "1");
+        map.put("managedLedgerOffloadReadBufferSizeInBytes", "500");
+        map.put("managedLedgerOffloadServiceEndpoint", "http://some-url:9093");
         TieredStorageConfiguration config = new TieredStorageConfiguration(map);
         
         assertEquals(config.getRegion(), "us-east-1");
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TransientBlobStoreFactoryTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TransientBlobStoreFactoryTests.java
index dba68cf..ec57023 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TransientBlobStoreFactoryTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TransientBlobStoreFactoryTests.java
@@ -55,7 +55,7 @@ public class TransientBlobStoreFactoryTests extends AbstractJCloudBlobStoreFacto
     protected Map<String, String> getConfig() {
         Map<String, String> metadata = new HashMap<String,String>();
         metadata.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, "transient");
-        metadata.put("managedLedgerOffload.bucket", TEST_CONTAINER_NAME);
+        metadata.put("managedLedgerOffloadBucket", TEST_CONTAINER_NAME);
         return metadata;
     }
 }


[pulsar] 02/02: [Tiered Storage] Fix merge conflicts introduced by PR #6335 (#8630)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8793963ccdd8ce778a681c139b5ab4fab7b820a2
Author: ran <ga...@126.com>
AuthorDate: Sat Nov 21 09:22:18 2020 +0800

    [Tiered Storage] Fix merge conflicts introduced by PR #6335 (#8630)
    
    # Motivation
    
    PR #6335 lost the changes from several earlier PRs; the related PRs are listed below.
    
    1. PR 4196 (2019/5/29 Merli)
    Configure static PulsarByteBufAllocator to handle OOM errors (#4196)
    
    2. PR 5356 (2019/10/30 Kelly)
    [TIEREDSTORAGE] Only seek when reading unexpected entry (#5356)
    
    3. PR 4433 (2019/6/4 Higham)
    [tiered-storage] Add support for AWS instance and role creds (#4433)
    
    (cherry picked from commit 68759ff405bca60343ddb8318cc10f3727981a5d)
---
 .../impl/BlobStoreBackedInputStreamImpl.java       |  7 +--
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 20 +++++----
 .../impl/BlobStoreManagedLedgerOffloader.java      |  7 ---
 .../impl/BlockAwareSegmentInputStreamImpl.java     |  6 +--
 .../offload/jcloud/impl/DataBlockHeaderImpl.java   |  9 +++-
 .../offload/jcloud/impl/OffloadIndexBlockImpl.java |  4 +-
 .../offload/jcloud/impl/OffloadIndexEntryImpl.java |  6 +++
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 50 ++++++++++++++++------
 .../provider/TieredStorageConfiguration.java       | 11 +++--
 .../impl/BlobStoreManagedLedgerOffloaderBase.java  |  7 +--
 10 files changed, 83 insertions(+), 44 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index 386196c..6a204d5 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -19,11 +19,11 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.IOException;
 import java.io.InputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
 import org.jclouds.blobstore.options.GetOptions;
@@ -52,7 +52,7 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
         this.bucket = bucket;
         this.key = key;
         this.versionCheck = versionCheck;
-        this.buffer = PooledByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
+        this.buffer = PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
         this.objectLen = objectLen;
         this.bufferSize = bufferSize;
         this.cursor = 0;
@@ -116,7 +116,8 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
 
     @Override
     public void seek(long position) {
-        log.debug("Seeking to {} on {}/{}, current position {}", position, bucket, key, cursor);
+        log.debug("Seeking to {} on {}/{}, current position {} (bufStart:{}, bufEnd:{})",
+                position, bucket, key, cursor, bufferOffsetStart, bufferOffsetEnd);
         if (position >= bufferOffsetStart && position <= bufferOffsetEnd) {
             long newIndex = position - bufferOffsetStart;
             buffer.readerIndex((int) newIndex);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 92f2514..b48751e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -19,7 +19,6 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,8 +37,8 @@ import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
-import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
 import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
 import org.slf4j.Logger;
@@ -104,19 +103,16 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                 List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
                 long nextExpectedId = firstEntry;
                 try {
-                    OffloadIndexEntry entry = index.getIndexEntryForEntry(firstEntry);
-                    inputStream.seek(entry.getDataOffset());
-
                     while (entriesToRead > 0) {
                         int length = dataStream.readInt();
                         if (length < 0) { // hit padding or new block
-                            inputStream.seekForward(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            length = dataStream.readInt();
+                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                            continue;
                         }
                         long entryId = dataStream.readLong();
 
                         if (entryId == nextExpectedId) {
-                            ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+                            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
                             entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
                             int toWrite = length;
                             while (toWrite > 0) {
@@ -124,6 +120,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                             }
                             entriesToRead--;
                             nextExpectedId++;
+                        } else if (entryId > nextExpectedId) {
+                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                            continue;
+                        } else if (entryId < nextExpectedId
+                                && !index.getIndexEntryForEntry(nextExpectedId).equals(
+                                index.getIndexEntryForEntry(entryId)))  {
+                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                            continue;
                         } else if (entryId > lastEntry) {
                             log.info("Expected to read {}, but read {}, which is greater than last entry {}",
                                      nextExpectedId, entryId, lastEntry);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 6643a83..4f51713 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -67,7 +67,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
 
     private final OrderedScheduler scheduler;
     private final TieredStorageConfiguration config;
-//    private final BlobStore writeBlobStore;
     private final Location writeLocation;
 
     // metadata to be stored as part of the offloaded ledger metadata
@@ -116,11 +115,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
         return config.getOffloadDriverMetadata();
     }
 
-//    @VisibleForTesting
-//    public ConcurrentMap<BlobStoreLocation, BlobStore> getBlobStores() {
-//        return blobStores;
-//    }
-
     /**
      * Upload the DataBlocks associated with the given ReadHandle using MultiPartUpload,
      * Creating indexBlocks for each corresponding DataBlock that is uploaded.
@@ -303,7 +297,6 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
     
     @Override
     public OffloadPolicies getOffloadPolicies() {
-        // TODO Auto-generated method stub
         Properties properties = new Properties();
         properties.putAll(config.getConfigProperties());
         return OffloadPolicies.create(properties);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
index dc7a68a..dcc693a 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
@@ -33,6 +32,7 @@ import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,8 +126,8 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
                 int entryLength = buf.readableBytes();
                 long entryId = entry.getEntryId();
 
-                CompositeByteBuf entryBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer(2);
-                ByteBuf entryHeaderBuf = PooledByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
+                CompositeByteBuf entryBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(2);
+                ByteBuf entryHeaderBuf = PulsarByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
 
                 entryHeaderBuf.writeInt(entryLength).writeLong(entryId);
                 entryBuf.addComponents(true, entryHeaderBuf, buf);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
index 9e3fe90..9239ec2 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
@@ -28,6 +28,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 
 /**
  * The data block header in code storage for each data block.
@@ -110,7 +111,7 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
      */
     @Override
     public InputStream toStream() {
-        ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
+        ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
         out.writeInt(MAGIC_WORD)
             .writeLong(headerLength)
             .writeLong(blockLength)
@@ -120,5 +121,11 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
         // true means the input stream will release the ByteBuf on close
         return new ByteBufInputStream(out, true);
     }
+
+    @Override
+    public String toString() {
+        return String.format("DataBlockHeader(len:%d,hlen:%d,firstEntry:%d)",
+                blockLength, headerLength, firstEntryId);
+    }
 }
 
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
index 0a4e90b..b34dce5 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Maps;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
@@ -42,6 +41,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -156,7 +156,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
             + segmentMetadataLength
             + indexEntryCount * (8 + 4 + 8); /* messageEntryId + blockPartId + blockOffset */
 
-        ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
+        ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
 
         out.writeInt(INDEX_MAGIC_WORD)
             .writeInt(indexBlockLength)
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
index 5dce79d..10408fe 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
@@ -56,5 +56,11 @@ public class OffloadIndexEntryImpl implements OffloadIndexEntry {
         this.offset = offset;
         this.blockHeaderSize = blockHeaderSize;
     }
+
+    @Override
+    public String toString() {
+        return String.format("[eid:%d, part:%d, offset:%d, doffset:%d]",
+                entryId, partId, offset, getDataOffset());
+    }
 }
 
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index ba03f64..df4b0d3 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -18,8 +18,14 @@
  */
 package org.apache.bookkeeper.mledger.offload.jcloud.provider;
 
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.GCS_ACCOUNT_KEY_FILE_FIELD;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_FIELD;
+import static org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.S3_ROLE_SESSION_NAME_FIELD;
+
 import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSSessionCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
 import com.google.common.base.Strings;
 import com.google.common.io.Files;
 
@@ -36,8 +42,8 @@ import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfig
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration.CredentialBuilder;
 
 import org.apache.commons.lang3.StringUtils;
-import org.jclouds.Constants;
 import org.jclouds.ContextBuilder;
+import org.jclouds.aws.domain.SessionCredentials;
 import org.jclouds.aws.s3.AWSS3ProviderMetadata;
 import org.jclouds.azureblob.AzureBlobProviderMetadata;
 import org.jclouds.blobstore.BlobStore;
@@ -106,9 +112,8 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
             if (config.getCredentials() == null) {
                 try {
                     String gcsKeyContent = Files.toString(
-                            new File(config.getConfigProperty("gcsManagedLedgerOffloadServiceAccountKeyFile")),
-                                     Charset.defaultCharset());
-                    config.setProviderCredentials(new GoogleCredentialsFromJson(gcsKeyContent).get());
+                            new File(config.getConfigProperty(GCS_ACCOUNT_KEY_FILE_FIELD)), Charset.defaultCharset());
+                    config.setProviderCredentials(() -> new GoogleCredentialsFromJson(gcsKeyContent).get());
                 } catch (IOException ioe) {
                     log.error("Cannot read GCS service account credentials file: {}",
                             config.getConfigProperty("gcsManagedLedgerOffloadServiceAccountKeyFile"));
@@ -139,7 +144,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
             if (StringUtils.isEmpty(accountKey)) {
                 throw new IllegalArgumentException("Couldn't get the azure storage access key.");
             }
-            config.setProviderCredentials(new Credentials(accountName, accountKey));
+            config.setProviderCredentials(() -> new Credentials(accountName, accountKey));
         }
     },
 
@@ -246,8 +251,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
 
         if (config.getProviderCredentials() != null) {
                 return contextBuilder
-                        .credentials(config.getProviderCredentials().identity,
-                                     config.getProviderCredentials().credential)
+                        .credentialsSupplier(config.getCredentials())
                         .buildView(BlobStoreContext.class)
                         .getBlobStore();
             } else {
@@ -262,17 +266,35 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         if (config.getCredentials() == null) {
             AWSCredentials awsCredentials = null;
             try {
-                DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance();
-                awsCredentials = creds.getCredentials();
+                if (Strings.isNullOrEmpty(config.getConfigProperty(S3_ROLE_FIELD))) {
+                    awsCredentials = DefaultAWSCredentialsProviderChain.getInstance().getCredentials();
+                } else {
+                    awsCredentials =
+                            new STSAssumeRoleSessionCredentialsProvider.Builder(
+                                    config.getConfigProperty(S3_ROLE_FIELD),
+                                    config.getConfigProperty(S3_ROLE_SESSION_NAME_FIELD)
+                            ).build().getCredentials();
+                }
+
+                if (awsCredentials instanceof AWSSessionCredentials) {
+                    // if we have session credentials, we need to send the session token
+                    // this allows us to support EC2 metadata credentials
+                    SessionCredentials sessionCredentials =  SessionCredentials.builder()
+                            .accessKeyId(awsCredentials.getAWSAccessKeyId())
+                            .secretAccessKey(awsCredentials.getAWSSecretKey())
+                            .sessionToken(((AWSSessionCredentials) awsCredentials).getSessionToken())
+                            .build();
+                    config.setProviderCredentials(() -> sessionCredentials);
+                } else {
+                    Credentials credentials = new Credentials(
+                            awsCredentials.getAWSAccessKeyId(), awsCredentials.getAWSSecretKey());
+                    config.setProviderCredentials(() -> credentials);
+                }
+
             } catch (Exception e) {
                 // allowed, some mock s3 service do not need credential
                 log.warn("Exception when get credentials for s3 ", e);
             }
-            if (awsCredentials != null) {
-                config.setProviderCredentials(
-                        new Credentials(awsCredentials.getAWSAccessKeyId(),
-                                        awsCredentials.getAWSSecretKey()));
-            }
         }
     };
 }
\ No newline at end of file
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index 5abd867..ac2bed9 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 import org.jclouds.Constants;
 import org.jclouds.aws.s3.AWSS3ProviderMetadata;
 import org.jclouds.blobstore.BlobStore;
@@ -69,6 +70,10 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
 
     protected static final int MB = 1024 * 1024;
 
+    public static final String GCS_ACCOUNT_KEY_FILE_FIELD = "gcsManagedLedgerOffloadServiceAccountKeyFile";
+    public static final String S3_ROLE_FIELD = "s3ManagedLedgerOffloadRole";
+    public static final String S3_ROLE_SESSION_NAME_FIELD = "s3ManagedLedgerOffloadRoleSessionName";
+
     public static TieredStorageConfiguration create(Properties props) throws IOException {
         Map<String, String> map = new HashMap<String, String>();
         map.putAll(props.entrySet()
@@ -86,7 +91,7 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
     @Getter
     private final Map<String, String> configProperties;
     @Getter
-    private Credentials credentials;
+    private Supplier<Credentials> credentials;
     private JCloudBlobStoreProvider provider;
 
     public TieredStorageConfiguration(Map<String, String> configProperties) {
@@ -221,14 +226,14 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
         return new Integer(MB);
     }
 
-    public Credentials getProviderCredentials() {
+    public Supplier<Credentials> getProviderCredentials() {
         if (credentials == null) {
             getProvider().buildCredentials(this);
         }
         return credentials;
     }
 
-    public void setProviderCredentials(Credentials credentials) {
+    public void setProviderCredentials(Supplier<Credentials> credentials) {
         this.credentials = credentials;
     }
 
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
index ea613e7..a2944f5 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProv
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.data.ACL;
@@ -99,14 +100,14 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
      * Get the credentials to use for the JCloud provider
      * based on the System properties.
      */
-    protected static Credentials getBlobStoreCredentials() {
+    protected static Supplier<Credentials> getBlobStoreCredentials() {
         if (Boolean.parseBoolean(System.getProperty("testRealAWS", "false"))) {
             /* To use this, must config credentials using "aws_access_key_id" as S3ID,
              *  and "aws_secret_access_key" as S3Key. And bucket should exist in default region. e.g.
              *      props.setProperty("S3ID", "AXXXXXXQ");
              *      props.setProperty("S3Key", "HXXXXXß");
              */
-            return new Credentials(System.getProperty("S3ID"), System.getProperty("S3Key"));
+            return () -> new Credentials(System.getProperty("S3ID"), System.getProperty("S3Key"));
                     
         } else if (Boolean.parseBoolean(System.getProperty("testRealGCS", "false"))) {
             /*
@@ -115,7 +116,7 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
              *        props.setProperty("GCSID", "5XXXXXXXXXX6-compute@developer.gserviceaccount.com");
              *        props.setProperty("GCSKey", "XXXXXX");
              */
-            return new Credentials(System.getProperty("GCSID"), System.getProperty("GCSKey"));
+            return () -> new Credentials(System.getProperty("GCSID"), System.getProperty("GCSKey"));
         } else {
             return null;
         }