You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/28 01:16:18 UTC

[james-project] 03/07: [PERFORMANCE] Reactify MessageSender

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ee180cef59703678caec8a2db8a8b6594b9fc84e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun May 23 10:27:34 2021 +0700

    [PERFORMANCE] Reactify MessageSender
---
 .../james/jmap/draft/methods/MessageSender.java     | 21 ++++++++-------------
 .../james/jmap/draft/methods/SendMDNProcessor.java  |  4 ++--
 .../draft/methods/SetMessagesCreationProcessor.java |  2 +-
 .../draft/methods/SetMessagesUpdateProcessor.java   |  2 +-
 .../org/apache/james/jmap/draft/send/MailSpool.java |  7 ++++---
 .../apache/james/jmap/draft/send/MailSpoolTest.java |  4 ++--
 6 files changed, 18 insertions(+), 22 deletions(-)

diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/MessageSender.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/MessageSender.java
index c0505a3..1903c75 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/MessageSender.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/MessageSender.java
@@ -37,6 +37,8 @@ import org.apache.mailet.Mail;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import reactor.core.publisher.Mono;
+
 public class MessageSender {
     public static class MessageMimeMessageSource extends MimeMessageSource {
         private final String id;
@@ -70,15 +72,10 @@ public class MessageSender {
         this.mailSpool = mailSpool;
     }
 
-    public void sendMessage(MetaDataWithContent message,
-                            Envelope envelope,
-                            MailboxSession session) throws MessagingException {
-        Mail mail = buildMail(message, envelope);
-        try {
-            sendMessage(message.getMessageId(), mail, session);
-        } finally {
-            LifecycleUtil.dispose(mail);
-        }
+    public Mono<Void> sendMessage(MetaDataWithContent message, Envelope envelope, MailboxSession session) {
+        return Mono.usingWhen(Mono.fromCallable(() -> buildMail(message, envelope)),
+            mail -> sendMessage(message.getMessageId(), mail, session),
+            mail -> Mono.fromRunnable(() -> LifecycleUtil.dispose(mail)));
     }
 
     @VisibleForTesting
@@ -93,10 +90,8 @@ public class MessageSender {
         return mail;
     }
 
-    public void sendMessage(MessageId messageId,
-                            Mail mail,
-                            MailboxSession session) throws MessagingException {
+    public Mono<Void> sendMessage(MessageId messageId, Mail mail, MailboxSession session) {
         MailMetadata metadata = new MailMetadata(messageId, session.getUser().asString());
-        mailSpool.send(mail, metadata);
+        return mailSpool.send(mail, metadata);
     }
 }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SendMDNProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SendMDNProcessor.java
index a390807..f1e450e 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SendMDNProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SendMDNProcessor.java
@@ -147,8 +147,8 @@ public class SendMDNProcessor implements SetMessagesProcessor {
         MetaDataWithContent metaDataWithContent = messageAppender.appendMessageInMailbox(mdnAnswer,
             getOutbox(mailboxSession), seen, mailboxSession);
 
-        messageSender.sendMessage(metaDataWithContent,
-            Envelope.fromMime4JMessage(mdnAnswer), mailboxSession);
+        messageSender.sendMessage(metaDataWithContent, Envelope.fromMime4JMessage(mdnAnswer), mailboxSession)
+            .block();
 
         return metaDataWithContent.getMessageId();
     }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
index ece4c20..1c505fd 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
@@ -293,7 +293,7 @@ public class SetMessagesCreationProcessor implements SetMessagesProcessor {
         MetaDataWithContent newMessage = messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session);
         MessageFullView jmapMessage = messageFullViewFactory.fromMetaDataWithContent(newMessage).block();
         Envelope envelope = EnvelopeUtils.fromMessage(jmapMessage);
-        messageSender.sendMessage(newMessage, envelope, session);
+        messageSender.sendMessage(newMessage, envelope, session).block();
         referenceUpdater.updateReferences(entry.getValue().getHeaders(), session);
         return new ValueWithId.MessageWithId(entry.getCreationId(), jmapMessage);
     }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
index b550e32..2cc67b1 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
@@ -333,7 +333,7 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor {
                         .asOptional()
                         .map(Username::fromMailAddress);
                     assertUserCanSendFrom(mailboxSession.getUser(), fromUser);
-                    messageSender.sendMessage(messageId, mail, mailboxSession);
+                    messageSender.sendMessage(messageId, mail, mailboxSession).block();
                     referenceUpdater.updateReferences(messageToSend.getHeaders(), mailboxSession);
                     return SetMessagesResponse.builder();
                 }).subscribeOn(Schedulers.elastic()))
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
index afda197..762f45e 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
@@ -27,7 +27,6 @@ import javax.inject.Inject;
 import org.apache.james.lifecycle.api.Disposable;
 import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.queue.api.MailQueue;
-import org.apache.james.queue.api.MailQueue.MailQueueException;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.mailet.Attribute;
 import org.apache.mailet.AttributeValue;
@@ -37,6 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import reactor.core.publisher.Mono;
+
 public class MailSpool implements Startable, Disposable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(MailSpool.class);
@@ -62,9 +63,9 @@ public class MailSpool implements Startable, Disposable {
         }
     }
 
-    public void send(Mail mail, MailMetadata metadata) throws MailQueueException {
+    public Mono<Void> send(Mail mail, MailMetadata metadata) {
         mail.setAttribute(new Attribute(MailMetadata.MAIL_METADATA_MESSAGE_ID_ATTRIBUTE, AttributeValue.of(metadata.getMessageId().serialize())));
         mail.setAttribute(new Attribute(MailMetadata.MAIL_METADATA_USERNAME_ATTRIBUTE, AttributeValue.of(metadata.getUsername())));
-        queue.enQueue(mail);
+        return Mono.from(queue.enqueueReactive(mail));
     }
 }
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java
index 66e6576..34d05d0 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java
@@ -63,7 +63,7 @@ public class MailSpoolTest {
             .mimeMessage(MimeMessageUtil.mimeMessageFromBytes("header: value\r\n".getBytes(UTF_8)))
             .build();
 
-        mailSpool.send(mail, new MailMetadata(MESSAGE_ID, USERNAME));
+        mailSpool.send(mail, new MailMetadata(MESSAGE_ID, USERNAME)).block();
 
         MailQueueItem actual = Flux.from(myQueue.deQueue()).blockFirst();
         assertThat(actual.getMail().getName()).isEqualTo(NAME);
@@ -76,7 +76,7 @@ public class MailSpoolTest {
             .mimeMessage(MimeMessageUtil.mimeMessageFromBytes("header: value\r\n".getBytes(UTF_8)))
             .build();
 
-        mailSpool.send(mail, new MailMetadata(MESSAGE_ID, USERNAME));
+        mailSpool.send(mail, new MailMetadata(MESSAGE_ID, USERNAME)).block();
 
         MailQueueItem actual = Flux.from(myQueue.deQueue()).blockFirst();
         assertThat(actual.getMail().getAttribute(MailMetadata.MAIL_METADATA_USERNAME_ATTRIBUTE))

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org