You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/11 08:57:23 UTC
[james-project] branch master updated: [PERF] S3 Blob Store should wrap blocking calls (#1515)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 3c3824f696 [PERF] S3 Blob Store should wrap blocking calls (#1515)
3c3824f696 is described below
commit 3c3824f696c4e37b7cb4dc1c1d4d0aa85af28c43
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Apr 11 15:57:16 2023 +0700
[PERF] S3 Blob Store should wrap blocking calls (#1515)
---
.../apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java | 3 ++-
.../server/blob/deduplication/DeDuplicationBlobStore.scala | 11 ++++++-----
2 files changed, 8 insertions(+), 6 deletions(-)
diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 6ca3ccd427..564a8a8381 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -323,7 +323,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
.bucket(resolvedBucketName.asString())
.contentLength(contentLength)
.key(blobId.asString()),
- AsyncRequestBody.fromPublisher(chunkStream(chunkSize, stream))));
+ AsyncRequestBody.fromPublisher(chunkStream(chunkSize, stream)
+ .subscribeOn(Schedulers.boundedElastic()))));
}
private Flux<ByteBuffer> chunkStream(int chunkSize, InputStream stream) {
diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
index 30a8ac8881..e7bd582714 100644
--- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
+++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
@@ -31,6 +31,7 @@ import org.apache.james.blob.api.{BlobId, BlobStore, BlobStoreDAO, BucketName}
import org.reactivestreams.Publisher
import reactor.core.publisher.{Flux, Mono}
import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
import reactor.util.function.{Tuple2, Tuples}
import scala.compat.java8.FunctionConverters._
@@ -58,10 +59,10 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
Preconditions.checkNotNull(bucketName)
Preconditions.checkNotNull(data)
- val blobId = blobIdFactory.forPayload(data)
-
- SMono(blobStoreDAO.save(bucketName, blobId, data))
- .`then`(SMono.just(blobId))
+ SMono.fromCallable(() => blobIdFactory.forPayload(data))
+ .subscribeOn(Schedulers.boundedElastic())
+ .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data))
+ .`then`(SMono.just(blobId)))
}
override def save(bucketName: BucketName, data: InputStream, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
@@ -82,7 +83,7 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
SMono.fromCallable(() => {
IOUtils.copy(hashingInputStream, fileBackedOutputStream)
Tuples.of(blobIdFactory.from(hashingInputStream.hash.toString), fileBackedOutputStream.asByteSource)
- })
+ }).subscribeOn(Schedulers.boundedElastic())
.flatMap((tuple: Tuple2[BlobId, ByteSource]) =>
SMono(blobStoreDAO.save(bucketName, tuple.getT1, tuple.getT2))
.`then`(SMono.just(tuple.getT1)))
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org