You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/05/21 01:51:04 UTC

[james-project] branch master updated (10cdbb0 -> 5e52bc4)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 10cdbb0  JAMES-3586 Use LOCAL_ONE for optimistic consistency downgrades
     new 9c9d575  JAMES-3028 S3Client should not be pooled
     new 5e52bc4  JAMES-3028 Allow setting up S3 HTTP concurrency at the Netty level

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pages/distributed/configure/blobstore.adoc     |  3 +
 .../aws/S3BlobStoreConfiguration.java              | 24 +++++++-
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     | 71 +++++++---------------
 .../S3BlobStoreConfigurationReader.java            |  3 +
 .../modules/objectstorage/S3BlobStoreModule.java   | 12 ----
 src/site/xdoc/server/config-blobstore.xml          |  3 +
 6 files changed, 53 insertions(+), 63 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/02: JAMES-3028 Allow setting up S3 HTTP concurrency at the Netty level

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5e52bc456ca12060d4342600466f1051605c3fd0
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 18 08:00:15 2021 +0700

    JAMES-3028 Allow setting up S3 HTTP concurrency at the Netty level
---
 .../pages/distributed/configure/blobstore.adoc     |  3 +++
 .../aws/S3BlobStoreConfiguration.java              | 24 +++++++++++++++++++---
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     |  2 +-
 .../S3BlobStoreConfigurationReader.java            |  3 +++
 src/site/xdoc/server/config-blobstore.xml          |  3 +++
 5 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/docs/modules/servers/pages/distributed/configure/blobstore.adoc b/docs/modules/servers/pages/distributed/configure/blobstore.adoc
index c256b13..5611040 100644
--- a/docs/modules/servers/pages/distributed/configure/blobstore.adoc
+++ b/docs/modules/servers/pages/distributed/configure/blobstore.adoc
@@ -109,6 +109,9 @@ Maximum size of stored objects expressed in bytes.
 
 | objectstorage.s3.secretKey
 | https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys[S3 access key secret]
+
+| objectstorage.s3.http.concurrency
+| Allow setting the number of concurrent HTTP requests allowed by the Netty driver.
 |===
 
 ==== Buckets Configuration
diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java
index d0a1b7a..a456d4a 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java
@@ -51,6 +51,7 @@ public class S3BlobStoreConfiguration {
 
             private Optional<BucketName> defaultBucketName;
             private Optional<String> bucketPrefix;
+            private Optional<Integer> httpConcurrency;
             private Region region;
 
             public ReadyToBuild(AwsS3AuthConfiguration specificAuthConfiguration, Region region) {
@@ -58,6 +59,7 @@ public class S3BlobStoreConfiguration {
                 this.region = region;
                 this.defaultBucketName = Optional.empty();
                 this.bucketPrefix = Optional.empty();
+                this.httpConcurrency = Optional.empty();
             }
 
             public ReadyToBuild defaultBucketName(Optional<BucketName> defaultBucketName) {
@@ -80,27 +82,37 @@ public class S3BlobStoreConfiguration {
                 return this;
             }
 
+            public ReadyToBuild httpConcurrency(Optional<Integer> httpConcurrency) {
+                this.httpConcurrency = httpConcurrency;
+                return this;
+            }
+
             public S3BlobStoreConfiguration build() {
-                return new S3BlobStoreConfiguration(bucketPrefix, defaultBucketName, region, specificAuthConfiguration);
+                return new S3BlobStoreConfiguration(bucketPrefix, defaultBucketName, region, specificAuthConfiguration, httpConcurrency.orElse(DEFAULT_HTTP_CONCURRENCY));
             }
         }
 
     }
 
+    public static int DEFAULT_HTTP_CONCURRENCY = 100;
+
     private final Region region;
     private final AwsS3AuthConfiguration specificAuthConfiguration;
     private final Optional<BucketName> namespace;
     private final Optional<String> bucketPrefix;
+    private final int httpConcurrency;
 
     @VisibleForTesting
     S3BlobStoreConfiguration(Optional<String> bucketPrefix,
                              Optional<BucketName> namespace,
                              Region region,
-                             AwsS3AuthConfiguration specificAuthConfiguration) {
+                             AwsS3AuthConfiguration specificAuthConfiguration,
+                             int httpConcurrency) {
         this.bucketPrefix = bucketPrefix;
         this.namespace = namespace;
         this.region = region;
         this.specificAuthConfiguration = specificAuthConfiguration;
+        this.httpConcurrency = httpConcurrency;
     }
 
     public Optional<BucketName> getNamespace() {
@@ -115,6 +127,10 @@ public class S3BlobStoreConfiguration {
         return bucketPrefix;
     }
 
+    public int getHttpConcurrency() {
+        return httpConcurrency;
+    }
+
     public Region getRegion() {
         return region;
     }
@@ -127,6 +143,7 @@ public class S3BlobStoreConfiguration {
             return Objects.equals(this.namespace, that.namespace)
                 && Objects.equals(this.bucketPrefix, that.bucketPrefix)
                 && Objects.equals(this.region, that.region)
+                && Objects.equals(this.httpConcurrency, that.httpConcurrency)
                 && Objects.equals(this.specificAuthConfiguration, that.specificAuthConfiguration);
         }
         return false;
@@ -134,13 +151,14 @@ public class S3BlobStoreConfiguration {
 
     @Override
     public final int hashCode() {
-        return Objects.hash(namespace, bucketPrefix, specificAuthConfiguration);
+        return Objects.hash(namespace, bucketPrefix, httpConcurrency, specificAuthConfiguration);
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
             .add("namespace", namespace)
+            .add("httpConcurrency", httpConcurrency)
             .add("bucketPrefix", bucketPrefix)
             .add("region", region)
             .add("specificAuthConfiguration", specificAuthConfiguration)
diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index d228aee..0a12fea 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -97,7 +97,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
             .credentialsProvider(StaticCredentialsProvider.create(
                 AwsBasicCredentials.create(authConfiguration.getAccessKeyId(), authConfiguration.getSecretKey())))
             .httpClientBuilder(NettyNioAsyncHttpClient.builder()
-                .maxConcurrency(100)
+                .maxConcurrency(configuration.getHttpConcurrency())
                 .maxPendingConnectionAcquires(10_000))
             .endpointOverride(authConfiguration.getEndpoint())
             .region(configuration.getRegion().asAws())
diff --git a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreConfigurationReader.java b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreConfigurationReader.java
index 418c7a7..44b74f6 100644
--- a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreConfigurationReader.java
+++ b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreConfigurationReader.java
@@ -33,8 +33,10 @@ public class S3BlobStoreConfigurationReader {
     private static final String OBJECTSTORAGE_NAMESPACE = "objectstorage.namespace";
     private static final String OBJECTSTORAGE_BUCKET_PREFIX = "objectstorage.bucketPrefix";
     private static final String OBJECTSTORAGE_S3_REGION = "objectstorage.s3.region";
+    private static final String OBJECTSTORAGE_S3_HTTP_CONCURRENCY = "objectstorage.s3.http.concurrency";
 
     public static S3BlobStoreConfiguration from(Configuration configuration) throws ConfigurationException {
+        Optional<Integer> httpConcurrency = Optional.ofNullable(configuration.getInteger(OBJECTSTORAGE_S3_HTTP_CONCURRENCY, null));
         Optional<String> namespace = Optional.ofNullable(configuration.getString(OBJECTSTORAGE_NAMESPACE, null));
         Optional<String> bucketPrefix = Optional.ofNullable(configuration.getString(OBJECTSTORAGE_BUCKET_PREFIX, null));
         Region region = Optional.ofNullable(configuration.getString(OBJECTSTORAGE_S3_REGION, null))
@@ -46,6 +48,7 @@ public class S3BlobStoreConfigurationReader {
             .region(region)
             .defaultBucketName(namespace.map(BucketName::of))
             .bucketPrefix(bucketPrefix)
+            .httpConcurrency(httpConcurrency)
             .build();
     }
 
diff --git a/src/site/xdoc/server/config-blobstore.xml b/src/site/xdoc/server/config-blobstore.xml
index 15ae7b2..513f8ca 100644
--- a/src/site/xdoc/server/config-blobstore.xml
+++ b/src/site/xdoc/server/config-blobstore.xml
@@ -149,6 +149,9 @@ generate salt with : openssl rand -hex 16
 
                         <dt><strong>objectstorage.s3.secretKey</strong></dt>
                         <dd><a href="https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys">S3 access key secret</a></dd>
+
+                        <dt><strong>objectstorage.s3.http.concurrency</strong></dt>
+                        <dd>Allow setting the number of concurrent HTTP requests allowed by the Netty driver.</dd>
                     </dl>
                 </subsection>
             </subsection>

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/02: JAMES-3028 S3Client should not be pooled

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9c9d5754d8fd33e0483d67397dfa5d55c9daee74
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 18 07:50:01 2021 +0700

    JAMES-3028 S3Client should not be pooled
    
    Running above 900 req/s on a 3 James setup I encounter the following
    exception:
    
    ```
    software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Channel was closed before it could be written to.
      at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
      at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
      at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:198)
      at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:194)
      at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:143)
      at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:125)
      at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
      at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74)
      at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
      at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:104)
      at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
      at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:209)
      at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
      at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.io.IOException: Channel was closed before it could be written to.
      at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.tryConfigurePipeline(NettyRequestExecutor.java:220)
      at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:168)
      at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
      at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
      at io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35)
      at io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502)
      at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
      at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
      at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
      at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      ... 1 common frames omitted
    ```
    
    Removing pooling yield massive improvements for blob appends. Sending
    emails with setMessages -50% mean time, -33% p99. I could reach
    throughput of 1100+ req/s without exceptions.
---
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     | 69 +++++++---------------
 .../modules/objectstorage/S3BlobStoreModule.java   | 12 ----
 2 files changed, 22 insertions(+), 59 deletions(-)

diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 37f2810..d228aee 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -27,7 +27,6 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 
 import javax.annotation.PreDestroy;
@@ -52,9 +51,6 @@ import com.google.common.io.FileBackedOutputStream;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.pool.InstrumentedPool;
-import reactor.pool.PoolBuilder;
 import reactor.util.retry.RetryBackoffSpec;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -83,12 +79,11 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     private static final int EMPTY_BUCKET_BATCH_SIZE = 1000;
     private static final int FILE_THRESHOLD = 1024 * 100;
     private static final Duration FIRST_BACK_OFF = Duration.ofMillis(100);
-    private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
     private static final boolean LAZY = false;
     private static final int MAX_RETRIES = 5;
 
-    private final InstrumentedPool<S3AsyncClient> clientPool;
     private final BucketNameResolver bucketNameResolver;
+    private final S3AsyncClient client;
 
     @Inject
     S3BlobStoreDAO(S3BlobStoreConfiguration configuration) {
@@ -98,7 +93,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
             .pathStyleAccessEnabled(true)
             .build();
 
-        Callable<S3AsyncClient> clientCreator = () -> S3AsyncClient.builder()
+        client = S3AsyncClient.builder()
             .credentialsProvider(StaticCredentialsProvider.create(
                 AwsBasicCredentials.create(authConfiguration.getAccessKeyId(), authConfiguration.getSecretKey())))
             .httpClientBuilder(NettyNioAsyncHttpClient.builder()
@@ -109,27 +104,16 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
             .serviceConfiguration(pathStyleAccess)
             .build();
 
-        clientPool = PoolBuilder.from(Mono.fromCallable(clientCreator))
-            .acquisitionScheduler(Schedulers.elastic())
-            .destroyHandler(client -> Mono.fromRunnable(client::close))
-            .maxPendingAcquireUnbounded()
-            .sizeUnbounded()
-            .fifo();
-
         bucketNameResolver = BucketNameResolver.builder()
             .prefix(configuration.getBucketPrefix())
             .namespace(configuration.getNamespace())
             .build();
     }
 
-    public void start() {
-        clientPool.warmup().block();
-    }
-
     @Override
     @PreDestroy
     public void close() {
-        clientPool.dispose();
+        client.close();
     }
 
     @Override
@@ -150,7 +134,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     }
 
     private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) {
-        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
+        return Mono.fromFuture(() ->
             client.getObject(
                 builder -> builder.bucket(bucketName.asString()).key(blobId.asString()),
                 new AsyncResponseTransformer<GetObjectResponse, FluxResponse>() {
@@ -178,8 +162,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
                         response.flux = Flux.from(publisher);
                         response.supportingCompletableFuture.complete(response);
                     }
-                })))
-            .next();
+                }));
     }
 
 
