You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2022/06/07 01:54:43 UTC

[pulsar] 01/05: [feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 72629be0d9aac9e850030b18b8642ea591119245
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Wed May 25 15:37:33 2022 +0800

    [feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)
    
    * [improve] [tiered-storage] Add pure S3 provider for the offloader
    ---
    
    *Motivation*
    
    Some cloud storage services, such as aliyun-oss, are compatible
    with the S3 APIs. Users of other S3-compatible storage services
    also want to offload data to them, but we only support AWS and
    Aliyun.
    The PR https://github.com/apache/pulsar/pull/8985 provides
    the Aliyun offload provider, but it forcibly sets
    `S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS`. That
    restriction does not apply to other storage services that are
    compatible with the S3 APIs.
    This PR provides a more general offload provider, `S3`, which uses
    the pure JClouds S3 metadata and lets users override the
    default JClouds properties through system properties.
    
    *Modifications*
    
    - Add the pure S3 offload provider
    
    (cherry picked from commit 047cb0e3117d55a79c0082c0dc3d2ab3c9bcd6f9)
---
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 54 ++++++++++++++++------
 .../provider/TieredStorageConfiguration.java       | 13 ++++++
 .../provider/JCloudBlobStoreProviderTests.java     | 31 ++++++++++++-
 .../provider/TieredStorageConfigurationTests.java  | 17 +++++++
 4 files changed, 99 insertions(+), 16 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 44aa92ce924..fc28c0291ce 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -181,17 +181,34 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
     ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
         @Override
         public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
-            ALIYUN_OSS_VALIDATION.validate(config);
+            S3_VALIDATION.validate(config);
         }
 
         @Override
         public BlobStore getBlobStore(TieredStorageConfiguration config) {
-            return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config);
+            return S3_BLOB_STORE_BUILDER.getBlobStore(config);
         }
 
         @Override
         public void buildCredentials(TieredStorageConfiguration config) {
-            ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config);
+            S3_CREDENTIAL_BUILDER.buildCredentials(config);
+        }
+    },
+
+    S3("S3", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
+        @Override
+        public BlobStore getBlobStore(TieredStorageConfiguration config) {
+            return S3_BLOB_STORE_BUILDER.getBlobStore(config);
+        }
+
+        @Override
+        public void buildCredentials(TieredStorageConfiguration config) {
+            S3_CREDENTIAL_BUILDER.buildCredentials(config);
+        }
+
+        @Override
+        public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
+            S3_VALIDATION.validate(config);
         }
     },
 
@@ -374,12 +391,14 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     };
 
-    static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
+    static final BlobStoreBuilder S3_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
         ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
         contextBuilder.modules(Arrays.asList(new SLF4JLoggingModule()));
         Properties overrides = config.getOverrides();
-        // For security reasons, OSS supports only virtual hosted style access.
-        overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+        if (ALIYUN_OSS.getDriver().equals(config.getDriver())) {
+            // For security reasons, OSS supports only virtual hosted style access.
+            overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+        }
         contextBuilder.overrides(overrides);
         contextBuilder.endpoint(config.getServiceEndpoint());
 
@@ -396,7 +415,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     };
 
