You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/10 06:14:55 UTC

[incubator-pulsar] branch master updated: Inject AmazonS3 into S3ManagedLedgerOffloader (#1755)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c7117a3  Inject AmazonS3 into S3ManagedLedgerOffloader (#1755)
c7117a3 is described below

commit c7117a3f045d8f33beca20de926d6641e99424e8
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu May 10 08:14:52 2018 +0200

    Inject AmazonS3 into S3ManagedLedgerOffloader (#1755)
    
    Rather than creating it directly in the offloader, inject the AmazonS3
    object so that it can be mocked for testing.
    
    Master Issue: #1511
---
 .../org/apache/pulsar/broker/PulsarService.java    |  2 +-
 .../broker/s3offload/S3ManagedLedgerOffloader.java | 14 ++++++++----
 .../s3offload/S3ManagedLedgerOffloaderTest.java    | 26 +++++++++-------------
 3 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 7b975f3..5f6dbca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -653,7 +653,7 @@ public class PulsarService implements AutoCloseable {
             throws PulsarServerException {
         if (conf.getManagedLedgerOffloadDriver() != null
             && conf.getManagedLedgerOffloadDriver().equalsIgnoreCase(S3ManagedLedgerOffloader.DRIVER_NAME)) {
-            return new S3ManagedLedgerOffloader(conf, getOffloaderScheduler(conf));
+            return S3ManagedLedgerOffloader.create(conf, getOffloaderScheduler(conf));
         } else {
             return NullLedgerOffloader.INSTANCE;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index c11fb3d..163b79b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -40,8 +40,8 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
     private final AmazonS3 s3client;
     private final String bucket;
 
-    public S3ManagedLedgerOffloader(ServiceConfiguration conf,
-                                    ScheduledExecutorService scheduler)
+    public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
+                                                  ScheduledExecutorService scheduler)
             throws PulsarServerException {
         String region = conf.getS3ManagedLedgerOffloadRegion();
         String bucket = conf.getS3ManagedLedgerOffloadBucket();
@@ -53,12 +53,18 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
             throw new PulsarServerException("s3ManagedLedgerOffloadBucket cannot be empty is s3 offload enabled");
         }
 
-        AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard().withRegion(region);
+        AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
         if (!Strings.isNullOrEmpty(endpoint)) {
             builder.setEndpointConfiguration(new EndpointConfiguration(endpoint, region));
             builder.setPathStyleAccessEnabled(true);
+        } else {
+            builder.setRegion(region);
         }
-        s3client = builder.build();
+        return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler);
+    }
+
+    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler) {
+        this.s3client = s3client;
         this.bucket = bucket;
         this.scheduler = scheduler;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index 4291a2c..deeacd6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -51,7 +51,8 @@ class S3ManagedLedgerOffloaderTest {
     final ScheduledExecutorService scheduler;
     final MockBookKeeper bk;
     S3Mock s3mock = null;
-    String endpoint = null;
+    AmazonS3 s3client = null;
+    String s3endpoint = null;
 
     final static String REGION = "foobar";
     final static String BUCKET = "foobar";
@@ -65,13 +66,13 @@ class S3ManagedLedgerOffloaderTest {
     public void start() throws Exception {
         s3mock = new S3Mock.Builder().withPort(0).withInMemoryBackend().build();
         int port = s3mock.start().localAddress().getPort();
-        endpoint = "http://localhost:" + port;
+        s3endpoint = "http://localhost:" + port;
 
-        AmazonS3 client = AmazonS3ClientBuilder.standard()
+        s3client = AmazonS3ClientBuilder.standard()
             .withRegion(REGION)
-            .withEndpointConfiguration(new EndpointConfiguration(endpoint, REGION))
+            .withEndpointConfiguration(new EndpointConfiguration(s3endpoint, REGION))
             .withPathStyleAccessEnabled(true).build();
-        client.createBucket(BUCKET);
+        s3client.createBucket(BUCKET);
     }
 
     @AfterMethod
@@ -93,12 +94,7 @@ class S3ManagedLedgerOffloaderTest {
 
     @Test
     public void testHappyCase() throws Exception {
-        ServiceConfiguration conf = new ServiceConfiguration();
-        conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
-        conf.setS3ManagedLedgerOffloadBucket(BUCKET);
-        conf.setS3ManagedLedgerOffloadRegion(REGION);
-        conf.setS3ManagedLedgerOffloadServiceEndpoint(endpoint);
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(conf, scheduler);
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler);
 
         offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get();
     }
@@ -108,9 +104,9 @@ class S3ManagedLedgerOffloaderTest {
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
         conf.setS3ManagedLedgerOffloadBucket("no-bucket");
+        conf.setS3ManagedLedgerOffloadServiceEndpoint(s3endpoint);
         conf.setS3ManagedLedgerOffloadRegion(REGION);
-        conf.setS3ManagedLedgerOffloadServiceEndpoint(endpoint);
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(conf, scheduler);
+        LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, scheduler);
 
         try {
             offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get();
@@ -127,7 +123,7 @@ class S3ManagedLedgerOffloaderTest {
         conf.setS3ManagedLedgerOffloadBucket(BUCKET);
 
         try {
-            new S3ManagedLedgerOffloader(conf, scheduler);
+            S3ManagedLedgerOffloader.create(conf, scheduler);
             Assert.fail("Should have thrown exception");
         } catch (PulsarServerException pse) {
             // correct
@@ -141,7 +137,7 @@ class S3ManagedLedgerOffloaderTest {
         conf.setS3ManagedLedgerOffloadRegion(REGION);
 
         try {
-            new S3ManagedLedgerOffloader(conf, scheduler);
+            S3ManagedLedgerOffloader.create(conf, scheduler);
             Assert.fail("Should have thrown exception");
         } catch (PulsarServerException pse) {
             // correct

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.