You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/29 09:08:50 UTC
[pulsar] branch master updated: [Issue 8887][tiered-storage-jcloud]
support tiered-storage provider by aliyun OSS (#8985)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 98ad39f [Issue 8887][tiered-storage-jcloud] support tiered-storage provider by aliyun OSS (#8985)
98ad39f is described below
commit 98ad39ffa51239e389c73411dfb8df7f5592a5aa
Author: wangyufan <wa...@gmail.com>
AuthorDate: Tue Dec 29 17:08:16 2020 +0800
[Issue 8887][tiered-storage-jcloud] support tiered-storage provider by aliyun OSS (#8985)
[Issue 8887][tiered-storage-jcloud] support tiered-storage provider by aliyun OSS
---
.../common/policies/data/OffloadPolicies.java | 4 +-
.../jcloud/provider/JCloudBlobStoreProvider.java | 81 +++++++++++++++++++++-
2 files changed, 83 insertions(+), 2 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 422bf24..4c5058b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -60,7 +60,9 @@ public class OffloadPolicies implements Serializable {
public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB
public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
- public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob"};
+ // Driver name strings. "aliyun-oss" is the same string that is passed to the
+ // JCloudBlobStoreProvider.ALIYUN_OSS enum constant.
+ // NOTE(review): presumably the set of offload driver names accepted in configuration - confirm
+ // against the usages of this array.
+ public final static String[] DRIVER_NAMES = {
+ "S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob", "aliyun-oss"
+ };
public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
public final static Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 9d0871e..ba7065e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -33,6 +33,7 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
+import java.util.Properties;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
@@ -57,6 +58,8 @@ import org.jclouds.googlecloud.GoogleCredentialsFromJson;
import org.jclouds.googlecloudstorage.GoogleCloudStorageProviderMetadata;
import org.jclouds.providers.AnonymousProviderMetadata;
import org.jclouds.providers.ProviderMetadata;
+import org.jclouds.s3.S3ApiMetadata;
+import org.jclouds.s3.reference.S3Constants;
/**
* Enumeration of the supported JCloud Blob Store Providers.
@@ -162,6 +165,28 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
}
},
+
+ /**
+ * Aliyun OSS provider, accessed through the generic jclouds S3 API.
+ *
+ * <p>Aliyun OSS is compatible with the S3 API:
+ * https://www.alibabacloud.com/help/doc-detail/64919.htm
+ *
+ * <p>The constant is identified by the string {@code "aliyun-oss"} and is backed by an
+ * {@link AnonymousProviderMetadata} wrapping {@link S3ApiMetadata}. Validation, blob store
+ * creation and credential loading are delegated to the {@code ALIYUN_OSS_VALIDATION},
+ * {@code ALIYUN_OSS_BLOB_STORE_BUILDER} and {@code ALIYUN_OSS_CREDENTIAL_BUILDER} constants
+ * declared in this enum.
+ *
+ * <p>NOTE(review): the empty second argument of {@code AnonymousProviderMetadata} is presumably
+ * the endpoint, left blank because the blob store builder sets it from the configuration's
+ * service endpoint - confirm.
+ */
+ ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
+ // Delegates the configuration checks to ALIYUN_OSS_VALIDATION.
+ @Override
+ public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
+ ALIYUN_OSS_VALIDATION.validate(config);
+ }
+
+ // Delegates blob store construction to ALIYUN_OSS_BLOB_STORE_BUILDER.
+ @Override
+ public BlobStore getBlobStore(TieredStorageConfiguration config) {
+ return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config);
+ }
+
+ // Delegates credential loading to ALIYUN_OSS_CREDENTIAL_BUILDER.
+ @Override
+ public void buildCredentials(TieredStorageConfiguration config) {
+ ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config);
+ }
+ },
+
TRANSIENT("transient", new AnonymousProviderMetadata(new TransientApiMetadata(), "")) {
@Override
public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
@@ -177,7 +202,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
ContextBuilder builder = ContextBuilder.newBuilder("transient");
BlobStoreContext ctx = builder
.buildView(BlobStoreContext.class);
-
+
BlobStore bs = ctx.getBlobStore();
if (!bs.containerExists(config.getBucket())) {
@@ -312,4 +337,58 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
}
}
};
+
+    /**
+     * Creates the jclouds {@link BlobStore} for the Aliyun OSS driver.
+     *
+     * <p>The context is built from the configuration's provider metadata, its overrides (with
+     * virtual-host-style bucket addressing forced on) and its service endpoint. When the
+     * configuration holds provider credentials they are attached to the context; otherwise a
+     * warning is logged and the context is built without credentials.
+     */
+    static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
+        ContextBuilder ossContext = ContextBuilder.newBuilder(config.getProviderMetadata());
+
+        // For security reasons, OSS supports only virtual hosted style access.
+        Properties ossOverrides = config.getOverrides();
+        ossOverrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+        ossContext.overrides(ossOverrides).endpoint(config.getServiceEndpoint());
+
+        if (config.getProviderCredentials() == null) {
+            log.warn("The credentials is null. driver: {}, bucket: {}", config.getDriver(), config.getBucket());
+        } else {
+            ossContext.credentialsSupplier(config.getCredentials());
+        }
+
+        // Both paths finish the same way: build the view and hand back its blob store.
+        return ossContext.buildView(BlobStoreContext.class).getBlobStore();
+    };
+
+    /**
+     * Validates the offload configuration for the Aliyun OSS driver.
+     *
+     * <p>Checks, in order, that a service endpoint is set, that a bucket is set, and that the
+     * maximum block size is at least 5MB.
+     *
+     * @throws IllegalArgumentException on the first check that fails
+     */
+    static final ConfigValidation ALIYUN_OSS_VALIDATION = (TieredStorageConfiguration config) -> {
+        final int minBlockSizeInBytes = 5 * 1024 * 1024;
+
+        final String endpoint = config.getServiceEndpoint();
+        if (Strings.isNullOrEmpty(endpoint)) {
+            final String reason = "ServiceEndpoint must specified for " + config.getDriver() + " offload";
+            throw new IllegalArgumentException(reason);
+        }
+
+        final String bucket = config.getBucket();
+        if (Strings.isNullOrEmpty(bucket)) {
+            final String reason = "Bucket cannot be empty for " + config.getDriver() + " offload";
+            throw new IllegalArgumentException(reason);
+        }
+
+        if (config.getMaxBlockSizeInBytes() < minBlockSizeInBytes) {
+            final String reason = "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for "
+                    + config.getDriver() + " offload";
+            throw new IllegalArgumentException(reason);
+        }
+    };
+
+    /**
+     * Loads the Aliyun OSS credentials from the process environment and stores them on the
+     * configuration.
+     *
+     * <p>The access key id is read from {@code ALIYUN_OSS_ACCESS_KEY_ID} and the access key secret
+     * from {@code ALIYUN_OSS_ACCESS_KEY_SECRET}. A single {@link Credentials} instance is created
+     * and handed to the configuration through a supplier that always returns that instance.
+     *
+     * @throws IllegalArgumentException if either environment variable is unset or empty
+     */
+    static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
+        final String accessKeyId = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID");
+        if (StringUtils.isEmpty(accessKeyId)) {
+            throw new IllegalArgumentException("Couldn't get the aliyun oss access key id.");
+        }
+
+        final String accessKeySecret = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET");
+        if (StringUtils.isEmpty(accessKeySecret)) {
+            throw new IllegalArgumentException("Couldn't get the aliyun oss access key secret.");
+        }
+
+        final Credentials ossCredentials = new Credentials(accessKeyId, accessKeySecret);
+        config.setProviderCredentials(() -> ossCredentials);
+    };
+
}
\ No newline at end of file