@@ -187,11 +170,10 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
+        return Mono.fromFuture(() ->
                 client.getObject(
                     builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()),
-                    AsyncResponseTransformer.toBytes())))
-            .next()
+                    AsyncResponseTransformer.toBytes()))
             .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
             .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + resolvedBucketName.asString(), e))
             .map(BytesWrapper::asByteArray);
@@ -201,11 +183,10 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
+        return Mono.fromFuture(() ->
                 client.putObject(
                     builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()).contentLength((long) data.length),
-                    AsyncRequestBody.fromBytes(data))))
-            .next()
+                    AsyncRequestBody.fromBytes(data)))
             .retryWhen(createBucketOnRetry(resolvedBucketName))
             .then();
     }
@@ -233,14 +214,13 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
         return Mono.using(content::openStream,
-            stream ->
-                clientPool.withPoolable(client -> Mono.fromFuture(() ->
+            stream -> Mono.fromFuture(() ->
                     client.putObject(
                         Throwing.<PutObjectRequest.Builder>consumer(
                             builder -> builder.bucket(resolvedBucketName.asString()).contentLength(content.size()).key(blobId.asString()))
                         .sneakyThrow(),
                         AsyncRequestBody.fromPublisher(
-                            DataChunker.chunkStream(stream, CHUNK_SIZE))))).next(),
+                            DataChunker.chunkStream(stream, CHUNK_SIZE)))),
             Throwing.consumer(InputStream::close),
             LAZY)
             .retryWhen(createBucketOnRetry(resolvedBucketName))
