You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/10 15:01:47 UTC
[pulsar] 03/12: [feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b2de04362b302d8d4294e6e4073c9d046f9bb84d
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Wed May 25 15:37:33 2022 +0800
[feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)
* [improve] [tiered-storage] Add pure S3 provider for the offloader
---
*Motivation*
There have some cloud storages are compatible with S3
APIs, such as aliyun-oss. Some other storages also use
the S3 APIs and want to offload the data into them, but
we only support the AWS or the Aliyun.
The PR https://github.com/apache/pulsar/pull/8985 provides
the Aliyun offload provider, but it has a force limitation of
the `S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS`. That
is not a limitation on other storage service which compatible
with S3 APIs.
This PR provides a more general offload provider `S3` which uses
pure JClouds S3 metadata and allows people to override the
default JClouds properties through system properties.
*Modifications*
- Add the pure S3 offload provider
(cherry picked from commit 047cb0e3117d55a79c0082c0dc3d2ab3c9bcd6f9)
---
.../jcloud/provider/JCloudBlobStoreProvider.java | 54 ++++++++++++++++------
.../provider/TieredStorageConfiguration.java | 13 ++++++
.../provider/JCloudBlobStoreProviderTests.java | 31 ++++++++++++-
.../provider/TieredStorageConfigurationTests.java | 17 +++++++
4 files changed, 99 insertions(+), 16 deletions(-)
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 44aa92ce924..fc28c0291ce 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -181,17 +181,34 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
@Override
public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
- ALIYUN_OSS_VALIDATION.validate(config);
+ S3_VALIDATION.validate(config);
}
@Override
public BlobStore getBlobStore(TieredStorageConfiguration config) {
- return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config);
+ return S3_BLOB_STORE_BUILDER.getBlobStore(config);
}
@Override
public void buildCredentials(TieredStorageConfiguration config) {
- ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config);
+ S3_CREDENTIAL_BUILDER.buildCredentials(config);
+ }
+ },
+
+ S3("S3", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
+ @Override
+ public BlobStore getBlobStore(TieredStorageConfiguration config) {
+ return S3_BLOB_STORE_BUILDER.getBlobStore(config);
+ }
+
+ @Override
+ public void buildCredentials(TieredStorageConfiguration config) {
+ S3_CREDENTIAL_BUILDER.buildCredentials(config);
+ }
+
+ @Override
+ public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
+ S3_VALIDATION.validate(config);
}
},
@@ -374,12 +391,14 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
}
};
- static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
+ static final BlobStoreBuilder S3_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
contextBuilder.modules(Arrays.asList(new SLF4JLoggingModule()));
Properties overrides = config.getOverrides();
- // For security reasons, OSS supports only virtual hosted style access.
- overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+ if (ALIYUN_OSS.getDriver().equals(config.getDriver())) {
+ // For security reasons, OSS supports only virtual hosted style access.
+ overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+ }
contextBuilder.overrides(overrides);
contextBuilder.endpoint(config.getServiceEndpoint());
@@ -396,7 +415,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
}
};
- static final ConfigValidation ALIYUN_OSS_VALIDATION = (TieredStorageConfiguration config) -> {
+ static final ConfigValidation S3_VALIDATION = (TieredStorageConfiguration config) -> {
if (Strings.isNullOrEmpty(config.getServiceEndpoint())) {
throw new IllegalArgumentException(
"ServiceEndpoint must specified for " + config.getDriver() + " offload");
@@ -414,14 +433,21 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
}
};
- static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
- String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID");
- if (StringUtils.isEmpty(accountName)) {
- throw new IllegalArgumentException("Couldn't get the aliyun oss access key id.");
+ static final CredentialBuilder S3_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
+ String accountName = System.getenv().getOrDefault("ACCESS_KEY_ID", "");
+ // For forward compatibility
+ if (StringUtils.isEmpty(accountName.trim())) {
+ accountName = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_ID", "");
+ }
+ if (StringUtils.isEmpty(accountName.trim())) {
+ throw new IllegalArgumentException("Couldn't get the access key id.");
+ }
+ String accountKey = System.getenv().getOrDefault("ACCESS_KEY_ID", "");
+ if (StringUtils.isEmpty(accountKey.trim())) {
+ accountKey = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_SECRET", "");
}
- String accountKey = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET");
- if (StringUtils.isEmpty(accountKey)) {
- throw new IllegalArgumentException("Couldn't get the aliyun oss access key secret.");
+ if (StringUtils.isEmpty(accountKey.trim())) {
+ throw new IllegalArgumentException("Couldn't get the access key secret.");
}
Credentials credentials = new Credentials(
accountName, accountKey);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index c1054969a42..18e3bbf0db8 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -329,6 +329,19 @@ public class TieredStorageConfiguration {
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
}
+ // load more jclouds properties into the overrides
+ System.getProperties().entrySet().stream()
+ .filter(p -> p.getKey().toString().startsWith("jclouds"))
+ .forEach(jcloudsProp -> {
+ overrides.setProperty(jcloudsProp.getKey().toString(), jcloudsProp.getValue().toString());
+ });
+
+ System.getenv().entrySet().stream()
+ .filter(p -> p.getKey().toString().startsWith("jclouds"))
+ .forEach(jcloudsProp -> {
+ overrides.setProperty(jcloudsProp.getKey().toString(), jcloudsProp.getValue().toString());
+ });
+
log.info("getOverrides: {}", overrides.toString());
return overrides;
}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
index 28e5829ba2a..4f0c60bc007 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
@@ -23,8 +23,6 @@ import static org.testng.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
-import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
-import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.testng.annotations.Test;
public class JCloudBlobStoreProviderTests {
@@ -105,4 +103,33 @@ public class JCloudBlobStoreProviderTests {
config = new TieredStorageConfiguration(map);
JCloudBlobStoreProvider.TRANSIENT.validate(config);
}
+
+ @Test()
+ public void s3ValidationTest() {
+ Map<String, String> map = new HashMap<>();
+ map.put("managedLedgerOffloadDriver", "S3");
+ map.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
+ map.put("managedLedgerOffloadBucket", "test-s3-bucket");
+ TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+ configuration.getProvider().validate(configuration);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "ServiceEndpoint must specified for S3 offload")
+ public void s3ValidationServiceEndpointMissed() {
+ Map<String, String> map = new HashMap<>();
+ map.put("managedLedgerOffloadDriver", "S3");
+ TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+ configuration.getProvider().validate(configuration);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "Bucket cannot be empty for S3 offload")
+ public void s3ValidationBucketMissed() {
+ Map<String, String> map = new HashMap<>();
+ map.put("managedLedgerOffloadDriver", "S3");
+ map.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
+ TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+ configuration.getProvider().validate(configuration);
+ }
}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
index f80f3ceaa1a..bf5e046bf70 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
@@ -22,6 +22,8 @@ import static org.testng.Assert.assertEquals;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+
import org.jclouds.domain.Credentials;
import org.testng.annotations.Test;
@@ -205,4 +207,19 @@ public class TieredStorageConfigurationTests {
assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
}
+
+ @Test
+ public void overridePropertiesTest() {
+ Map<String, String> map = new HashMap<>();
+ map.put("s3ManagedLedgerOffloadServiceEndpoint", "http://localhost");
+ map.put("s3ManagedLedgerOffloadRegion", "my-region");
+ System.setProperty("jclouds.SystemPropertyA", "A");
+ System.setProperty("jclouds.region", "jclouds-region");
+ TieredStorageConfiguration config = new TieredStorageConfiguration(map);
+ Properties properties = config.getOverrides();
+ System.out.println(properties.toString());
+ assertEquals(properties.get("jclouds.region"), "jclouds-region");
+ assertEquals(config.getServiceEndpoint(), "http://localhost");
+ assertEquals(properties.get("jclouds.SystemPropertyA"), "A");
+ }
}