You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/05/21 01:51:05 UTC

[james-project] 01/02: JAMES-3028 S3Client should not be pooled

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9c9d5754d8fd33e0483d67397dfa5d55c9daee74
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 18 07:50:01 2021 +0700

    JAMES-3028 S3Client should not be pooled
    
    When running above 900 req/s on a setup of 3 James servers, I encounter
    the following exception:
    
    ```
    software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Channel was closed before it could be written to.
      at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
      at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
      at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:198)
      at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:194)
      at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:143)
      at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:125)
      at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
      at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74)
      at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
      at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:104)
      at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
      at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:209)
      at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
      at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.io.IOException: Channel was closed before it could be written to.
      at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.tryConfigurePipeline(NettyRequestExecutor.java:220)
      at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:168)
      at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
      at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
      at io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35)
      at io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502)
      at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
      at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
      at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
      at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      ... 1 common frames omitted
    ```
    
    Removing pooling yields massive improvements for blob appends: sending
    emails with setMessages shows -50% mean time and -33% p99. I could reach
    a throughput of 1100+ req/s without exceptions.
---
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     | 69 +++++++---------------
 .../modules/objectstorage/S3BlobStoreModule.java   | 12 ----
 2 files changed, 22 insertions(+), 59 deletions(-)

diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 37f2810..d228aee 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -27,7 +27,6 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 
 import javax.annotation.PreDestroy;
@@ -52,9 +51,6 @@ import com.google.common.io.FileBackedOutputStream;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.pool.InstrumentedPool;
-import reactor.pool.PoolBuilder;
 import reactor.util.retry.RetryBackoffSpec;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -83,12 +79,11 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     private static final int EMPTY_BUCKET_BATCH_SIZE = 1000;
     private static final int FILE_THRESHOLD = 1024 * 100;
     private static final Duration FIRST_BACK_OFF = Duration.ofMillis(100);
-    private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
     private static final boolean LAZY = false;
     private static final int MAX_RETRIES = 5;
 
-    private final InstrumentedPool<S3AsyncClient> clientPool;
     private final BucketNameResolver bucketNameResolver;
+    private final S3AsyncClient client;
 
     @Inject
     S3BlobStoreDAO(S3BlobStoreConfiguration configuration) {
@@ -98,7 +93,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
             .pathStyleAccessEnabled(true)
             .build();
 
-        Callable<S3AsyncClient> clientCreator = () -> S3AsyncClient.builder()
+        client = S3AsyncClient.builder()
             .credentialsProvider(StaticCredentialsProvider.create(
                 AwsBasicCredentials.create(authConfiguration.getAccessKeyId(), authConfiguration.getSecretKey())))
             .httpClientBuilder(NettyNioAsyncHttpClient.builder()
@@ -109,27 +104,16 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
             .serviceConfiguration(pathStyleAccess)
             .build();
 
-        clientPool = PoolBuilder.from(Mono.fromCallable(clientCreator))
-            .acquisitionScheduler(Schedulers.elastic())
-            .destroyHandler(client -> Mono.fromRunnable(client::close))
-            .maxPendingAcquireUnbounded()
-            .sizeUnbounded()
-            .fifo();
-
         bucketNameResolver = BucketNameResolver.builder()
             .prefix(configuration.getBucketPrefix())
             .namespace(configuration.getNamespace())
             .build();
     }
 
-    public void start() {
-        clientPool.warmup().block();
-    }
-
     @Override
     @PreDestroy
     public void close() {
-        clientPool.dispose();
+        client.close();
     }
 
     @Override
@@ -150,7 +134,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     }
 
     private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) {
-        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
+        return Mono.fromFuture(() ->
             client.getObject(
                 builder -> builder.bucket(bucketName.asString()).key(blobId.asString()),
                 new AsyncResponseTransformer<GetObjectResponse, FluxResponse>() {
@@ -178,8 +162,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
                         response.flux = Flux.from(publisher);
                         response.supportingCompletableFuture.complete(response);
                     }
-                })))
-            .next();
+                }));
     }
 
 
@@ -187,11 +170,10 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
+        return Mono.fromFuture(() ->
                 client.getObject(
                     builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()),
-                    AsyncResponseTransformer.toBytes())))
-            .next()
+                    AsyncResponseTransformer.toBytes()))
             .onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
             .onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + resolvedBucketName.asString(), e))
             .map(BytesWrapper::asByteArray);
@@ -201,11 +183,10 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
+        return Mono.fromFuture(() ->
                 client.putObject(
                     builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()).contentLength((long) data.length),
-                    AsyncRequestBody.fromBytes(data))))
-            .next()
+                    AsyncRequestBody.fromBytes(data)))
             .retryWhen(createBucketOnRetry(resolvedBucketName))
             .then();
     }
