You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/13 07:23:10 UTC

[james-project] 06/07: [PERF] CassandraMessageDAOV3: save headers on boundedElastic

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a3f253e92c920635d25a99e775831acaf664fb7d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 12 13:46:31 2023 +0700

    [PERF] CassandraMessageDAOV3: save headers on boundedElastic
    
    Those are blocking operations for large files, and were performed on the
    Cassandra driver event loop.
---
 .../cassandra/mail/CassandraMessageDAOV3.java      | 47 +++++++++++-----------
 .../cassandra/mail/CassandraMessageMapper.java     |  9 ++---
 2 files changed, 26 insertions(+), 30 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index 575c8852fd..fe45b058e8 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -70,7 +70,6 @@ import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Attachments;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.ByteContent;
 import org.apache.james.mailbox.model.Cid;
@@ -99,6 +98,7 @@ import com.google.common.io.ByteSource;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 
 public class CassandraMessageDAOV3 {
@@ -178,7 +178,7 @@ public class CassandraMessageDAOV3 {
             .build());
     }
 
-    public Mono<Tuple2<BlobId, BlobId>> save(MailboxMessage message) throws MailboxException {
+    public Mono<Tuple2<BlobId, BlobId>> save(MailboxMessage message) {
         return saveContent(message)
             .flatMap(pair -> cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)).thenReturn(pair));
     }
@@ -214,32 +214,31 @@ public class CassandraMessageDAOV3 {
 
     }
 
-    private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) throws MailboxException {
-        try {
-            byte[] headerContent = IOUtils.toByteArray(message.getHeaderContent(), message.getHeaderOctets());
-            ByteSource bodyByteSource = new ByteSource() {
-                @Override
-                public InputStream openStream() {
-                    try {
-                        return message.getBodyContent();
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
+    private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) {
+        return Mono.fromCallable(() -> IOUtils.toByteArray(message.getHeaderContent(), message.getHeaderOctets()))
+            .subscribeOn(Schedulers.boundedElastic())
+            .flatMap(headerContent -> {
+                ByteSource bodyByteSource = new ByteSource() {
+                    @Override
+                    public InputStream openStream() {
+                        try {
+                            return message.getBodyContent();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
                     }
-                }
 
-                @Override
-                public long size() {
-                    return message.getBodyOctets();
-                }
-            };
+                    @Override
+                    public long size() {
+                        return message.getBodyOctets();
+                    }
+                };
 
-            Mono<BlobId> headerFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), headerContent, SIZE_BASED));
-            Mono<BlobId> bodyFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST));
+                Mono<BlobId> headerFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), headerContent, SIZE_BASED));
+                Mono<BlobId> bodyFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST));
 
-            return headerFuture.zipWith(bodyFuture);
-        } catch (IOException e) {
-            throw new MailboxException("Error saving mail content", e);
-        }
+                return headerFuture.zipWith(bodyFuture);
+            });
     }
 
     private BoundStatement boundWriteStatement(MailboxMessage message, Tuple2<BlobId, BlobId> pair) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 5f6924ae0b..04ed2cab13 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -75,7 +75,6 @@ import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Streams;
@@ -405,10 +404,8 @@ public class CassandraMessageMapper implements MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         return addUidAndModseqAndSaveDate(message, mailboxId)
-            .flatMap(Throwing.function((MailboxMessage messageWithUidAndModSeq) ->
-                save(mailbox, messageWithUidAndModSeq)
-                    .thenReturn(messageWithUidAndModSeq)))
-            .map(MailboxMessage::metaData);
+            .flatMap(messageWithUidAndModSeq -> save(mailbox, messageWithUidAndModSeq)
+                .thenReturn(messageWithUidAndModSeq.metaData()));
     }
 
     private Mono<MailboxMessage> addUidAndModseqAndSaveDate(MailboxMessage message, CassandraId mailboxId) {
@@ -620,7 +617,7 @@ public class CassandraMessageMapper implements MessageMapper {
             .map(MailboxMessage::metaData);
     }
 
-    private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {
+    private Mono<Void> save(Mailbox mailbox, MailboxMessage message) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return messageDAOV3.save(message)
             .flatMap(headerAndBodyBlobIds -> insertIds(message, mailboxId, headerAndBodyBlobIds.getT1()));


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org