Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/21 03:13:06 UTC

[pulsar] 01/02: [Tiered Storage] Support Azure BlobStore offload configuration (#8436)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 89d1cb878a1429911ec396dadbb04826f7518f97
Author: ran <ga...@126.com>
AuthorDate: Wed Nov 4 08:46:36 2020 +0800

    [Tiered Storage] Support Azure BlobStore offload configuration (#8436)
    
    Fix https://github.com/streamnative/pulsar/issues/1606
    
    ### Motivation
    
    Support Azure BlobStore offload configuration.
    
    ### Modifications
    
    The new tiered-storage configuration was introduced in the offload refinement.
    Add new universal configurations in `OffloadPolicies`
    
    (cherry picked from commit 4d98dc678e6a22cfa18da476641b8e314676a96a)
---
 .../common/policies/data/OffloadPolicies.java      |  64 +++++-
 site2/docs/tiered-storage-azure.md                 | 224 +++++++++++++++++++++
 .../offload/TestUniversalConfigurations.java       |  83 ++++++++
 .../jcloud/provider/JCloudBlobStoreProvider.java   |  26 +++
 .../provider/TieredStorageConfiguration.java       |   4 +-
 .../impl/BlobStoreManagedLedgerOffloaderBase.java  |  11 +-
 .../provider/JCloudBlobStoreProviderTests.java     |  28 +--
 .../provider/TieredStorageConfigurationTests.java  |  38 ++--
 .../provider/TransientBlobStoreFactoryTests.java   |   2 +-
 9 files changed, 430 insertions(+), 50 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index bbc83e4..5d2103b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -61,7 +61,7 @@ public class OffloadPolicies implements Serializable {
     public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;      // 1MB
     public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
     public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
-    public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem"};
+    public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob"};
     public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
     public final static Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
     public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
@@ -120,6 +120,19 @@ public class OffloadPolicies implements Serializable {
     @Configuration
     private String fileSystemURI = null;
 
+    // --------- new offload configurations ---------
+    // they are universal configurations and can be used for `aws-s3`, `google-cloud-storage` or `azureblob`.
+    @Configuration
+    private String managedLedgerOffloadBucket;
+    @Configuration
+    private String managedLedgerOffloadRegion;
+    @Configuration
+    private String managedLedgerOffloadServiceEndpoint;
+    @Configuration
+    private Integer managedLedgerOffloadMaxBlockSizeInBytes;
+    @Configuration
+    private Integer managedLedgerOffloadReadBufferSizeInBytes;
+
     public static OffloadPolicies create(String driver, String region, String bucket, String endpoint,
                                          Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes,
                                          Long offloadThresholdInBytes, Long offloadDeletionLagInMillis) {
@@ -128,6 +141,12 @@ public class OffloadPolicies implements Serializable {
         offloadPolicies.setManagedLedgerOffloadThresholdInBytes(offloadThresholdInBytes);
         offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(offloadDeletionLagInMillis);
 
+        offloadPolicies.setManagedLedgerOffloadBucket(bucket);
+        offloadPolicies.setManagedLedgerOffloadRegion(region);
+        offloadPolicies.setManagedLedgerOffloadServiceEndpoint(endpoint);
+        offloadPolicies.setManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes);
+        offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);
+
         if (driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1])) {
             offloadPolicies.setS3ManagedLedgerOffloadRegion(region);
             offloadPolicies.setS3ManagedLedgerOffloadBucket(bucket);
@@ -210,6 +229,9 @@ public class OffloadPolicies implements Serializable {
         if (managedLedgerOffloadDriver == null) {
             return false;
         }
+        if (StringUtils.isNotEmpty(managedLedgerOffloadBucket)) {
+            return true;
+        }
         if (isS3Driver()) {
             return StringUtils.isNotEmpty(s3ManagedLedgerOffloadBucket);
         } else if (isGcsDriver()) {
@@ -241,7 +263,12 @@ public class OffloadPolicies implements Serializable {
                 gcsManagedLedgerOffloadReadBufferSizeInBytes,
                 gcsManagedLedgerOffloadServiceAccountKeyFile,
                 fileSystemProfilePath,
-                fileSystemURI);
+                fileSystemURI,
+                managedLedgerOffloadBucket,
+                managedLedgerOffloadRegion,
+                managedLedgerOffloadServiceEndpoint,
+                managedLedgerOffloadMaxBlockSizeInBytes,
+                managedLedgerOffloadReadBufferSizeInBytes);
     }
 
     @Override
@@ -280,7 +307,14 @@ public class OffloadPolicies implements Serializable {
                 && Objects.equals(gcsManagedLedgerOffloadServiceAccountKeyFile,
                     other.getGcsManagedLedgerOffloadServiceAccountKeyFile())
                 && Objects.equals(fileSystemProfilePath, other.getFileSystemProfilePath())
-                && Objects.equals(fileSystemURI, other.getFileSystemURI());
+                && Objects.equals(fileSystemURI, other.getFileSystemURI())
+                && Objects.equals(managedLedgerOffloadBucket, other.getManagedLedgerOffloadBucket())
+                && Objects.equals(managedLedgerOffloadRegion, other.getManagedLedgerOffloadRegion())
+                && Objects.equals(managedLedgerOffloadServiceEndpoint, other.getManagedLedgerOffloadServiceEndpoint())
+                && Objects.equals(managedLedgerOffloadMaxBlockSizeInBytes,
+                    other.getManagedLedgerOffloadMaxBlockSizeInBytes())
+                && Objects.equals(managedLedgerOffloadReadBufferSizeInBytes,
+                    other.getManagedLedgerOffloadReadBufferSizeInBytes());
     }
 
     @Override
@@ -306,16 +340,14 @@ public class OffloadPolicies implements Serializable {
                 .add("gcsManagedLedgerOffloadServiceAccountKeyFile", gcsManagedLedgerOffloadServiceAccountKeyFile)
                 .add("fileSystemProfilePath", fileSystemProfilePath)
                 .add("fileSystemURI", fileSystemURI)
+                .add("managedLedgerOffloadBucket", managedLedgerOffloadBucket)
+                .add("managedLedgerOffloadRegion", managedLedgerOffloadRegion)
+                .add("managedLedgerOffloadServiceEndpoint", managedLedgerOffloadServiceEndpoint)
+                .add("managedLedgerOffloadMaxBlockSizeInBytes", managedLedgerOffloadMaxBlockSizeInBytes)
+                .add("managedLedgerOffloadReadBufferSizeInBytes", managedLedgerOffloadReadBufferSizeInBytes)
                 .toString();
     }
 
-    public static final String METADATA_FIELD_BUCKET = "bucket";
-    public static final String METADATA_FIELD_REGION = "region";
-    public static final String METADATA_FIELD_ENDPOINT = "serviceEndpoint";
-    public static final String METADATA_FIELD_MAX_BLOCK_SIZE = "maxBlockSizeInBytes";
-    public static final String METADATA_FIELD_READ_BUFFER_SIZE = "readBufferSizeInBytes";
-    public static final String OFFLOADER_PROPERTY_PREFIX = "managedLedgerOffload.";
-
     public Properties toProperties() {
         Properties properties = new Properties();
 
@@ -360,6 +392,16 @@ public class OffloadPolicies implements Serializable {
             setProperty(properties, "fileSystemProfilePath", this.getFileSystemProfilePath());
             setProperty(properties, "fileSystemURI", this.getFileSystemURI());
         }
+
+        setProperty(properties, "managedLedgerOffloadBucket", this.getManagedLedgerOffloadBucket());
+        setProperty(properties, "managedLedgerOffloadRegion", this.getManagedLedgerOffloadRegion());
+        setProperty(properties, "managedLedgerOffloadServiceEndpoint",
+                this.getManagedLedgerOffloadServiceEndpoint());
+        setProperty(properties, "managedLedgerOffloadMaxBlockSizeInBytes",
+                this.getManagedLedgerOffloadMaxBlockSizeInBytes());
+        setProperty(properties, "managedLedgerOffloadReadBufferSizeInBytes",
+                this.getManagedLedgerOffloadReadBufferSizeInBytes());
+
         return properties;
     }
 
@@ -409,7 +451,7 @@ public class OffloadPolicies implements Serializable {
      * <p>policies level priority: topic > namespace > broker
      *
      * @param topicLevelPolicies topic level offload policies
-     * @param nsLevelPolicies namesapce level offload policies
+     * @param nsLevelPolicies namespace level offload policies
      * @param brokerProperties broker level offload configuration
      * @return offload policies
      */
diff --git a/site2/docs/tiered-storage-azure.md b/site2/docs/tiered-storage-azure.md
new file mode 100644
index 0000000..0a86080
--- /dev/null
+++ b/site2/docs/tiered-storage-azure.md
@@ -0,0 +1,224 @@
+---
+id: tiered-storage-azure
+title: Use Azure BlobStore offloader with Pulsar
+sidebar_label: Azure BlobStore offloader
+---
+
+This chapter guides you through every step of installing and configuring the Azure BlobStore offloader and using it with Pulsar.
+
+## Installation
+
+Follow the steps below to install the Azure BlobStore offloader.
+
+### Prerequisite
+
+- Pulsar: 2.6.2 or later versions
+  
+### Step
+
+This example uses Pulsar 2.6.2.
+
+1. Download the Pulsar tarball using one of the following ways:
+
+   * Download from the [Apache mirror](https://archive.apache.org/dist/pulsar/pulsar-2.6.2/apache-pulsar-2.6.2-bin.tar.gz)
+
+   * Download from the Pulsar [downloads page](https://pulsar.apache.org/download)
+
+   * Use [wget](https://www.gnu.org/software/wget):
+
+     ```shell
+     wget https://archive.apache.org/dist/pulsar/pulsar-2.6.2/apache-pulsar-2.6.2-bin.tar.gz
+     ```
+
+2. Download and untar the Pulsar offloaders package. 
+
+    ```bash
+    wget https://downloads.apache.org/pulsar/pulsar-2.6.2/apache-pulsar-offloaders-2.6.2-bin.tar.gz
+    tar xvfz apache-pulsar-offloaders-2.6.2-bin.tar.gz
+    ```
+
+3. Move the `offloaders` directory from the offloaders package into the Pulsar directory.
+
+    ```
+    mv apache-pulsar-offloaders-2.6.2/offloaders apache-pulsar-2.6.2/offloaders
+
+    ls offloaders
+    ```
+
+    **Output**
+
+    As shown in the output, Pulsar uses [Apache jclouds](https://jclouds.apache.org) to support [AWS S3](https://aws.amazon.com/s3/), [GCS](https://cloud.google.com/storage/) and [Azure](https://portal.azure.com/#home) for long-term storage.
+
+    ```
+    tiered-storage-file-system-2.6.2.nar
+    tiered-storage-jcloud-2.6.2.nar
+    ```
+
+    > #### Note
+    >
+    > * If you are running Pulsar in a bare-metal cluster, make sure that the `offloaders` tarball is unzipped into every broker's Pulsar directory.
+    > 
+    > * If you are running Pulsar in Docker or deploying Pulsar using a Docker image (such as K8s and DCOS), you can use the `apachepulsar/pulsar-all` image instead of the `apachepulsar/pulsar` image. The `apachepulsar/pulsar-all` image already bundles the tiered storage offloaders (see the sketch below).
+
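+    For Docker-based deployments, a minimal sketch of pulling the bundled image is shown below; the `2.6.2` tag matches the version used in this example, so adjust it to your deployment:
+
+    ```bash
+    docker pull apachepulsar/pulsar-all:2.6.2
+    ```
+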
+## Configuration
+
+> #### Note
+> 
+> Before offloading data from BookKeeper to Azure BlobStore, you need to configure some properties of the Azure BlobStore offload driver.
+
+In addition, you can configure the Azure BlobStore offloader to run automatically or trigger it manually.
+
+### Configure Azure BlobStore offloader driver
+
+You can configure the Azure BlobStore offloader driver in the configuration file `broker.conf` or `standalone.conf`.
+
+- **Required** configurations are as below; a sketch combining them in `broker.conf` follows these tables.
+  
+    Required configuration | Description | Example value
+    |---|---|---
+    `managedLedgerOffloadDriver` | Offloader driver name | azureblob
+    `offloadersDirectory` | Offloader directory | offloaders
+    `managedLedgerOffloadBucket` | Bucket | pulsar-topic-offload
+
+- **Optional** configurations are as below.
+
+    Optional | Description | Example value
+    |---|---|---
+    `managedLedgerOffloadReadBufferSizeInBytes`|Size of block read|1 MB
+    `managedLedgerOffloadMaxBlockSizeInBytes`|Size of block write|64 MB
+    `managedLedgerMinLedgerRolloverTimeMinutes`|Minimum time between ledger rollover for a topic<br><br>**Note**: it is not recommended that you set this configuration in the production environment.|2
+    `managedLedgerMaxEntriesPerLedger`|Maximum number of entries to append to a ledger before triggering a rollover.<br><br>**Note**: it is not recommended that you set this configuration in the production environment.|5000
+
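+A minimal `broker.conf` (or `standalone.conf`) sketch that combines the required settings above, using the example bucket name from this guide:
+
+```conf
+managedLedgerOffloadDriver=azureblob
+offloadersDirectory=offloaders
+managedLedgerOffloadBucket=pulsar-topic-offload
+```
+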
+#### Bucket (required)
+
+A bucket is a basic container that holds your data. Everything you store in Azure BlobStore must be contained in a bucket. You can use a bucket to organize your data and control access to it, but unlike directories and folders, buckets cannot be nested.
+
+##### Example
+
+This example names the bucket as _pulsar-topic-offload_.
+
+```conf
+managedLedgerOffloadBucket=pulsar-topic-offload
+```
+
+#### Authentication (required)
+
+To access Azure BlobStore, you need to authenticate with your storage account credentials.
+
+* Set the environment variables `AZURE_STORAGE_ACCOUNT` and `AZURE_STORAGE_ACCESS_KEY` in `conf/pulsar_env.sh`.
+
+    "export" is important so that the variables are made available in the environment of spawned processes.
+
+    ```bash
+    export AZURE_STORAGE_ACCOUNT=ABC123456789
+    export AZURE_STORAGE_ACCESS_KEY=ded7db27a4558e2ea8bbf0bf37ae0e8521618f366c
+    ```
+
+#### Size of block read/write
+
+You can configure the size of a request sent to or read from Azure BlobStore in the configuration file `broker.conf` or `standalone.conf`. 
+
+Configuration|Description|Default value
+|---|---|---
+`managedLedgerOffloadReadBufferSizeInBytes`|Block size for each individual read when reading back data from Azure BlobStore.|1 MB
+`managedLedgerOffloadMaxBlockSizeInBytes`|Maximum size of a "part" sent during a multipart upload to Azure BlobStore. It **cannot** be smaller than 5 MB.|64 MB
+
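+For example, a sketch that keeps the defaults explicit in `broker.conf` (the values below are the 1 MB and 64 MB defaults expressed in bytes):
+
+```conf
+managedLedgerOffloadReadBufferSizeInBytes=1048576
+managedLedgerOffloadMaxBlockSizeInBytes=67108864
+```
+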
+### Configure Azure BlobStore offloader to run automatically
+
+A namespace policy can be configured to offload data automatically once a threshold is reached. The threshold is based on the size of data that a topic has stored on a Pulsar cluster. Once the topic reaches the threshold, an offloading operation is triggered automatically.
+
+Threshold value|Action
+|---|---
+> 0 | It triggers the offloading operation if the topic storage reaches its threshold.
+= 0|It causes a broker to offload data as soon as possible.
+< 0 |It disables automatic offloading operation.
+
+Automatic offloading runs when a new segment is added to a topic log. If you set the threshold on a namespace but few messages are being produced to the topic, the offloader does not run until the current segment is full.
+
+You can configure the threshold size using CLI tools, such as [pulsarctl](https://streamnative.io/docs/v1.0.0/manage-and-monitor/pulsarctl/overview/) or pulsar-admin.
+
+The offload configurations in `broker.conf` and `standalone.conf` are used for namespaces that do not have namespace-level offload policies. Each namespace can have its own offload policy. If you want to set an offload policy for a specific namespace, use the [`pulsar-admin namespaces set-offload-policies options`](http://pulsar.apache.org/tools/pulsar-admin/2.6.0-SNAPSHOT/#-em-set-offload-policies-em-) command, as shown in the sketch below.
+ 
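+A sketch of setting a namespace-level policy with the driver and bucket used in this guide (flag names are taken from the 2.6 `pulsar-admin` CLI; run `pulsar-admin namespaces set-offload-policies --help` to confirm the options available in your version):
+
+```bash
+bin/pulsar-admin namespaces set-offload-policies \
+  --driver azureblob \
+  --bucket pulsar-topic-offload \
+  my-tenant/my-namespace
+```
+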
+#### Example
+
+This example sets the Azure BlobStore offloader threshold size to 10 MB using pulsar-admin.
+
+```bash
+bin/pulsar-admin namespaces set-offload-threshold --size 10M my-tenant/my-namespace
+```
+
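+To verify the value afterwards, you can read it back (assuming the matching `get-offload-threshold` subcommand is available in your `pulsar-admin` version):
+
+```bash
+bin/pulsar-admin namespaces get-offload-threshold my-tenant/my-namespace
+```
+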
+> #### Tip
+>
+> For more information about the `pulsar-admin namespaces set-offload-threshold options` command, including flags, descriptions, and default values, see [here](http://pulsar.apache.org/tools/pulsar-admin/2.6.0-SNAPSHOT/#-em-set-offload-threshold-em-). 
+
+### Configure Azure BlobStore offloader to run manually
+
+For individual topics, you can trigger the Azure BlobStore offloader manually using one of the following methods:
+
+- Use REST endpoint 
+
+- Use CLI tools (such as [pulsarctl](https://streamnative.io/docs/v1.0.0/manage-and-monitor/pulsarctl/overview/) or pulsar-admin). 
+
+    To trigger it via CLI tools, you need to specify the maximum amount of data (threshold) that should be retained on a Pulsar cluster for a topic. If the size of the topic data on the Pulsar cluster exceeds this threshold, segments from the topic are moved to Azure BlobStore until the threshold is no longer exceeded. Older segments are moved first.
+
+#### Example
+
+- This example triggers the Azure BlobStore offloader to run manually using pulsar-admin.
+
+    ```bash
+    bin/pulsar-admin topics offload --size-threshold 10M my-tenant/my-namespace/topic1
+    ``` 
+
+    **Output**
+
+    ```bash
+    Offload triggered for persistent://my-tenant/my-namespace/topic1 for messages before 2:0:-1
+    ```
+
+    > #### Tip
+    >
+    > For more information about the `pulsar-admin topics offload options` command, including flags, descriptions, and default values, see [here](http://pulsar.apache.org/tools/pulsar-admin/2.6.0-SNAPSHOT/#-em-offload-em-). 
+
+- This example checks the Azure BlobStore offloader status using pulsar-admin.
+
+    ```bash
+    bin/pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1
+    ```
+
+    **Output**
+
+    ```bash
+    Offload is currently running
+    ```
+
+    To wait for the Azure BlobStore offloader to complete the job, add the `-w` flag.
+
+    ```bash
+    bin/pulsar-admin topics offload-status -w persistent://my-tenant/my-namespace/topic1
+    ```
+
+    **Output**
+    
+    ```
+    Offload was a success
+    ```
+
+
+    If there is an error in offloading, the error is propagated to the `pulsar-admin topics offload-status` command.
+
+    ```bash
+    bin/pulsar-admin topics offload-status persistent://my-tenant/my-namespace/topic1
+    ```
+
+    **Output**
+
+    ```
+    Error in offload
+    null
+
+    Reason: Error offloading: org.apache.bookkeeper.mledger.ManagedLedgerException: 
+    ```
+
+    > #### Tip
+    >
+    > For more information about the `pulsar-admin topics offload-status options` command, including flags, descriptions, and default values, see [here](http://pulsar.apache.org/tools/pulsar-admin/2.6.0-SNAPSHOT/#-em-offload-status-em-). 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java
new file mode 100644
index 0000000..2e8ef30
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestUniversalConfigurations.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.offload;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.S3Container;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class TestUniversalConfigurations extends TestBaseOffload {
+
+    private S3Container s3Container;
+
+    @Override
+    protected void beforeStartCluster() throws Exception {
+        super.beforeStartCluster();
+
+        log.info("s3 container init");
+        s3Container = new S3Container(
+                pulsarCluster.getClusterName(),
+                S3Container.NAME)
+                .withNetwork(pulsarCluster.getNetwork())
+                .withNetworkAliases(S3Container.NAME);
+        s3Container.start();
+        log.info("s3 container start finish.");
+    }
+
+    @AfterClass
+    public void teardownS3() {
+        if (null != s3Container) {
+            s3Container.stop();
+        }
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception {
+        super.testPublishOffloadAndConsumeViaCLI(serviceUrl, adminUrl);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception {
+        super.testPublishOffloadAndConsumeViaThreshold(serviceUrl, adminUrl);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception {
+        super.testPublishOffloadAndConsumeDeletionLag(serviceUrl, adminUrl);
+
+    }
+
+    @Override
+    protected Map<String, String> getEnv() {
+        Map<String, String> result = new HashMap<>();
+        result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER));
+        result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+        result.put("managedLedgerOffloadDriver", "aws-s3");
+        result.put("managedLedgerOffloadBucket", "pulsar-integtest");
+        result.put("managedLedgerOffloadServiceEndpoint", "http://" + S3Container.NAME + ":9090");
+
+        return result;
+    }
+
+}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 9fcebc2..ba03f64 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.jclouds.Constants;
 import org.jclouds.ContextBuilder;
 import org.jclouds.aws.s3.AWSS3ProviderMetadata;
+import org.jclouds.azureblob.AzureBlobProviderMetadata;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.TransientApiMetadata;
@@ -117,6 +118,31 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     },
 
+    AZURE_BLOB("azureblob", new AzureBlobProviderMetadata()) {
+        @Override
+        public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
+            VALIDATION.validate(config);
+        }
+
+        @Override
+        public BlobStore getBlobStore(TieredStorageConfiguration config) {
+            return BLOB_STORE_BUILDER.getBlobStore(config);
+        }
+
+        @Override
+        public void buildCredentials(TieredStorageConfiguration config) {
+            String accountName = System.getenv("AZURE_STORAGE_ACCOUNT");
+            if (StringUtils.isEmpty(accountName)) {
+                throw new IllegalArgumentException("Couldn't get the azure storage account.");
+            }
+            String accountKey = System.getenv("AZURE_STORAGE_ACCESS_KEY");
+            if (StringUtils.isEmpty(accountKey)) {
+                throw new IllegalArgumentException("Couldn't get the azure storage access key.");
+            }
+            config.setProviderCredentials(new Credentials(accountName, accountKey));
+        }
+    },
+
     TRANSIENT("transient", new AnonymousProviderMetadata(new TransientApiMetadata(), "")) {
         @Override
         public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index f122ec0..5abd867 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -65,7 +65,7 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
     public static final String METADATA_FIELD_ENDPOINT = "serviceEndpoint";
     public static final String METADATA_FIELD_MAX_BLOCK_SIZE = "maxBlockSizeInBytes";
     public static final String METADATA_FIELD_READ_BUFFER_SIZE = "readBufferSizeInBytes";
-    public static final String OFFLOADER_PROPERTY_PREFIX = "managedLedgerOffload.";
+    public static final String OFFLOADER_PROPERTY_PREFIX = "managedLedgerOffload";
 
     protected static final int MB = 1024 * 1024;
 
@@ -115,7 +115,7 @@ public class TieredStorageConfiguration implements Serializable, Cloneable {
     private String getKeyName(String property) {
         StringBuilder sb = new StringBuilder();
         sb.append(OFFLOADER_PROPERTY_PREFIX)
-          .append(property);
+          .append(StringUtils.capitalize(property));
 
         return sb.toString();
     }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
index 90e1caf..ea613e7 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockHeaderImpl;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.data.ACL;
@@ -123,9 +124,9 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
     protected TieredStorageConfiguration getConfiguration(String bucket) {
         Map<String, String> metaData = new HashMap<String, String> ();
         metaData.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, provider.getDriver());
-        metaData.put(TieredStorageConfiguration.OFFLOADER_PROPERTY_PREFIX + TieredStorageConfiguration.METADATA_FIELD_REGION, "");
-        metaData.put(TieredStorageConfiguration.OFFLOADER_PROPERTY_PREFIX + TieredStorageConfiguration.METADATA_FIELD_BUCKET, bucket);
-        metaData.put(TieredStorageConfiguration.OFFLOADER_PROPERTY_PREFIX + TieredStorageConfiguration.METADATA_FIELD_ENDPOINT, "");
+        metaData.put(getConfigKey(TieredStorageConfiguration.METADATA_FIELD_REGION), "");
+        metaData.put(getConfigKey(TieredStorageConfiguration.METADATA_FIELD_BUCKET), bucket);
+        metaData.put(getConfigKey(TieredStorageConfiguration.METADATA_FIELD_ENDPOINT), "");
         
         TieredStorageConfiguration config = TieredStorageConfiguration.create(metaData);
         config.setProviderCredentials(getBlobStoreCredentials());
@@ -133,6 +134,10 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
         return config;
     }
 
+    private String getConfigKey(String field) {
+        return TieredStorageConfiguration.OFFLOADER_PROPERTY_PREFIX + StringUtils.capitalize(field);
+    }
+
     protected ReadHandle buildReadHandle() throws Exception {
         return buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
     }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
index 4a0a56e..28e5829 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
@@ -35,9 +35,9 @@ public class JCloudBlobStoreProviderTests {
     public void awsValidationSuccessTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put("managedLedgerOffload.region", "us-east-1");
-        map.put("managedLedgerOffload.bucket", "test bucket");
-        map.put("managedLedgerOffload.maxBlockSizeInBytes", "99999999");
+        map.put("managedLedgerOffloadRegion", "us-east-1");
+        map.put("managedLedgerOffloadBucket", "test bucket");
+        map.put("managedLedgerOffloadMaxBlockSizeInBytes", "99999999");
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.AWS_S3.validate(config);
     }
@@ -46,8 +46,8 @@ public class JCloudBlobStoreProviderTests {
     public void awsValidationDefaultBlockSizeTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put("managedLedgerOffload.region", "us-east-1");
-        map.put("managedLedgerOffload.bucket", "test bucket");
+        map.put("managedLedgerOffloadRegion", "us-east-1");
+        map.put("managedLedgerOffloadBucket", "test bucket");
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.AWS_S3.validate(config);
     }
@@ -57,8 +57,8 @@ public class JCloudBlobStoreProviderTests {
     public void awsValidationMissingRegionTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put("managedLedgerOffload.bucket", "my-bucket");
-        map.put("managedLedgerOffload.maxBlockSizeInBytes", "999999");
+        map.put("managedLedgerOffloadBucket", "my-bucket");
+        map.put("managedLedgerOffloadMaxBlockSizeInBytes", "999999");
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.AWS_S3.validate(config);
     }
@@ -68,8 +68,8 @@ public class JCloudBlobStoreProviderTests {
     public void awsValidationMissingBucketTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put("managedLedgerOffload.region", "us-east-1");
-        map.put("managedLedgerOffload.maxBlockSizeInBytes", "99999999");
+        map.put("managedLedgerOffloadRegion", "us-east-1");
+        map.put("managedLedgerOffloadMaxBlockSizeInBytes", "99999999");
         config = new TieredStorageConfiguration(map);
         assertEquals(config.getRegion(), "us-east-1");
         JCloudBlobStoreProvider.AWS_S3.validate(config);
@@ -81,18 +81,18 @@ public class JCloudBlobStoreProviderTests {
     public void awsValidationBlockSizeTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put("managedLedgerOffload.region", "us-east-1");
-        map.put("managedLedgerOffload.bucket", "test bucket");
-        map.put("managedLedgerOffload.maxBlockSizeInBytes", "1");
+        map.put("managedLedgerOffloadRegion", "us-east-1");
+        map.put("managedLedgerOffloadBucket", "test bucket");
+        map.put("managedLedgerOffloadMaxBlockSizeInBytes", "1");
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.AWS_S3.validate(config);
     }
    
     @Test
     public void transientValidationSuccessTest() {
-        Map<String, String> map = new HashMap<String,String>(); 
+        Map<String, String> map = new HashMap<String,String>();
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, "transient");
-        map.put("managedLedgerOffload.bucket", "test bucket");
+        map.put("managedLedgerOffloadBucket", "test bucket");
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.TRANSIENT.validate(config);
     }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
index 6e84e8c..41e6255 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
@@ -53,23 +53,23 @@ public class TieredStorageConfigurationTests {
         TieredStorageConfiguration config = new TieredStorageConfiguration(map);
         List<String> keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_BUCKET);
         assertEquals(keys.get(0), BC_S3_BUCKET);
-        assertEquals(keys.get(1), "managedLedgerOffload.bucket");
+        assertEquals(keys.get(1), "managedLedgerOffloadBucket");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_REGION);
         assertEquals(keys.get(0), BC_S3_REGION);
-        assertEquals(keys.get(1), "managedLedgerOffload.region");
+        assertEquals(keys.get(1), "managedLedgerOffloadRegion");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_ENDPOINT);
         assertEquals(keys.get(0), BC_S3_ENDPOINT);
-        assertEquals(keys.get(1), "managedLedgerOffload.serviceEndpoint");
+        assertEquals(keys.get(1), "managedLedgerOffloadServiceEndpoint");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE);
         assertEquals(keys.get(0), BC_S3_MAX_BLOCK_SIZE);
-        assertEquals(keys.get(1), "managedLedgerOffload.maxBlockSizeInBytes");
+        assertEquals(keys.get(1), "managedLedgerOffloadMaxBlockSizeInBytes");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_READ_BUFFER_SIZE);
         assertEquals(keys.get(0), BC_S3_READ_BUFFER_SIZE);
-        assertEquals(keys.get(1), "managedLedgerOffload.readBufferSizeInBytes");
+        assertEquals(keys.get(1), "managedLedgerOffloadReadBufferSizeInBytes");
     }
     
     /**
@@ -79,11 +79,11 @@ public class TieredStorageConfigurationTests {
     public final void awsS3PropertiesTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.AWS_S3.getDriver());
-        map.put("managedLedgerOffload.region", "us-east-1");
-        map.put("managedLedgerOffload.bucket", "test bucket");
-        map.put("managedLedgerOffload.maxBlockSizeInBytes", "1");
-        map.put("managedLedgerOffload.readBufferSizeInBytes", "500");
-        map.put("managedLedgerOffload.serviceEndpoint", "http://some-url:9093");
+        map.put("managedLedgerOffloadRegion", "us-east-1");
+        map.put("managedLedgerOffloadBucket", "test bucket");
+        map.put("managedLedgerOffloadMaxBlockSizeInBytes", "1");
+        map.put("managedLedgerOffloadReadBufferSizeInBytes", "500");
+        map.put("managedLedgerOffloadServiceEndpoint", "http://some-url:9093");
         TieredStorageConfiguration config = new TieredStorageConfiguration(map);
         
         assertEquals(config.getRegion(), "us-east-1");
@@ -124,19 +124,19 @@ public class TieredStorageConfigurationTests {
         TieredStorageConfiguration config = new TieredStorageConfiguration(map);
         List<String> keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_BUCKET);
         assertEquals(keys.get(0), BC_GCS_BUCKET);
-        assertEquals(keys.get(1), "managedLedgerOffload.bucket");
+        assertEquals(keys.get(1), "managedLedgerOffloadBucket");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_REGION);
         assertEquals(keys.get(0), BC_GCS_REGION);
-        assertEquals(keys.get(1), "managedLedgerOffload.region");
+        assertEquals(keys.get(1), "managedLedgerOffloadRegion");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE);
         assertEquals(keys.get(0), BC_GCS_MAX_BLOCK_SIZE);
-        assertEquals(keys.get(1), "managedLedgerOffload.maxBlockSizeInBytes");
+        assertEquals(keys.get(1), "managedLedgerOffloadMaxBlockSizeInBytes");
         
         keys = config.getKeys(TieredStorageConfiguration.METADATA_FIELD_READ_BUFFER_SIZE);
         assertEquals(keys.get(0), BC_GCS_READ_BUFFER_SIZE);
-        assertEquals(keys.get(1), "managedLedgerOffload.readBufferSizeInBytes");
+        assertEquals(keys.get(1), "managedLedgerOffloadReadBufferSizeInBytes");
     }
     
     /**
@@ -146,11 +146,11 @@ public class TieredStorageConfigurationTests {
     public final void gcsPropertiesTest() {
         Map<String, String> map = new HashMap<String,String>(); 
         map.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, JCloudBlobStoreProvider.GOOGLE_CLOUD_STORAGE.getDriver());
-        map.put("managedLedgerOffload.region", "us-east-1");
-        map.put("managedLedgerOffload.bucket", "test bucket");
-        map.put("managedLedgerOffload.maxBlockSizeInBytes", "1");
-        map.put("managedLedgerOffload.readBufferSizeInBytes", "500");
-        map.put("managedLedgerOffload.serviceEndpoint", "http://some-url:9093");
+        map.put("managedLedgerOffloadRegion", "us-east-1");
+        map.put("managedLedgerOffloadBucket", "test bucket");
+        map.put("managedLedgerOffloadMaxBlockSizeInBytes", "1");
+        map.put("managedLedgerOffloadReadBufferSizeInBytes", "500");
+        map.put("managedLedgerOffloadServiceEndpoint", "http://some-url:9093");
         TieredStorageConfiguration config = new TieredStorageConfiguration(map);
         
         assertEquals(config.getRegion(), "us-east-1");
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TransientBlobStoreFactoryTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TransientBlobStoreFactoryTests.java
index dba68cf..ec57023 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TransientBlobStoreFactoryTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TransientBlobStoreFactoryTests.java
@@ -55,7 +55,7 @@ public class TransientBlobStoreFactoryTests extends AbstractJCloudBlobStoreFacto
     protected Map<String, String> getConfig() {
         Map<String, String> metadata = new HashMap<String,String>();
         metadata.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, "transient");
-        metadata.put("managedLedgerOffload.bucket", TEST_CONTAINER_NAME);
+        metadata.put("managedLedgerOffloadBucket", TEST_CONTAINER_NAME);
         return metadata;
     }
 }