You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/05/21 01:51:05 UTC
[james-project] 01/02: JAMES-3028 S3Client should not be pooled
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9c9d5754d8fd33e0483d67397dfa5d55c9daee74
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 18 07:50:01 2021 +0700
JAMES-3028 S3Client should not be pooled
Running above 900 req/s on a 3 James setup I encounter the following
exception:
```
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Channel was closed before it could be written to.
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:198)
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:194)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:143)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:125)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:104)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:209)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Channel was closed before it could be written to.
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.tryConfigurePipeline(NettyRequestExecutor.java:220)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:168)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35)
at io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
... 1 common frames omitted
```
Removing pooling yield massive improvements for blob appends. Sending
emails with setMessages -50% mean time, -33% p99. I could reach
throughput of 1100+ req/s without exceptions.
---
.../blob/objectstorage/aws/S3BlobStoreDAO.java | 69 +++++++---------------
.../modules/objectstorage/S3BlobStoreModule.java | 12 ----
2 files changed, 22 insertions(+), 59 deletions(-)
diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 37f2810..d228aee 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -27,7 +27,6 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import javax.annotation.PreDestroy;
@@ -52,9 +51,6 @@ import com.google.common.io.FileBackedOutputStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.pool.InstrumentedPool;
-import reactor.pool.PoolBuilder;
import reactor.util.retry.RetryBackoffSpec;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -83,12 +79,11 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
private static final int EMPTY_BUCKET_BATCH_SIZE = 1000;
private static final int FILE_THRESHOLD = 1024 * 100;
private static final Duration FIRST_BACK_OFF = Duration.ofMillis(100);
- private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
private static final boolean LAZY = false;
private static final int MAX_RETRIES = 5;
- private final InstrumentedPool<S3AsyncClient> clientPool;
private final BucketNameResolver bucketNameResolver;
+ private final S3AsyncClient client;
@Inject
S3BlobStoreDAO(S3BlobStoreConfiguration configuration) {
@@ -98,7 +93,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
.pathStyleAccessEnabled(true)
.build();
- Callable<S3AsyncClient> clientCreator = () -> S3AsyncClient.builder()
+ client = S3AsyncClient.builder()
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(authConfiguration.getAccessKeyId(), authConfiguration.getSecretKey())))
.httpClientBuilder(NettyNioAsyncHttpClient.builder()
@@ -109,27 +104,16 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
.serviceConfiguration(pathStyleAccess)
.build();
- clientPool = PoolBuilder.from(Mono.fromCallable(clientCreator))
- .acquisitionScheduler(Schedulers.elastic())
- .destroyHandler(client -> Mono.fromRunnable(client::close))
- .maxPendingAcquireUnbounded()
- .sizeUnbounded()
- .fifo();
-
bucketNameResolver = BucketNameResolver.builder()
.prefix(configuration.getBucketPrefix())
.namespace(configuration.getNamespace())
.build();
}
- public void start() {
- clientPool.warmup().block();
- }
-
@Override
@PreDestroy
public void close() {
- clientPool.dispose();
+ client.close();
}
@Override
@@ -150,7 +134,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
}
private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) {
- return clientPool.withPoolable(client -> Mono.fromFuture(() ->
+ return Mono.fromFuture(() ->
client.getObject(
builder -> builder.bucket(bucketName.asString()).key(blobId.asString()),
new AsyncResponseTransformer<GetObjectResponse, FluxResponse>() {
@@ -178,8 +162,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
response.flux = Flux.from(publisher);
response.supportingCompletableFuture.complete(response);
}
- })))
- .next();
+ }));
}
@@ -187,11 +170,10 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
- return clientPool.withPoolable(client -> Mono.fromFuture(() ->
+ return Mono.fromFuture(() ->
client.getObject(
builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()),
- AsyncResponseTransformer.toBytes())))
- .next()
+ AsyncResponseTransformer.toBytes()))
.onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
.onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + resolvedBucketName.asString(), e))
.map(BytesWrapper::asByteArray);
@@ -201,11 +183,10 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
- return clientPool.withPoolable(client -> Mono.fromFuture(() ->
+ return Mono.fromFuture(() ->
client.putObject(
builder -> builder.bucket(resolvedBucketName.asString()).key(blobId.asString()).contentLength((long) data.length),
- AsyncRequestBody.fromBytes(data))))
- .next()
+ AsyncRequestBody.fromBytes(data)))
.retryWhen(createBucketOnRetry(resolvedBucketName))
.then();
}
@@ -233,14 +214,13 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
return Mono.using(content::openStream,
- stream ->
- clientPool.withPoolable(client -> Mono.fromFuture(() ->
+ stream -> Mono.fromFuture(() ->
client.putObject(
Throwing.<PutObjectRequest.Builder>consumer(
builder -> builder.bucket(resolvedBucketName.asString()).contentLength(content.size()).key(blobId.asString()))
.sneakyThrow(),
AsyncRequestBody.fromPublisher(
- DataChunker.chunkStream(stream, CHUNK_SIZE))))).next(),
+ DataChunker.chunkStream(stream, CHUNK_SIZE)))),
Throwing.consumer(InputStream::close),
LAZY)
.retryWhen(createBucketOnRetry(resolvedBucketName))
@@ -254,10 +234,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
.maxAttempts(MAX_RETRIES)
.doBeforeRetryAsync(retrySignal -> {
if (retrySignal.failure() instanceof NoSuchBucketException) {
- return clientPool.withPoolable(client -> Mono
- .fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString())))
- .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty()))
- .next()
+ return Mono.fromFuture(client.createBucket(builder -> builder.bucket(bucketName.asString())))
+ .onErrorResume(BucketAlreadyOwnedByYouException.class, e -> Mono.empty())
.then();
} else {
return Mono.error(retrySignal.failure());
@@ -269,9 +247,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
- return clientPool.withPoolable(client -> Mono.fromFuture(() ->
- client.deleteObject(delete -> delete.bucket(resolvedBucketName.asString()).key(blobId.asString()))))
- .next()
+ return Mono.fromFuture(() ->
+ client.deleteObject(delete -> delete.bucket(resolvedBucketName.asString()).key(blobId.asString())))
.then()
.onErrorResume(NoSuchBucketException.class, e -> Mono.empty());
}
@@ -286,16 +263,15 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
private Mono<Void> deleteResolvedBucket(BucketName bucketName) {
return emptyBucket(bucketName)
.onErrorResume(t -> Mono.just(bucketName))
- .flatMap(ignore -> clientPool.withPoolable(client -> Mono.fromFuture(() ->
+ .flatMap(ignore -> Mono.fromFuture(() ->
client.deleteBucket(builder -> builder.bucket(bucketName.asString()))))
- .next())
.onErrorResume(t -> Mono.empty())
.then();
}
private Mono<BucketName> emptyBucket(BucketName bucketName) {
- return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.listObjects(builder -> builder.bucket(bucketName.asString())))
- .flatMapIterable(ListObjectsResponse::contents))
+ return Mono.fromFuture(() -> client.listObjects(builder -> builder.bucket(bucketName.asString())))
+ .flatMapIterable(ListObjectsResponse::contents)
.window(EMPTY_BUCKET_BATCH_SIZE)
.flatMap(this::buildListForBatch, DEFAULT_CONCURRENCY)
.flatMap(identifiers -> deleteObjects(bucketName, identifiers), DEFAULT_CONCURRENCY)
@@ -309,16 +285,15 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
}
private Mono<DeleteObjectsResponse> deleteObjects(BucketName bucketName, List<ObjectIdentifier> identifiers) {
- return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.deleteObjects(builder ->
- builder.bucket(bucketName.asString()).delete(delete -> delete.objects(identifiers)))))
- .next();
+ return Mono.fromFuture(() -> client.deleteObjects(builder ->
+ builder.bucket(bucketName.asString()).delete(delete -> delete.objects(identifiers))));
}
@VisibleForTesting
public Mono<Void> deleteAllBuckets() {
- return clientPool.withPoolable(client -> Mono.fromFuture(client::listBuckets)
+ return Mono.fromFuture(client::listBuckets)
.flatMapIterable(ListBucketsResponse::buckets)
- .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name())), DEFAULT_CONCURRENCY))
+ .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name())), DEFAULT_CONCURRENCY)
.then();
}
}
diff --git a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java
index c6494e2..c97c669 100644
--- a/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java
+++ b/server/container/guice/blob/s3/src/main/java/org/apache/james/modules/objectstorage/S3BlobStoreModule.java
@@ -26,18 +26,13 @@ import javax.inject.Singleton;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration;
-import org.apache.james.blob.objectstorage.aws.S3BlobStoreDAO;
import org.apache.james.modules.mailbox.ConfigurationComponent;
-import org.apache.james.utils.InitializationOperation;
-import org.apache.james.utils.InitilizationOperationBuilder;
import org.apache.james.utils.PropertiesProvider;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
-import com.google.inject.multibindings.ProvidesIntoSet;
public class S3BlobStoreModule extends AbstractModule {
-
@Provides
@Singleton
private S3BlobStoreConfiguration getObjectStorageConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException {
@@ -48,11 +43,4 @@ public class S3BlobStoreModule extends AbstractModule {
throw new ConfigurationException(ConfigurationComponent.NAME + " configuration was not found");
}
}
-
- @ProvidesIntoSet
- InitializationOperation startS3BlobStoreDAO(S3BlobStoreDAO s3BlobStoreDAO) {
- return InitilizationOperationBuilder
- .forClass(S3BlobStoreDAO.class)
- .init(s3BlobStoreDAO::start);
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org