-    static final ConfigValidation ALIYUN_OSS_VALIDATION = (TieredStorageConfiguration config) -> {
+    static final ConfigValidation S3_VALIDATION = (TieredStorageConfiguration config) -> {
         if (Strings.isNullOrEmpty(config.getServiceEndpoint())) {
             throw new IllegalArgumentException(
                     "ServiceEndpoint must specified for " + config.getDriver() + " offload");
@@ -414,14 +433,21 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     };
 
-    static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
-        String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID");
-        if (StringUtils.isEmpty(accountName)) {
-            throw new IllegalArgumentException("Couldn't get the aliyun oss access key id.");
+    static final CredentialBuilder S3_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
+        String accountName = System.getenv().getOrDefault("ACCESS_KEY_ID", "");
+        // Backward compatibility: fall back to the legacy ALIYUN_OSS_* variable if ACCESS_KEY_ID is blank.
+        if (StringUtils.isEmpty(accountName.trim())) {
+            accountName = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_ID", "");
+        }
+        if (StringUtils.isEmpty(accountName.trim())) {
+            throw new IllegalArgumentException("Couldn't get the access key id.");
+        }
+        String accountKey = System.getenv().getOrDefault("ACCESS_KEY_SECRET", ""); // the secret, not the key id
+        if (StringUtils.isEmpty(accountKey.trim())) {
+            accountKey = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_SECRET", "");
         }
-        String accountKey = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET");
-        if (StringUtils.isEmpty(accountKey)) {
-            throw new IllegalArgumentException("Couldn't get the aliyun oss access key secret.");
+        if (StringUtils.isEmpty(accountKey.trim())) {
+            throw new IllegalArgumentException("Couldn't get the access key secret.");
         }
         Credentials credentials = new Credentials(
                 accountName, accountKey);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index 442980ad336..dfca32eff11 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -329,6 +329,19 @@ public class TieredStorageConfiguration {
             overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
         }
 
+        // Copy JVM system properties whose key starts with "jclouds" into the overrides.
+        System.getProperties().entrySet().stream()
+            .filter(p -> p.getKey().toString().startsWith("jclouds"))
+            .forEach(jcloudsProp -> {
+                overrides.setProperty(jcloudsProp.getKey().toString(), jcloudsProp.getValue().toString());
+            });
+
+        System.getenv().entrySet().stream() // env vars are applied last, so they win on duplicate keys
+            .filter(p -> p.getKey().toString().startsWith("jclouds"))
+            .forEach(jcloudsProp -> {
+                overrides.setProperty(jcloudsProp.getKey().toString(), jcloudsProp.getValue().toString());
+            });
+
         log.info("getOverrides: {}", overrides.toString());
         return overrides;
     }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
index 28e5829ba2a..4f0c60bc007 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
@@ -23,8 +23,6 @@ import static org.testng.Assert.assertEquals;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
-import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.testng.annotations.Test;
 
 public class JCloudBlobStoreProviderTests {
@@ -105,4 +103,33 @@ public class JCloudBlobStoreProviderTests {
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.TRANSIENT.validate(config);
     }
+
+    @Test()
+    public void s3ValidationTest() {
+        Map<String, String> map = new HashMap<>();
+        map.put("managedLedgerOffloadDriver", "S3");
+        map.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
+        map.put("managedLedgerOffloadBucket", "test-s3-bucket");
+        TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+        configuration.getProvider().validate(configuration);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "ServiceEndpoint must specified for S3 offload")
+    public void s3ValidationServiceEndpointMissed() {
+        Map<String, String> map = new HashMap<>();
+        map.put("managedLedgerOffloadDriver", "S3");
+        TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+        configuration.getProvider().validate(configuration);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "Bucket cannot be empty for S3 offload")
+    public void s3ValidationBucketMissed() {
+        Map<String, String> map = new HashMap<>();
+        map.put("managedLedgerOffloadDriver", "S3");
+        map.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
+        TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+        configuration.getProvider().validate(configuration);
+    }
 }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
index f80f3ceaa1a..bf5e046bf70 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
@@ -22,6 +22,8 @@ import static org.testng.Assert.assertEquals;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+
 import org.jclouds.domain.Credentials;
 import org.testng.annotations.Test;
 
@@ -205,4 +207,19 @@ public class TieredStorageConfigurationTests {
         assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
         assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
     }
+
+    @Test
+    public void overridePropertiesTest() {
+        Map<String, String> map = new HashMap<>();
+        map.put("s3ManagedLedgerOffloadServiceEndpoint", "http://localhost");
+        map.put("s3ManagedLedgerOffloadRegion", "my-region");
+        System.setProperty("jclouds.SystemPropertyA", "A");
+        System.setProperty("jclouds.region", "jclouds-region");
+        TieredStorageConfiguration config = new TieredStorageConfiguration(map);
+        Properties properties = config.getOverrides();
+        System.out.println(properties.toString());
+        assertEquals(properties.get("jclouds.region"), "jclouds-region");
+        assertEquals(config.getServiceEndpoint(), "http://localhost");
+        assertEquals(properties.get("jclouds.SystemPropertyA"), "A");
+    }
 }