@@ -254,10 +234,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
             .maxAttempts(MAX_RETRIES)
             .doBeforeRetryAsync(retrySignal -> {
                 if (retrySignal.failure() instanceof NoSuchBucketException) {
-                    return clientPool.withPoolable(client -> Mono
-                        .fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString())))
-                        .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty()))
-                        .next()
+                    return Mono.fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString())))
+                        .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty())
                         .then();
                 } else {
                     return Mono.error(retrySignal.failure());
@@ -269,9 +247,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
-                client.deleteObject(delete -> delete.bucket(resolvedBucketName.asString()).key(blobId.asString()))))
-            .next()
+        return Mono.fromFuture(() ->
+                client.deleteObject(delete -> delete.bucket(resolvedBucketName.asString()).key(blobId.asString())))
             .then()
             .onErrorResume(NoSuchBucketException.class, e -> Mono.empty());
     }
@@ -286,16 +263,15 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     private Mono<Void> deleteResolvedBucket(BucketName bucketName) {
         return emptyBucket(bucketName)
             .onErrorResume(t -> Mono.just(bucketName))
-            .flatMap(ignore -> clientPool.withPoolable(client -> Mono.fromFuture(() ->
+            .flatMap(ignore -> Mono.fromFuture(() ->
                 client.deleteBucket(builder -> builder.bucket(bucketName.asString()))))
-                .next())
             .onErrorResume(t -> Mono.empty())
             .then();
     }
 
     private Mono<BucketName> emptyBucket(BucketName bucketName) {
-        return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.listObjects(builder -> builder.bucket(bucketName.asString())))
-            .flatMapIterable(ListObjectsResponse::contents))
+        return Mono.fromFuture(() -> client.listObjects(builder -> builder.bucket(bucketName.asString())))
+            .flatMapIterable(ListObjectsResponse::contents)
             .window(EMPTY_BUCKET_BATCH_SIZE)
             .flatMap(this::buildListForBatch, DEFAULT_CONCURRENCY)
             .flatMap(identifiers -> deleteObjects(bucketName, identifiers), DEFAULT_CONCURRENCY)
@@ -309,16 +285,15 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     }
 
     private Mono<DeleteObjectsResponse> deleteObjects(BucketName bucketName, List<ObjectIdentifier> identifiers) {
-        return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.deleteObjects(builder ->
-            builder.bucket(bucketName.asString()).delete(delete -> delete.objects(identifiers)))))
-            .next();
+        return Mono.fromFuture(() -> client.deleteObjects(builder ->
+            builder.bucket(bucketName.asString()).delete(delete -> delete.objects(identifiers))));
     }
 
     @VisibleForTesting
     public Mono<Void> deleteAllBuckets() {
-        return clientPool.withPoolable(client -> Mono.fromFuture(client::listBuckets)
+        return Mono.fromFuture(client::listBuckets)
                 .flatMapIterable(ListBucketsResponse::buckets)
-                     .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name())), DEFAULT_CONCURRENCY))
+                     .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name())), DEFAULT_CONCURRENCY)
             .then();
     }
 }
diff --git a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java
index c6494e2..c97c669 100644
--- a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java
+++ b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java
@@ -26,18 +26,13 @@ import javax.inject.Singleton;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration;
-import org.apache.james.blob.objectstorage.aws.S3BlobStoreDAO;
 import org.apache.james.modules.mailbox.ConfigurationComponent;
-import org.apache.james.utils.InitializationOperation;
-import org.apache.james.utils.InitilizationOperationBuilder;
 import org.apache.james.utils.PropertiesProvider;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
-import com.google.inject.multibindings.ProvidesIntoSet;
 
 public class S3BlobStoreModule extends AbstractModule {
-
     @Provides
     @Singleton
     private S3BlobStoreConfiguration getObjectStorageConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException {
@@ -48,11 +43,4 @@ public class S3BlobStoreModule extends AbstractModule {
             throw new ConfigurationException(ConfigurationComponent.NAME + " configuration was not found");
         }
     }
-
-    @ProvidesIntoSet
-    InitializationOperation startS3BlobStoreDAO(S3BlobStoreDAO s3BlobStoreDAO) {
-        return InitilizationOperationBuilder
-            .forClass(S3BlobStoreDAO.class)
-            .init(s3BlobStoreDAO::start);
-    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org