You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/10 06:14:55 UTC
[incubator-pulsar] branch master updated: Inject AmazonS3 into S3ManagedLedgerOffloader (#1755)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c7117a3 Inject AmazonS3 into S3ManagedLedgerOffloader (#1755)
c7117a3 is described below
commit c7117a3f045d8f33beca20de926d6641e99424e8
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu May 10 08:14:52 2018 +0200
Inject AmazonS3 into S3ManagedLedgerOffloader (#1755)
Rather than creating it directly in the offloader, inject the AmazonS3
object so that it can be mocked for testing.
Master Issue: #1511
---
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../broker/s3offload/S3ManagedLedgerOffloader.java | 14 ++++++++----
.../s3offload/S3ManagedLedgerOffloaderTest.java | 26 +++++++++-------------
3 files changed, 22 insertions(+), 20 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 7b975f3..5f6dbca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -653,7 +653,7 @@ public class PulsarService implements AutoCloseable {
throws PulsarServerException {
if (conf.getManagedLedgerOffloadDriver() != null
&& conf.getManagedLedgerOffloadDriver().equalsIgnoreCase(S3ManagedLedgerOffloader.DRIVER_NAME)) {
- return new S3ManagedLedgerOffloader(conf, getOffloaderScheduler(conf));
+ return S3ManagedLedgerOffloader.create(conf, getOffloaderScheduler(conf));
} else {
return NullLedgerOffloader.INSTANCE;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index c11fb3d..163b79b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -40,8 +40,8 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
private final AmazonS3 s3client;
private final String bucket;
- public S3ManagedLedgerOffloader(ServiceConfiguration conf,
- ScheduledExecutorService scheduler)
+ public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
+ ScheduledExecutorService scheduler)
throws PulsarServerException {
String region = conf.getS3ManagedLedgerOffloadRegion();
String bucket = conf.getS3ManagedLedgerOffloadBucket();
@@ -53,12 +53,18 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
throw new PulsarServerException("s3ManagedLedgerOffloadBucket cannot be empty is s3 offload enabled");
}
- AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard().withRegion(region);
+ AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
if (!Strings.isNullOrEmpty(endpoint)) {
builder.setEndpointConfiguration(new EndpointConfiguration(endpoint, region));
builder.setPathStyleAccessEnabled(true);
+ } else {
+ builder.setRegion(region);
}
- s3client = builder.build();
+ return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler);
+ }
+
+ S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler) {
+ this.s3client = s3client;
this.bucket = bucket;
this.scheduler = scheduler;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index 4291a2c..deeacd6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -51,7 +51,8 @@ class S3ManagedLedgerOffloaderTest {
final ScheduledExecutorService scheduler;
final MockBookKeeper bk;
S3Mock s3mock = null;
- String endpoint = null;
+ AmazonS3 s3client = null;
+ String s3endpoint = null;
final static String REGION = "foobar";
final static String BUCKET = "foobar";
@@ -65,13 +66,13 @@ class S3ManagedLedgerOffloaderTest {
public void start() throws Exception {
s3mock = new S3Mock.Builder().withPort(0).withInMemoryBackend().build();
int port = s3mock.start().localAddress().getPort();
- endpoint = "http://localhost:" + port;
+ s3endpoint = "http://localhost:" + port;
- AmazonS3 client = AmazonS3ClientBuilder.standard()
+ s3client = AmazonS3ClientBuilder.standard()
.withRegion(REGION)
- .withEndpointConfiguration(new EndpointConfiguration(endpoint, REGION))
+ .withEndpointConfiguration(new EndpointConfiguration(s3endpoint, REGION))
.withPathStyleAccessEnabled(true).build();
- client.createBucket(BUCKET);
+ s3client.createBucket(BUCKET);
}
@AfterMethod
@@ -93,12 +94,7 @@ class S3ManagedLedgerOffloaderTest {
@Test
public void testHappyCase() throws Exception {
- ServiceConfiguration conf = new ServiceConfiguration();
- conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
- conf.setS3ManagedLedgerOffloadBucket(BUCKET);
- conf.setS3ManagedLedgerOffloadRegion(REGION);
- conf.setS3ManagedLedgerOffloadServiceEndpoint(endpoint);
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(conf, scheduler);
+ LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler);
offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get();
}
@@ -108,9 +104,9 @@ class S3ManagedLedgerOffloaderTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
conf.setS3ManagedLedgerOffloadBucket("no-bucket");
+ conf.setS3ManagedLedgerOffloadServiceEndpoint(s3endpoint);
conf.setS3ManagedLedgerOffloadRegion(REGION);
- conf.setS3ManagedLedgerOffloadServiceEndpoint(endpoint);
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(conf, scheduler);
+ LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, scheduler);
try {
offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get();
@@ -127,7 +123,7 @@ class S3ManagedLedgerOffloaderTest {
conf.setS3ManagedLedgerOffloadBucket(BUCKET);
try {
- new S3ManagedLedgerOffloader(conf, scheduler);
+ S3ManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
} catch (PulsarServerException pse) {
// correct
@@ -141,7 +137,7 @@ class S3ManagedLedgerOffloaderTest {
conf.setS3ManagedLedgerOffloadRegion(REGION);
try {
- new S3ManagedLedgerOffloader(conf, scheduler);
+ S3ManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
} catch (PulsarServerException pse) {
// correct
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.