You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/07 08:33:12 UTC

[GitHub] sijie closed pull request #1723: Skeleton code for S3 offload

sijie closed pull request #1723: Skeleton code for S3 offload
URL: https://github.com/apache/incubator-pulsar/pull/1723
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index 94c7d6fb43..5d6cb4b0f9 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -471,6 +471,22 @@ functionsWorkerEnabled=false
 # Enable topic level metrics
 exposePublisherStats=true
 
+### --- Ledger Offloading --- ###
+
+# Driver to use to offload old data to long-term storage (possible values: S3); leave empty to disable offloading
+managedLedgerOffloadDriver=
+
+# Maximum number of thread pool threads for ledger offloading
+managedLedgerOffloadMaxThreads=2
+
+# For Amazon S3 ledger offload, AWS region (required when the S3 driver is selected)
+s3ManagedLedgerOffloadRegion=
+
+# For Amazon S3 ledger offload, bucket to place offloaded ledgers into (required when the S3 driver is selected)
+s3ManagedLedgerOffloadBucket=
+
+# For Amazon S3 ledger offload, alternative endpoint to connect to (useful for testing)
+s3ManagedLedgerOffloadServiceEndpoint=
 
 ### --- Deprecated config variables --- ###
 
diff --git a/pom.xml b/pom.xml
index 26db5d3342..18c0c0e14e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,9 +145,11 @@ flexible messaging model and an intuitive client API.</description>
     <aerospike-client.version>4.1.5</aerospike-client.version>
     <kafka-client.version>0.10.2.1</kafka-client.version>
     <rabbitmq-client.version>5.1.1</rabbitmq-client.version>
+    <aws-sdk.version>1.11.297</aws-sdk.version>
 
     <!-- test dependencies -->
     <disruptor.version>3.4.0</disruptor.version>
+    <s3mock.version>0.2.5</s3mock.version>
   </properties>
 
   <dependencyManagement>
