You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/28 01:16:18 UTC
[james-project] 03/07: [PERFORMANCE] Reactify MessageSender
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ee180cef59703678caec8a2db8a8b6594b9fc84e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun May 23 10:27:34 2021 +0700
[PERFORMANCE] Reactify MessageSender
---
.../james/jmap/draft/methods/MessageSender.java | 21 ++++++++-------------
.../james/jmap/draft/methods/SendMDNProcessor.java | 4 ++--
.../draft/methods/SetMessagesCreationProcessor.java | 2 +-
.../draft/methods/SetMessagesUpdateProcessor.java | 2 +-
.../org/apache/james/jmap/draft/send/MailSpool.java | 7 ++++---
.../apache/james/jmap/draft/send/MailSpoolTest.java | 4 ++--
6 files changed, 18 insertions(+), 22 deletions(-)
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/MessageSender.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/MessageSender.java
index c0505a3..1903c75 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/MessageSender.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/MessageSender.java
@@ -37,6 +37,8 @@ import org.apache.mailet.Mail;
import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Mono;
+
public class MessageSender {
public static class MessageMimeMessageSource extends MimeMessageSource {
private final String id;
@@ -70,15 +72,10 @@ public class MessageSender {
this.mailSpool = mailSpool;
}
- public void sendMessage(MetaDataWithContent message,
- Envelope envelope,
- MailboxSession session) throws MessagingException {
- Mail mail = buildMail(message, envelope);
- try {
- sendMessage(message.getMessageId(), mail, session);
- } finally {
- LifecycleUtil.dispose(mail);
- }
+ public Mono<Void> sendMessage(MetaDataWithContent message, Envelope envelope, MailboxSession session) {
+ return Mono.usingWhen(Mono.fromCallable(() -> buildMail(message, envelope)),
+ mail -> sendMessage(message.getMessageId(), mail, session),
+ mail -> Mono.fromRunnable(() -> LifecycleUtil.dispose(mail)));
}
@VisibleForTesting
@@ -93,10 +90,8 @@ public class MessageSender {
return mail;
}
- public void sendMessage(MessageId messageId,
- Mail mail,
- MailboxSession session) throws MessagingException {
+ public Mono<Void> sendMessage(MessageId messageId, Mail mail, MailboxSession session) {
MailMetadata metadata = new MailMetadata(messageId, session.getUser().asString());
- mailSpool.send(mail, metadata);
+ return mailSpool.send(mail, metadata);
}
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SendMDNProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SendMDNProcessor.java
index a390807..f1e450e 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SendMDNProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SendMDNProcessor.java
@@ -147,8 +147,8 @@ public class SendMDNProcessor implements SetMessagesProcessor {
MetaDataWithContent metaDataWithContent = messageAppender.appendMessageInMailbox(mdnAnswer,
getOutbox(mailboxSession), seen, mailboxSession);
- messageSender.sendMessage(metaDataWithContent,
- Envelope.fromMime4JMessage(mdnAnswer), mailboxSession);
+ messageSender.sendMessage(metaDataWithContent, Envelope.fromMime4JMessage(mdnAnswer), mailboxSession)
+ .block();
return metaDataWithContent.getMessageId();
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
index ece4c20..1c505fd 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
@@ -293,7 +293,7 @@ public class SetMessagesCreationProcessor implements SetMessagesProcessor {
MetaDataWithContent newMessage = messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session);
MessageFullView jmapMessage = messageFullViewFactory.fromMetaDataWithContent(newMessage).block();
Envelope envelope = EnvelopeUtils.fromMessage(jmapMessage);
- messageSender.sendMessage(newMessage, envelope, session);
+ messageSender.sendMessage(newMessage, envelope, session).block();
referenceUpdater.updateReferences(entry.getValue().getHeaders(), session);
return new ValueWithId.MessageWithId(entry.getCreationId(), jmapMessage);
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
index b550e32..2cc67b1 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
@@ -333,7 +333,7 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor {
.asOptional()
.map(Username::fromMailAddress);
assertUserCanSendFrom(mailboxSession.getUser(), fromUser);
- messageSender.sendMessage(messageId, mail, mailboxSession);
+ messageSender.sendMessage(messageId, mail, mailboxSession).block();
referenceUpdater.updateReferences(messageToSend.getHeaders(), mailboxSession);
return SetMessagesResponse.builder();
}).subscribeOn(Schedulers.elastic()))
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
index afda197..762f45e 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
@@ -27,7 +27,6 @@ import javax.inject.Inject;
import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.queue.api.MailQueue;
-import org.apache.james.queue.api.MailQueue.MailQueueException;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.mailet.Attribute;
import org.apache.mailet.AttributeValue;
@@ -37,6 +36,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Mono;
+
public class MailSpool implements Startable, Disposable {
private static final Logger LOGGER = LoggerFactory.getLogger(MailSpool.class);
@@ -62,9 +63,9 @@ public class MailSpool implements Startable, Disposable {
}
}
- public void send(Mail mail, MailMetadata metadata) throws MailQueueException {
+ public Mono<Void> send(Mail mail, MailMetadata metadata) {
mail.setAttribute(new Attribute(MailMetadata.MAIL_METADATA_MESSAGE_ID_ATTRIBUTE, AttributeValue.of(metadata.getMessageId().serialize())));
mail.setAttribute(new Attribute(MailMetadata.MAIL_METADATA_USERNAME_ATTRIBUTE, AttributeValue.of(metadata.getUsername())));
- queue.enQueue(mail);
+ return Mono.from(queue.enqueueReactive(mail));
}
}
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java
index 66e6576..34d05d0 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java
@@ -63,7 +63,7 @@ public class MailSpoolTest {
.mimeMessage(MimeMessageUtil.mimeMessageFromBytes("header: value\r\n".getBytes(UTF_8)))
.build();
- mailSpool.send(mail, new MailMetadata(MESSAGE_ID, USERNAME));
+ mailSpool.send(mail, new MailMetadata(MESSAGE_ID, USERNAME)).block();
MailQueueItem actual = Flux.from(myQueue.deQueue()).blockFirst();
assertThat(actual.getMail().getName()).isEqualTo(NAME);
@@ -76,7 +76,7 @@ public class MailSpoolTest {
.mimeMessage(MimeMessageUtil.mimeMessageFromBytes("header: value\r\n".getBytes(UTF_8)))
.build();
- mailSpool.send(mail, new MailMetadata(MESSAGE_ID, USERNAME));
+ mailSpool.send(mail, new MailMetadata(MESSAGE_ID, USERNAME)).block();
MailQueueItem actual = Flux.from(myQueue.deQueue()).blockFirst();
assertThat(actual.getMail().getAttribute(MailMetadata.MAIL_METADATA_USERNAME_ATTRIBUTE))
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org