You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/11 08:57:23 UTC

[james-project] branch master updated: [PERF] S3 Blob Store should wrap blocking calls (#1515)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c3824f696 [PERF] S3 Blob Store should wrap blocking calls (#1515)
3c3824f696 is described below

commit 3c3824f696c4e37b7cb4dc1c1d4d0aa85af28c43
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Apr 11 15:57:16 2023 +0700

    [PERF] S3 Blob Store should wrap blocking calls (#1515)
---
 .../apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java   |  3 ++-
 .../server/blob/deduplication/DeDuplicationBlobStore.scala    | 11 ++++++-----
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 6ca3ccd427..564a8a8381 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -323,7 +323,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
                 .bucket(resolvedBucketName.asString())
                 .contentLength(contentLength)
                 .key(blobId.asString()),
-            AsyncRequestBody.fromPublisher(chunkStream(chunkSize, stream))));
+            AsyncRequestBody.fromPublisher(chunkStream(chunkSize, stream)
+                .subscribeOn(Schedulers.boundedElastic()))));
     }
 
     private Flux<ByteBuffer> chunkStream(int chunkSize, InputStream stream) {
diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
index 30a8ac8881..e7bd582714 100644
--- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
+++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
@@ -31,6 +31,7 @@ import org.apache.james.blob.api.{BlobId, BlobStore, BlobStoreDAO, BucketName}
 import org.reactivestreams.Publisher
 import reactor.core.publisher.{Flux, Mono}
 import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
 import reactor.util.function.{Tuple2, Tuples}
 
 import scala.compat.java8.FunctionConverters._
@@ -58,10 +59,10 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(data)
 
-    val blobId = blobIdFactory.forPayload(data)
-
-    SMono(blobStoreDAO.save(bucketName, blobId, data))
-      .`then`(SMono.just(blobId))
+    SMono.fromCallable(() => blobIdFactory.forPayload(data))
+      .subscribeOn(Schedulers.boundedElastic())
+      .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data))
+        .`then`(SMono.just(blobId)))
   }
 
   override def save(bucketName: BucketName, data: InputStream, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
@@ -82,7 +83,7 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
     SMono.fromCallable(() => {
       IOUtils.copy(hashingInputStream, fileBackedOutputStream)
       Tuples.of(blobIdFactory.from(hashingInputStream.hash.toString), fileBackedOutputStream.asByteSource)
-    })
+    }).subscribeOn(Schedulers.boundedElastic())
       .flatMap((tuple: Tuple2[BlobId, ByteSource]) =>
         SMono(blobStoreDAO.save(bucketName, tuple.getT1, tuple.getT2))
           .`then`(SMono.just(tuple.getT1)))


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org