You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/29 09:08:50 UTC

[pulsar] branch master updated: [Issue 8887][tiered-storage-jcloud] support tiered-storage provider by aliyun OSS (#8985)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 98ad39f  [Issue 8887][tiered-storage-jcloud] support tiered-storage provider by aliyun OSS (#8985)
98ad39f is described below

commit 98ad39ffa51239e389c73411dfb8df7f5592a5aa
Author: wangyufan <wa...@gmail.com>
AuthorDate: Tue Dec 29 17:08:16 2020 +0800

    [Issue 8887][tiered-storage-jcloud] support tiered-storage provider by aliyun OSS (#8985)
    
    [Issue 8887][tiered-storage-jcloud] support tiered-storage provider by aliyun OSS
---
 .../common/policies/data/OffloadPolicies.java      |  4 +-
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 81 +++++++++++++++++++++-
 2 files changed, 83 insertions(+), 2 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 422bf24..4c5058b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -60,7 +60,9 @@ public class OffloadPolicies implements Serializable {
     public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;      // 1MB
     public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
     public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
-    public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob"};
+    public final static String[] DRIVER_NAMES = {
+            "S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob", "aliyun-oss"
+    };
     public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
     public final static Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
     public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 9d0871e..ba7065e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -33,6 +33,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Properties;
 import java.util.UUID;
 
 import lombok.extern.slf4j.Slf4j;
@@ -57,6 +58,8 @@ import org.jclouds.googlecloud.GoogleCredentialsFromJson;
 import org.jclouds.googlecloudstorage.GoogleCloudStorageProviderMetadata;
 import org.jclouds.providers.AnonymousProviderMetadata;
 import org.jclouds.providers.ProviderMetadata;
+import org.jclouds.s3.S3ApiMetadata;
+import org.jclouds.s3.reference.S3Constants;
 
 /**
  * Enumeration of the supported JCloud Blob Store Providers.
@@ -162,6 +165,28 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     },
 
+
+    /**
+     * Aliyun OSS is compatible with the S3 API
+     * https://www.alibabacloud.com/help/doc-detail/64919.htm
+     */
+    ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
+        @Override
+        public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
+            ALIYUN_OSS_VALIDATION.validate(config);
+        }
+
+        @Override
+        public BlobStore getBlobStore(TieredStorageConfiguration config) {
+            return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config);
+        }
+
+        @Override
+        public void buildCredentials(TieredStorageConfiguration config) {
+            ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config);
+        }
+    },
+
     TRANSIENT("transient", new AnonymousProviderMetadata(new TransientApiMetadata(), "")) {
         @Override
         public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
@@ -177,7 +202,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
             ContextBuilder builder =  ContextBuilder.newBuilder("transient");
             BlobStoreContext ctx = builder
                     .buildView(BlobStoreContext.class);
-            
+
             BlobStore bs = ctx.getBlobStore();
 
             if (!bs.containerExists(config.getBucket())) {
@@ -312,4 +337,58 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
             }
         }
     };
+
+    static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
+        ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
+        Properties overrides = config.getOverrides();
+        // For security reasons, OSS supports only virtual hosted style access.
+        overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+        contextBuilder.overrides(overrides);
+        contextBuilder.endpoint(config.getServiceEndpoint());
+
+        if (config.getProviderCredentials() != null) {
+            return contextBuilder
+                    .credentialsSupplier(config.getCredentials())
+                    .buildView(BlobStoreContext.class)
+                    .getBlobStore();
+        } else {
+            log.warn("The credentials is null. driver: {}, bucket: {}", config.getDriver(), config.getBucket());
+            return contextBuilder
+                    .buildView(BlobStoreContext.class)
+                    .getBlobStore();
+        }
+    };
+
+    static final ConfigValidation ALIYUN_OSS_VALIDATION = (TieredStorageConfiguration config) -> {
+        if (Strings.isNullOrEmpty(config.getServiceEndpoint())) {
+            throw new IllegalArgumentException(
+                    "ServiceEndpoint must specified for " + config.getDriver() + " offload");
+        }
+
+        if (Strings.isNullOrEmpty(config.getBucket())) {
+            throw new IllegalArgumentException(
+                    "Bucket cannot be empty for " + config.getDriver() + " offload");
+        }
+
+        if (config.getMaxBlockSizeInBytes() < (5 * 1024 * 1024)) {
+            throw new IllegalArgumentException(
+                    "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for "
+                            + config.getDriver() + " offload");
+        }
+    };
+
+    static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
+        String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID");
+        if (StringUtils.isEmpty(accountName)) {
+            throw new IllegalArgumentException("Couldn't get the aliyun oss access key id.");
+        }
+        String accountKey = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET");
+        if (StringUtils.isEmpty(accountKey)) {
+            throw new IllegalArgumentException("Couldn't get the aliyun oss access key secret.");
+        }
+        Credentials credentials = new Credentials(
+                accountName, accountKey);
+        config.setProviderCredentials(() -> credentials);
+    };
+
 }
\ No newline at end of file