You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/13 07:23:10 UTC
[james-project] 06/07: [PERF] CassandraMessageDAOV3: save headers on boundedElastic
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a3f253e92c920635d25a99e775831acaf664fb7d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 12 13:46:31 2023 +0700
[PERF] CassandraMessageDAOV3: save headers on boundedElastic
Reading the header content into a byte array is a blocking operation for large files,
and it was previously performed on the Cassandra driver event loop.
---
.../cassandra/mail/CassandraMessageDAOV3.java | 47 +++++++++++-----------
.../cassandra/mail/CassandraMessageMapper.java | 9 ++---
2 files changed, 26 insertions(+), 30 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index 575c8852fd..fe45b058e8 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -70,7 +70,6 @@ import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Attachments;
-import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.ByteContent;
import org.apache.james.mailbox.model.Cid;
@@ -99,6 +98,7 @@ import com.google.common.io.ByteSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
public class CassandraMessageDAOV3 {
@@ -178,7 +178,7 @@ public class CassandraMessageDAOV3 {
.build());
}
- public Mono<Tuple2<BlobId, BlobId>> save(MailboxMessage message) throws MailboxException {
+ public Mono<Tuple2<BlobId, BlobId>> save(MailboxMessage message) {
return saveContent(message)
.flatMap(pair -> cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)).thenReturn(pair));
}
@@ -214,32 +214,31 @@ public class CassandraMessageDAOV3 {
}
- private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) throws MailboxException {
- try {
- byte[] headerContent = IOUtils.toByteArray(message.getHeaderContent(), message.getHeaderOctets());
- ByteSource bodyByteSource = new ByteSource() {
- @Override
- public InputStream openStream() {
- try {
- return message.getBodyContent();
- } catch (IOException e) {
- throw new RuntimeException(e);
+ private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) {
+ return Mono.fromCallable(() -> IOUtils.toByteArray(message.getHeaderContent(), message.getHeaderOctets()))
+ .subscribeOn(Schedulers.boundedElastic())
+ .flatMap(headerContent -> {
+ ByteSource bodyByteSource = new ByteSource() {
+ @Override
+ public InputStream openStream() {
+ try {
+ return message.getBodyContent();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
- }
- @Override
- public long size() {
- return message.getBodyOctets();
- }
- };
+ @Override
+ public long size() {
+ return message.getBodyOctets();
+ }
+ };
- Mono<BlobId> headerFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), headerContent, SIZE_BASED));
- Mono<BlobId> bodyFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST));
+ Mono<BlobId> headerFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), headerContent, SIZE_BASED));
+ Mono<BlobId> bodyFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST));
- return headerFuture.zipWith(bodyFuture);
- } catch (IOException e) {
- throw new MailboxException("Error saving mail content", e);
- }
+ return headerFuture.zipWith(bodyFuture);
+ });
}
private BoundStatement boundWriteStatement(MailboxMessage message, Tuple2<BlobId, BlobId> pair) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 5f6924ae0b..04ed2cab13 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -75,7 +75,6 @@ import org.apache.james.util.streams.Limit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
@@ -405,10 +404,8 @@ public class CassandraMessageMapper implements MessageMapper {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
return addUidAndModseqAndSaveDate(message, mailboxId)
- .flatMap(Throwing.function((MailboxMessage messageWithUidAndModSeq) ->
- save(mailbox, messageWithUidAndModSeq)
- .thenReturn(messageWithUidAndModSeq)))
- .map(MailboxMessage::metaData);
+ .flatMap(messageWithUidAndModSeq -> save(mailbox, messageWithUidAndModSeq)
+ .thenReturn(messageWithUidAndModSeq.metaData()));
}
private Mono<MailboxMessage> addUidAndModseqAndSaveDate(MailboxMessage message, CassandraId mailboxId) {
@@ -620,7 +617,7 @@ public class CassandraMessageMapper implements MessageMapper {
.map(MailboxMessage::metaData);
}
- private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {
+ private Mono<Void> save(Mailbox mailbox, MailboxMessage message) {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
return messageDAOV3.save(message)
.flatMap(headerAndBodyBlobIds -> insertIds(message, mailboxId, headerAndBodyBlobIds.getT1()));
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org