@@ -233,14 +214,13 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
         return Mono.using(content::openStream,
-            stream ->
-                clientPool.withPoolable(client -> Mono.fromFuture(() ->
+            stream -> Mono.fromFuture(() ->
                     client.putObject(
                         Throwing.<PutObjectRequest.Builder>consumer(
                             builder -> builder.bucket(resolvedBucketName.asString()).contentLength(content.size()).key(blobId.asString()))
                         .sneakyThrow(),
                         AsyncRequestBody.fromPublisher(
-                            DataChunker.chunkStream(stream, CHUNK_SIZE))))).next(),
+                            DataChunker.chunkStream(stream, CHUNK_SIZE)))),
             Throwing.consumer(InputStream::close),
             LAZY)
             .retryWhen(createBucketOnRetry(resolvedBucketName))
@@ -254,10 +234,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
             .maxAttempts(MAX_RETRIES)
             .doBeforeRetryAsync(retrySignal -> {
                 if (retrySignal.failure() instanceof NoSuchBucketException) {
-                    return clientPool.withPoolable(client -> Mono
-                        .fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString())))
-                        .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty()))
-                        .next()
+                    return Mono.fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString())))
+                        .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty())
                         .then();
                 } else {
                     return Mono.error(retrySignal.failure());
@@ -269,9 +247,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
         BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
 
-        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
-                client.deleteObject(delete -> delete.bucket(resolvedBucketName.asString()).key(blobId.asString()))))
-            .next()
+        return Mono.fromFuture(() ->
+                client.deleteObject(delete -> delete.bucket(resolvedBucketName.asString()).key(blobId.asString())))
             .then()
             .onErrorResume(NoSuchBucketException.class, e -> Mono.empty());
     }
@@ -286,16 +263,15 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     private Mono<Void> deleteResolvedBucket(BucketName bucketName) {
         return emptyBucket(bucketName)
             .onErrorResume(t -> Mono.just(bucketName))
-            .flatMap(ignore -> clientPool.withPoolable(client -> Mono.fromFuture(() ->
+            .flatMap(ignore -> Mono.fromFuture(() ->
                 client.deleteBucket(builder -> builder.bucket(bucketName.asString()))))
-                .next())
             .onErrorResume(t -> Mono.empty())
             .then();
     }
 
     private Mono<BucketName> emptyBucket(BucketName bucketName) {
-        return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.listObjects(builder -> builder.bucket(bucketName.asString())))
-            .flatMapIterable(ListObjectsResponse::contents))
+        return Mono.fromFuture(() -> client.listObjects(builder -> builder.bucket(bucketName.asString())))
+            .flatMapIterable(ListObjectsResponse::contents)
             .window(EMPTY_BUCKET_BATCH_SIZE)
             .flatMap(this::buildListForBatch, DEFAULT_CONCURRENCY)
             .flatMap(identifiers -> deleteObjects(bucketName, identifiers), DEFAULT_CONCURRENCY)
@@ -309,16 +285,15 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     }
 
     private Mono<DeleteObjectsResponse> deleteObjects(BucketName bucketName, List<ObjectIdentifier> identifiers) {
-        return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.deleteObjects(builder ->
-            builder.bucket(bucketName.asString()).delete(delete -> delete.objects(identifiers)))))
-            .next();
+        return Mono.fromFuture(() -> client.deleteObjects(builder ->
+            builder.bucket(bucketName.asString()).delete(delete -> delete.objects(identifiers))));
     }
 
     @VisibleForTesting
     public Mono<Void> deleteAllBuckets() {
-        return clientPool.withPoolable(client -> Mono.fromFuture(client::listBuckets)
+        return Mono.fromFuture(client::listBuckets)
                 .flatMapIterable(ListBucketsResponse::buckets)
-                     .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name())), DEFAULT_CONCURRENCY))
+                     .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name())), DEFAULT_CONCURRENCY)
             .then();
     }
 }
diff --git a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java
index c6494e2..c97c669 100644
--- a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java
+++ b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java
@@ -26,18 +26,13 @@ import javax.inject.Singleton;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration;
-import org.apache.james.blob.objectstorage.aws.S3BlobStoreDAO;
 import org.apache.james.modules.mailbox.ConfigurationComponent;
-import org.apache.james.utils.InitializationOperation;
-import org.apache.james.utils.InitilizationOperationBuilder;
 import org.apache.james.utils.PropertiesProvider;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
-import com.google.inject.multibindings.ProvidesIntoSet;
 
 public class S3BlobStoreModule extends AbstractModule {
-
     @Provides
     @Singleton
     private S3BlobStoreConfiguration getObjectStorageConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException {
@@ -48,11 +43,4 @@ public class S3BlobStoreModule extends AbstractModule {
             throw new ConfigurationException(ConfigurationComponent.NAME + " configuration was not found");
         }
     }
-
-    @ProvidesIntoSet
-    InitializationOperation startS3BlobStoreDAO(S3BlobStoreDAO s3BlobStoreDAO) {
-        return InitilizationOperationBuilder
-            .forClass(S3BlobStoreDAO.class)
-            .init(s3BlobStoreDAO::start);
-    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org