You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/10/06 08:22:07 UTC
[james-project] 03/06: JAMES-3028 Implement bucket name resolution in the S3BlobStoreDAO
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d674a1edb2c75bd983f8c1c010e0d89602707c1b
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Mon Oct 5 14:05:13 2020 +0700
JAMES-3028 Implement bucket name resolution in the S3BlobStoreDAO
---
.../blob/objectstorage/aws/S3BlobStoreDAO.java | 56 +++++++++++++++-------
.../blob/objectstorage/aws/S3BlobStoreDAOTest.java | 9 +++-
.../aws/S3DeDuplicationBlobStoreTest.java | 10 +++-
...oughBlobStoreTest.java => S3NamespaceTest.java} | 20 +++++---
.../aws/S3PassThroughBlobStoreTest.java | 10 +++-
...toreTest.java => S3PrefixAndNamespaceTest.java} | 21 +++++---
...ThroughBlobStoreTest.java => S3PrefixTest.java} | 17 ++++---
7 files changed, 101 insertions(+), 42 deletions(-)
diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 1cbcceb..1f73847 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -87,21 +87,24 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
private static final int MAX_RETRIES = 5;
private final InstrumentedPool<S3AsyncClient> clientPool;
+ private final BucketNameResolver bucketNameResolver;
@Inject
- S3BlobStoreDAO(AwsS3AuthConfiguration configuration, Region region) {
+ S3BlobStoreDAO(S3BlobStoreConfiguration configuration) {
+ AwsS3AuthConfiguration authConfiguration = configuration.getSpecificAuthConfiguration();
+
S3Configuration pathStyleAccess = S3Configuration.builder()
.pathStyleAccessEnabled(true)
.build();
Callable<S3AsyncClient> clientCreator = () -> S3AsyncClient.builder()
.credentialsProvider(StaticCredentialsProvider.create(
- AwsBasicCredentials.create(configuration.getAccessKeyId(), configuration.getSecretKey())))
+ AwsBasicCredentials.create(authConfiguration.getAccessKeyId(), authConfiguration.getSecretKey())))
.httpClientBuilder(NettyNioAsyncHttpClient.builder()
.maxConcurrency(100)
.maxPendingConnectionAcquires(10_000))
- .endpointOverride(configuration.getEndpoint())
- .region(region.asAws())
+ .endpointOverride(authConfiguration.getEndpoint())
+ .region(configuration.getRegion().asAws())
.serviceConfiguration(pathStyleAccess)
.build();
@@ -111,6 +114,11 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
.maxPendingAcquireUnbounded()
.sizeUnbounded()
.fifo();
+
+ bucketNameResolver = BucketNameResolver.builder()
+ .prefix(configuration.getBucketPrefix())
+ .namespace(configuration.getNamespace())
+ .build();
}
public void start() {
@@ -125,10 +133,12 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
@Override
public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
- return getObject(bucketName, blobId)
+ BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
+
+ return getObject(resolvedBucketName, blobId)
.map(response -> ReactorUtils.toInputStream(response.flux))
- .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + bucketName, e))
- .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + bucketName, e))
+ .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
+ .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + resolvedBucketName.asString(), e))
.block();
}
@@ -174,24 +184,28 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
@Override
public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
+ BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
+
return clientPool.withPoolable(client -> Mono.fromFuture(() ->
client.getObject(
- builder -> builder.bucket(bucketName.asString()).key(blobId.asString()),
+ builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()),
AsyncResponseTransformer.toBytes())))
.next()
- .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + bucketName, e))
- .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + bucketName, e))
+ .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
+ .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + resolvedBucketName.asString(), e))
.map(BytesWrapper::asByteArray);
}
@Override
public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
+ BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
+
return clientPool.withPoolable(client -> Mono.fromFuture(() ->
client.putObject(
- builder -> builder.bucket(bucketName.asString()).key(blobId.asString()).contentLength((long) data.length),
+ builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()).contentLength((long) data.length),
AsyncRequestBody.fromBytes(data))))
.next()
- .retryWhen(createBucketOnRetry(bucketName))
+ .retryWhen(createBucketOnRetry(resolvedBucketName))
.then();
}
@@ -215,18 +229,20 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
@Override
public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
+ BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
+
return Mono.using(content::openStream,
stream ->
clientPool.withPoolable(client -> Mono.fromFuture(() ->
client.putObject(
Throwing.<PutObjectRequest.Builder>consumer(
- builder -> builder.bucket(bucketName.asString()).contentLength(content.size()).key(blobId.asString()))
+ builder -> builder.bucket(resolvedBucketName.asString()).contentLength(content.size()).key(blobId.asString()))
.sneakyThrow(),
AsyncRequestBody.fromPublisher(
DataChunker.chunkStream(stream, CHUNK_SIZE))))).next(),
Throwing.consumer(InputStream::close),
LAZY)
- .retryWhen(createBucketOnRetry(bucketName))
+ .retryWhen(createBucketOnRetry(resolvedBucketName))
.onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e))
.onErrorMap(SdkClientException.class, e -> new ObjectStoreIOException("Error saving blob", e))
.then();
@@ -244,8 +260,10 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
@Override
public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
+ BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
+
return clientPool.withPoolable(client -> Mono.fromFuture(() ->
- client.deleteObject(delete -> delete.bucket(bucketName.asString()).key(blobId.asString()))))
+ client.deleteObject(delete -> delete.bucket(resolvedBucketName.asString()).key(blobId.asString()))))
.next()
.then()
.onErrorResume(NoSuchBucketException.class, e -> Mono.empty());
@@ -253,6 +271,12 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
@Override
public Mono<Void> deleteBucket(BucketName bucketName) {
+ BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
+
+ return deleteResolvedBucket(resolvedBucketName);
+ }
+
+ private Mono<Void> deleteResolvedBucket(BucketName bucketName) {
return emptyBucket(bucketName)
.onErrorResume(t -> Mono.just(bucketName))
.flatMap(ignore -> clientPool.withPoolable(client -> Mono.fromFuture(() ->
@@ -287,7 +311,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
public Mono<Void> deleteAllBuckets() {
return clientPool.withPoolable(client -> Mono.fromFuture(client::listBuckets)
.flatMapIterable(ListBucketsResponse::buckets)
- .flatMap(bucket -> deleteBucket(BucketName.of(bucket.name()))))
+ .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name()))))
.then();
}
}
diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java
index 5d41e9f..e426926 100644
--- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java
+++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAOTest.java
@@ -31,13 +31,18 @@ public class S3BlobStoreDAOTest implements BlobStoreDAOContract {
@BeforeAll
static void setUp(DockerAwsS3Container dockerAwsS3) {
- AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder()
+ AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder()
.endpoint(dockerAwsS3.getEndpoint())
.accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID)
.secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY)
.build();
- testee = new S3BlobStoreDAO(configuration, dockerAwsS3.dockerAwsS3().region());
+ S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder()
+ .authConfiguration(authConfiguration)
+ .region(dockerAwsS3.dockerAwsS3().region())
+ .build();
+
+ testee = new S3BlobStoreDAO(s3Configuration);
}
@AfterEach
diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java
index f2915aa..2dabaa9 100644
--- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java
+++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java
@@ -37,13 +37,19 @@ class S3DeDuplicationBlobStoreTest implements BlobStoreContract {
@BeforeAll
static void setUpClass(DockerAwsS3Container dockerAwsS3) {
- AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder()
+ AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder()
.endpoint(dockerAwsS3.getEndpoint())
.accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID)
.secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY)
.build();
- s3BlobStoreDAO = new S3BlobStoreDAO(configuration, dockerAwsS3.dockerAwsS3().region());
+ S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder()
+ .authConfiguration(authConfiguration)
+ .region(dockerAwsS3.dockerAwsS3().region())
+ .build();
+
+ s3BlobStoreDAO = new S3BlobStoreDAO(s3Configuration);
+
testee = BlobStoreFactory.builder()
.blobStoreDAO(s3BlobStoreDAO)
.blobIdFactory(new HashBlobId.Factory())
diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3NamespaceTest.java
similarity index 82%
copy from server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java
copy to server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3NamespaceTest.java
index 6cd960b..3fdc0b3 100644
--- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java
+++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3NamespaceTest.java
@@ -22,6 +22,7 @@ package org.apache.james.blob.objectstorage.aws;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BlobStoreContract;
+import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.server.blob.deduplication.BlobStoreFactory;
import org.junit.jupiter.api.AfterAll;
@@ -30,24 +31,30 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(DockerAwsS3Extension.class)
-class S3PassThroughBlobStoreTest implements BlobStoreContract {
-
+class S3NamespaceTest implements BlobStoreContract {
private static BlobStore testee;
private static S3BlobStoreDAO s3BlobStoreDAO;
@BeforeAll
static void setUpClass(DockerAwsS3Container dockerAwsS3) {
- AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder()
+ AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder()
.endpoint(dockerAwsS3.getEndpoint())
.accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID)
.secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY)
.build();
- s3BlobStoreDAO = new S3BlobStoreDAO(configuration, dockerAwsS3.dockerAwsS3().region());
+ S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder()
+ .authConfiguration(authConfiguration)
+ .region(dockerAwsS3.dockerAwsS3().region())
+ .defaultBucketName(BucketName.of("namespace"))
+ .build();
+
+ s3BlobStoreDAO = new S3BlobStoreDAO(s3Configuration);
+
testee = BlobStoreFactory.builder()
.blobStoreDAO(s3BlobStoreDAO)
.blobIdFactory(new HashBlobId.Factory())
- .defaultBucketName()
+ .bucket(BucketName.of("namespace"))
.passthrough();
}
@@ -70,5 +77,4 @@ class S3PassThroughBlobStoreTest implements BlobStoreContract {
public BlobId.Factory blobIdFactory() {
return new HashBlobId.Factory();
}
-
-}
\ No newline at end of file
+}
diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java
index 6cd960b..f535e44 100644
--- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java
+++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java
@@ -37,13 +37,19 @@ class S3PassThroughBlobStoreTest implements BlobStoreContract {
@BeforeAll
static void setUpClass(DockerAwsS3Container dockerAwsS3) {
- AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder()
+ AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder()
.endpoint(dockerAwsS3.getEndpoint())
.accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID)
.secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY)
.build();
- s3BlobStoreDAO = new S3BlobStoreDAO(configuration, dockerAwsS3.dockerAwsS3().region());
+ S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder()
+ .authConfiguration(authConfiguration)
+ .region(dockerAwsS3.dockerAwsS3().region())
+ .build();
+
+ s3BlobStoreDAO = new S3BlobStoreDAO(s3Configuration);
+
testee = BlobStoreFactory.builder()
.blobStoreDAO(s3BlobStoreDAO)
.blobIdFactory(new HashBlobId.Factory())
diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixAndNamespaceTest.java
similarity index 80%
copy from server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java
copy to server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixAndNamespaceTest.java
index f2915aa..bde3309 100644
--- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3DeDuplicationBlobStoreTest.java
+++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixAndNamespaceTest.java
@@ -22,6 +22,7 @@ package org.apache.james.blob.objectstorage.aws;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BlobStoreContract;
+import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.server.blob.deduplication.BlobStoreFactory;
import org.junit.jupiter.api.AfterAll;
@@ -30,24 +31,31 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(DockerAwsS3Extension.class)
-class S3DeDuplicationBlobStoreTest implements BlobStoreContract {
-
+class S3PrefixAndNamespaceTest implements BlobStoreContract {
private static BlobStore testee;
private static S3BlobStoreDAO s3BlobStoreDAO;
@BeforeAll
static void setUpClass(DockerAwsS3Container dockerAwsS3) {
- AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder()
+ AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder()
.endpoint(dockerAwsS3.getEndpoint())
.accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID)
.secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY)
.build();
- s3BlobStoreDAO = new S3BlobStoreDAO(configuration, dockerAwsS3.dockerAwsS3().region());
+ S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder()
+ .authConfiguration(authConfiguration)
+ .region(dockerAwsS3.dockerAwsS3().region())
+ .defaultBucketName(BucketName.of("namespace"))
+ .bucketPrefix("prefix")
+ .build();
+
+ s3BlobStoreDAO = new S3BlobStoreDAO(s3Configuration);
+
testee = BlobStoreFactory.builder()
.blobStoreDAO(s3BlobStoreDAO)
.blobIdFactory(new HashBlobId.Factory())
- .defaultBucketName()
+ .bucket(BucketName.of("namespace"))
.deduplication();
}
@@ -70,5 +78,4 @@ class S3DeDuplicationBlobStoreTest implements BlobStoreContract {
public BlobId.Factory blobIdFactory() {
return new HashBlobId.Factory();
}
-
-}
\ No newline at end of file
+}
diff --git a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixTest.java
similarity index 85%
copy from server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java
copy to server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixTest.java
index 6cd960b..b8bb257 100644
--- a/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PassThroughBlobStoreTest.java
+++ b/server/blob/blob-s3/src/test/java/org/apache/james/blob/objectstorage/aws/S3PrefixTest.java
@@ -30,20 +30,26 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(DockerAwsS3Extension.class)
-class S3PassThroughBlobStoreTest implements BlobStoreContract {
-
+class S3PrefixTest implements BlobStoreContract {
private static BlobStore testee;
private static S3BlobStoreDAO s3BlobStoreDAO;
@BeforeAll
static void setUpClass(DockerAwsS3Container dockerAwsS3) {
- AwsS3AuthConfiguration configuration = AwsS3AuthConfiguration.builder()
+ AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder()
.endpoint(dockerAwsS3.getEndpoint())
.accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID)
.secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY)
.build();
- s3BlobStoreDAO = new S3BlobStoreDAO(configuration, dockerAwsS3.dockerAwsS3().region());
+ S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder()
+ .authConfiguration(authConfiguration)
+ .region(dockerAwsS3.dockerAwsS3().region())
+ .bucketPrefix("prefix")
+ .build();
+
+ s3BlobStoreDAO = new S3BlobStoreDAO(s3Configuration);
+
testee = BlobStoreFactory.builder()
.blobStoreDAO(s3BlobStoreDAO)
.blobIdFactory(new HashBlobId.Factory())
@@ -70,5 +76,4 @@ class S3PassThroughBlobStoreTest implements BlobStoreContract {
public BlobId.Factory blobIdFactory() {
return new HashBlobId.Factory();
}
-
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org