@@ -699,6 +701,14 @@ flexible messaging model and an intuitive client API.</description>
         <version>${sketches.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>com.amazonaws</groupId>
+        <artifactId>aws-java-sdk-bom</artifactId>
+        <version>${aws-sdk.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+
       <!-- use shaded dependency util pulsar bump zookeeper version to 3.5 -->
       <dependency>
         <groupId>org.apache.distributedlog</groupId>
@@ -713,6 +723,12 @@ flexible messaging model and an intuitive client API.</description>
         <version>${disruptor.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>io.findify</groupId>
+        <artifactId>s3mock_2.12</artifactId>
+        <version>${s3mock.version}</version>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9a69fc31c8..43d929d58e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -468,6 +468,22 @@
     // If true, export publisher stats when returning topics stats from the admin rest api
     private boolean exposePublisherStats = true;
 
+    /**** --- Ledger Offloading --- ****/
+    // Name of the driver used to offload old ledger data to long-term storage.
+    // Currently the only recognized value is "S3" (matched case-insensitively);
+    // leave unset to disable offloading.
+    private String managedLedgerOffloadDriver = null;
+
+    // Maximum number of threads in the pool used for ledger offloading
+    private int managedLedgerOffloadMaxThreads = 2;
+
+    // For Amazon S3 ledger offload: AWS region (required when the S3 driver is selected)
+    private String s3ManagedLedgerOffloadRegion = null;
+
+    // For Amazon S3 ledger offload: bucket to place offloaded ledgers into
+    // (required when the S3 driver is selected)
+    private String s3ManagedLedgerOffloadBucket = null;
+
+    // For Amazon S3 ledger offload: alternative service endpoint to connect to (useful for
+    // testing against an S3-compatible service); leave unset to use the region's endpoint
+    private String s3ManagedLedgerOffloadServiceEndpoint = null;
+
     public String getZookeeperServers() {
         return zookeeperServers;
     }
@@ -1625,4 +1641,45 @@ public boolean isRunningStandalone() {
     public void setRunningStandalone(boolean isRunningStandalone) {
         this.isRunningStandalone = isRunningStandalone;
     }
+
+    /* ------------------- Ledger offload accessors ------------------- */
+
+    /** @return name of the offload driver; may be null or empty when offloading is not configured */
+    public String getManagedLedgerOffloadDriver() {
+        return managedLedgerOffloadDriver;
+    }
+
+    /** @param driver name of the driver used to offload old ledger data to long-term storage */
+    public void setManagedLedgerOffloadDriver(String driver) {
+        managedLedgerOffloadDriver = driver;
+    }
+
+    /** @return maximum number of threads in the ledger offloading pool */
+    public int getManagedLedgerOffloadMaxThreads() {
+        return managedLedgerOffloadMaxThreads;
+    }
+
+    /** @param maxThreads maximum number of threads in the ledger offloading pool */
+    public void setManagedLedgerOffloadMaxThreads(int maxThreads) {
+        managedLedgerOffloadMaxThreads = maxThreads;
+    }
+
+    /** @return AWS region used by the S3 offloader */
+    public String getS3ManagedLedgerOffloadRegion() {
+        return s3ManagedLedgerOffloadRegion;
+    }
+
+    /** @param region AWS region used by the S3 offloader */
+    public void setS3ManagedLedgerOffloadRegion(String region) {
+        s3ManagedLedgerOffloadRegion = region;
+    }
+
+    /** @return S3 bucket that offloaded ledgers are written to */
+    public String getS3ManagedLedgerOffloadBucket() {
+        return s3ManagedLedgerOffloadBucket;
+    }
+
+    /** @param bucket S3 bucket that offloaded ledgers are written to */
+    public void setS3ManagedLedgerOffloadBucket(String bucket) {
+        s3ManagedLedgerOffloadBucket = bucket;
+    }
+
+    /** @return alternative S3 service endpoint, if any */
+    public String getS3ManagedLedgerOffloadServiceEndpoint() {
+        return s3ManagedLedgerOffloadServiceEndpoint;
+    }
+
+    /** @param endpoint alternative S3 service endpoint (useful for testing) */
+    public void setS3ManagedLedgerOffloadServiceEndpoint(String endpoint) {
+        s3ManagedLedgerOffloadServiceEndpoint = endpoint;
+    }
 }
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 0c9fe462f6..a830674bbe 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -137,6 +137,17 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-s3</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.findify</groupId>
+      <artifactId>s3mock_2.12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <!-- functions related dependencies (begin) -->
 
     <dependency>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 8d39fda30f..38d8e849a9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -62,6 +62,7 @@
 import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
@@ -136,6 +137,8 @@
             .build();
     private final ScheduledExecutorService loadManagerExecutor;
     private ScheduledExecutorService compactorExecutor;
+    // Created lazily by getOffloaderScheduler(); shut down in close()
+    private ScheduledExecutorService offloaderScheduler;
+    // Assigned in start() from createManagedLedgerOffloader(); null until then
+    private LedgerOffloader offloader;
     private ScheduledFuture<?> loadReportTask = null;
     private ScheduledFuture<?> loadSheddingTask = null;
     private ScheduledFuture<?> loadResourceQuotaTask = null;
@@ -259,6 +262,10 @@ public void close() throws PulsarServerException {
                 compactorExecutor.shutdown();
             }
 
+            if (offloaderScheduler != null) {
+                offloaderScheduler.shutdown();
+            }
+
             // executor is not initialized in mocks even when real close method is called
             // guard against null executors
             if (executor != null) {
@@ -327,6 +334,8 @@ public void start() throws PulsarServerException {
             // needs load management service
             this.startNamespaceService();
 
+            this.offloader = createManagedLedgerOffloader(this.getConfiguration());
+
             LOG.info("Starting Pulsar Broker service; version: '{}'", ( brokerVersion != null ? brokerVersion : "unknown" )  );
             brokerService.start();
 
@@ -638,7 +647,17 @@ public ManagedLedgerFactory getManagedLedgerFactory() {
     }
 
     public LedgerOffloader getManagedLedgerOffloader() {
-        return NullLedgerOffloader.INSTANCE;
+        // offloader is only assigned in start(); fall back to the no-op offloader so
+        // callers never receive null before the broker has been started.
+        LedgerOffloader current = offloader;
+        return current != null ? current : NullLedgerOffloader.INSTANCE;
+    }
+
+    /**
+     * Builds the ledger offloader selected by {@code managedLedgerOffloadDriver}.
+     *
+     * <p>The S3 driver name is matched case-insensitively. A null or blank driver disables
+     * offloading. Any other value also disables it, but is logged as a warning because it
+     * is most likely a misconfiguration.
+     *
+     * @param conf broker configuration to read the offload settings from
+     * @return the S3 offloader when the S3 driver is selected, otherwise
+     *         {@link NullLedgerOffloader#INSTANCE}
+     * @throws PulsarServerException if the S3 driver is selected but its settings are invalid
+     */
+    public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf)
+            throws PulsarServerException {
+        String driver = conf.getManagedLedgerOffloadDriver();
+        // Null-safe: equalsIgnoreCase(null) is false.
+        if (S3ManagedLedgerOffloader.DRIVER_NAME.equalsIgnoreCase(driver)) {
+            return new S3ManagedLedgerOffloader(conf, getOffloaderScheduler(conf));
+        }
+        if (driver != null && !driver.trim().isEmpty()) {
+            LOG.warn("Unknown managedLedgerOffloadDriver '{}', ledger offloading is disabled", driver);
+        }
+        return NullLedgerOffloader.INSTANCE;
     }
 
     public ZooKeeperCache getLocalZkCache() {
@@ -701,6 +720,15 @@ public synchronized Compactor getCompactor() throws PulsarServerException {
         return this.compactor;
     }
 
+    /**
+     * Returns the executor shared by ledger offloaders, creating it on first use with
+     * {@code managedLedgerOffloadMaxThreads} threads. Only the configuration passed on the
+     * first call is used. The executor is shut down in {@link #close()}.
+     */
+    protected synchronized ScheduledExecutorService getOffloaderScheduler(ServiceConfiguration conf) {
+        if (this.offloaderScheduler == null) {
+            // DefaultThreadFactory appends "-<poolId>-<threadId>" itself; a trailing dash in
+            // the pool name would produce thread names like "offloader--1-1".
+            this.offloaderScheduler = Executors.newScheduledThreadPool(
+                    conf.getManagedLedgerOffloadMaxThreads(),
+                    new DefaultThreadFactory("offloader"));
+        }
+        return this.offloaderScheduler;
+    }
+
     public synchronized PulsarClient getClient() throws PulsarServerException {
         if (this.client == null) {
             try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
new file mode 100644
index 0000000000..c11fb3d9ec
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+
+import com.google.common.base.Strings;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+
+/**
+ * Skeleton {@link LedgerOffloader} that stores offloaded ledgers in an Amazon S3 (or
+ * S3-compatible) bucket.
+ *
+ * <p>So far only {@link #offload} is implemented. It writes a placeholder object, keyed by
+ * and containing the offload UUID, rather than the ledger data. {@link #readOffloaded} and
+ * {@link #deleteOffloaded} complete exceptionally with {@link UnsupportedOperationException}.
+ */
+public class S3ManagedLedgerOffloader implements LedgerOffloader {
+    /** Value of {@code managedLedgerOffloadDriver} that selects this offloader. */
+    public static final String DRIVER_NAME = "S3";
+    private final ScheduledExecutorService scheduler;
+    private final AmazonS3 s3client;
+    private final String bucket;
+
+    /**
+     * Creates an offloader from the broker configuration.
+     *
+     * @param conf broker configuration; the S3 region and bucket must be non-empty, and an
+     *             optional service endpoint overrides the region's default endpoint
+     * @param scheduler executor on which the blocking S3 calls are run
+     * @throws PulsarServerException if the region or bucket is not configured
+     */
+    public S3ManagedLedgerOffloader(ServiceConfiguration conf,
+                                    ScheduledExecutorService scheduler)
+            throws PulsarServerException {
+        String region = conf.getS3ManagedLedgerOffloadRegion();
+        String bucket = conf.getS3ManagedLedgerOffloadBucket();
+        String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
+        if (Strings.isNullOrEmpty(region)) {
+            throw new PulsarServerException("s3ManagedLedgerOffloadRegion cannot be empty if s3 offload is enabled");
+        }
+        if (Strings.isNullOrEmpty(bucket)) {
+            throw new PulsarServerException("s3ManagedLedgerOffloadBucket cannot be empty if s3 offload is enabled");
+        }
+
+        AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
+        if (Strings.isNullOrEmpty(endpoint)) {
+            builder.withRegion(region);
+        } else {
+            // The AWS client builder rejects having both a region and an endpoint
+            // configuration set, so the region is passed as the endpoint's signing region.
+            builder.setEndpointConfiguration(new EndpointConfiguration(endpoint, region));
+            // Custom endpoints (e.g. a local S3 mock) may not support virtual-hosted-style
+            // bucket addressing, so use path-style access.
+            builder.setPathStyleAccessEnabled(true);
+        }
+        s3client = builder.build();
+        this.bucket = bucket;
+        this.scheduler = scheduler;
+    }
+
+    @Override
+    public CompletableFuture<Void> offload(ReadHandle ledger,
+                                           UUID uid,
+                                           Map<String, String> extraMetadata) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        try {
+            scheduler.submit(() -> {
+                    try {
+                        // Placeholder upload: the object body is the UUID, not ledger data yet.
+                        s3client.putObject(bucket, uid.toString(), uid.toString());
+                        promise.complete(null);
+                    } catch (Throwable t) {
+                        promise.completeExceptionally(t);
+                    }
+                });
+        } catch (RejectedExecutionException ree) {
+            // e.g. the scheduler was already shut down; report through the future rather
+            // than throwing synchronously at the caller.
+            promise.completeExceptionally(ree);
+        }
+        return promise;
+    }
+
+    @Override
+    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) {
+        return unsupported();
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
+        return unsupported();
+    }
+
+    /** Returns a future already failed with {@link UnsupportedOperationException}. */
+    private static <T> CompletableFuture<T> unsupported() {
+        CompletableFuture<T> promise = new CompletableFuture<>();
+        promise.completeExceptionally(new UnsupportedOperationException());
+        return promise;
+    }
+}
+
+
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
new file mode 100644
index 0000000000..4291a2c022
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+
+import io.findify.s3mock.S3Mock;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.MockBookKeeper;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link S3ManagedLedgerOffloader} against an in-process S3 mock.
+ */
+class S3ManagedLedgerOffloaderTest {
+
+    final MockBookKeeper bk;
+    // Recreated for every test method and shut down afterwards so no threads leak.
+    ScheduledExecutorService scheduler = null;
+    S3Mock s3mock = null;
+    String endpoint = null;
+
+    final static String REGION = "foobar";
+    final static String BUCKET = "foobar";
+
+    S3ManagedLedgerOffloaderTest() throws Exception {
+        bk = new MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper());
+    }
+
+    @BeforeMethod
+    public void start() throws Exception {
+        scheduler = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("offloader"));
+
+        s3mock = new S3Mock.Builder().withPort(0).withInMemoryBackend().build();
+        int port = s3mock.start().localAddress().getPort();
+        endpoint = "http://localhost:" + port;
+
+        // The AWS client builder accepts either a region or an endpoint configuration,
+        // not both; the endpoint configuration carries the signing region.
+        AmazonS3 client = AmazonS3ClientBuilder.standard()
+            .withEndpointConfiguration(new EndpointConfiguration(endpoint, REGION))
+            .withPathStyleAccessEnabled(true).build();
+        client.createBucket(BUCKET);
+    }
+
+    @AfterMethod
+    public void stop() throws Exception {
+        if (scheduler != null) {
+            scheduler.shutdown();
+        }
+        if (s3mock != null) {
+            s3mock.shutdown();
+        }
+    }
+
+    private ReadHandle buildReadHandle() throws Exception {
+        LedgerHandle lh = bk.createLedger(1, 1, 1, BookKeeper.DigestType.CRC32, "foobar".getBytes());
+        lh.addEntry("foobar".getBytes());
+        lh.close();
+
+        // Return the handle opened for reading, not the closed write handle.
+        return bk.newOpenLedgerOp().withLedgerId(lh.getId())
+            .withPassword("foobar".getBytes()).withDigestType(DigestType.CRC32).execute().get();
+    }
+
+    @Test
+    public void testHappyCase() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+        conf.setS3ManagedLedgerOffloadBucket(BUCKET);
+        conf.setS3ManagedLedgerOffloadRegion(REGION);
+        conf.setS3ManagedLedgerOffloadServiceEndpoint(endpoint);
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(conf, scheduler);
+
+        offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get();
+    }
+
+    @Test
+    public void testBucketDoesNotExist() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+        conf.setS3ManagedLedgerOffloadBucket("no-bucket");
+        conf.setS3ManagedLedgerOffloadRegion(REGION);
+        conf.setS3ManagedLedgerOffloadServiceEndpoint(endpoint);
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(conf, scheduler);
+
+        try {
+            offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get();
+            Assert.fail("Shouldn't be able to add to bucket");
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getMessage().contains("NoSuchBucket"));
+        }
+    }
+
+    @Test
+    public void testNoRegionConfigured() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+        conf.setS3ManagedLedgerOffloadBucket(BUCKET);
+
+        try {
+            new S3ManagedLedgerOffloader(conf, scheduler);
+            Assert.fail("Should have thrown exception");
+        } catch (PulsarServerException pse) {
+            // expected: region is mandatory
+        }
+    }
+
+    @Test
+    public void testNoBucketConfigured() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+        conf.setS3ManagedLedgerOffloadRegion(REGION);
+
+        try {
+            new S3ManagedLedgerOffloader(conf, scheduler);
+            Assert.fail("Should have thrown exception");
+        } catch (PulsarServerException pse) {
+            // expected: bucket is mandatory
+        }
+    }
+}
+


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services