You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/28 01:16:19 UTC

[james-project] 04/07: [PERFORMANCE] Reactify ReferenceUpdater

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3048a9f990e3f6f80e22b8790be87681916019bb
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun May 23 10:39:41 2021 +0700

    [PERFORMANCE] Reactify ReferenceUpdater
---
 .../james/jmap/draft/methods/ReferenceUpdater.java | 55 ++++++++++++----------
 .../methods/SetMessagesCreationProcessor.java      |  5 +-
 .../draft/methods/SetMessagesUpdateProcessor.java  | 10 ++--
 3 files changed, 39 insertions(+), 31 deletions(-)

diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
index 17c946a..a965e67 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.jmap.draft.methods;
 
-import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
@@ -34,7 +33,6 @@ import org.apache.james.mailbox.MessageManager.FlagsUpdateMode;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.Header;
 import org.apache.james.mailbox.model.Headers;
-import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
 import org.apache.james.mailbox.model.SearchQuery;
@@ -48,6 +46,7 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.Iterables;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class ReferenceUpdater {
     public static final String X_FORWARDED_ID_HEADER = "X-Forwarded-Message-Id";
@@ -64,45 +63,49 @@ public class ReferenceUpdater {
         this.mailboxManager = mailboxManager;
     }
 
-    public void updateReferences(Headers headers, MailboxSession session) throws MailboxException {
+    public Mono<Void> updateReferences(Headers headers, MailboxSession session) throws MailboxException {
         Map<String, String> headersAsMap = Iterators.toStream(headers.headers())
             .collect(Guavate.toImmutableMap(Header::getName, Header::getValue));
-        updateReferences(headersAsMap, session);
+        return updateReferences(headersAsMap, session);
     }
 
-    public void updateReferences(Map<String, String> headers, MailboxSession session) throws MailboxException {
+    public Mono<Void> updateReferences(Map<String, String> headers, MailboxSession session) throws MailboxException {
         Optional<String> inReplyToId = Optional.ofNullable(headers.get(RFC2822Headers.IN_REPLY_TO));
         Optional<String> forwardedId = Optional.ofNullable(headers.get(X_FORWARDED_ID_HEADER));
-        inReplyToId.ifPresent(Throwing.consumer((String id) -> updateAnswered(id, session)).sneakyThrow());
-        forwardedId.ifPresent(Throwing.consumer((String id) -> updateForwarded(id, session)).sneakyThrow());
+
+        return inReplyToId.map(Throwing.function((String id) -> updateAnswered(id, session)).sneakyThrow()).orElse(Mono.empty())
+            .then(forwardedId.map((Throwing.function((String id) -> updateForwarded(id, session)).sneakyThrow())).orElse(Mono.empty()));
     }
 
-    private void updateAnswered(String messageId, MailboxSession session) throws MailboxException {
-        updateFlag(messageId, session, new Flags(Flags.Flag.ANSWERED));
+    private Mono<Void> updateAnswered(String messageId, MailboxSession session) throws MailboxException {
+        return updateFlag(messageId, session, new Flags(Flags.Flag.ANSWERED));
     }
 
-    private void updateForwarded(String messageId, MailboxSession session) throws MailboxException {
-        updateFlag(messageId, session, FORWARDED_FLAG);
+    private Mono<Void> updateForwarded(String messageId, MailboxSession session) throws MailboxException {
+        return updateFlag(messageId, session, FORWARDED_FLAG);
     }
 
-    private void updateFlag(String messageId, MailboxSession session, Flags flag) throws MailboxException {
+    private Mono<Void> updateFlag(String messageId, MailboxSession session, Flags flag) throws MailboxException {
         int limit = 2;
         MultimailboxesSearchQuery searchByRFC822MessageId = MultimailboxesSearchQuery
             .from(SearchQuery.of(SearchQuery.mimeMessageID(messageId)))
             .build();
-        List<MessageId> references = Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit))
-            .collectList().block();
-        try {
-            MessageId reference = Iterables.getOnlyElement(references);
-            List<MailboxId> mailboxIds = Flux.from(messageIdManager.messageMetadata(reference, session))
-                .map(metaData -> metaData.getComposedMessageId().getMailboxId())
-                .collect(Guavate.toImmutableList())
-                .block();
-            messageIdManager.setFlags(flag, FlagsUpdateMode.ADD, reference, mailboxIds, session);
-        } catch (NoSuchElementException e) {
-            logger.info("Unable to find a message with this Mime Message Id: " + messageId);
-        } catch (IllegalArgumentException e) {
-            logger.info("Too many messages are matching this Mime Message Id: " + messageId);
-        }
+        return Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit))
+            .collectList()
+            .flatMap(references -> {
+                MessageId reference = Iterables.getOnlyElement(references);
+                return Flux.from(messageIdManager.messageMetadata(reference, session))
+                    .map(metaData -> metaData.getComposedMessageId().getMailboxId())
+                    .collect(Guavate.toImmutableList())
+                    .flatMap(mailboxIds -> Mono.from(messageIdManager.setFlagsReactive(flag, FlagsUpdateMode.ADD, reference, mailboxIds, session)));
+            })
+            .onErrorResume(NoSuchElementException.class, e -> {
+                logger.info("Unable to find a message with this Mime Message Id: " + messageId);
+                return Mono.empty();
+            })
+            .onErrorResume(IllegalArgumentException.class, e -> {
+                logger.info("Too many messages are matching this Mime Message Id: " + messageId);
+                return Mono.empty();
+            });
     }
 }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
index 1c505fd..fc1ca2b 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
@@ -293,8 +293,9 @@ public class SetMessagesCreationProcessor implements SetMessagesProcessor {
         MetaDataWithContent newMessage = messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session);
         MessageFullView jmapMessage = messageFullViewFactory.fromMetaDataWithContent(newMessage).block();
         Envelope envelope = EnvelopeUtils.fromMessage(jmapMessage);
-        messageSender.sendMessage(newMessage, envelope, session).block();
-        referenceUpdater.updateReferences(entry.getValue().getHeaders(), session);
+        messageSender.sendMessage(newMessage, envelope, session)
+            .then(referenceUpdater.updateReferences(entry.getValue().getHeaders(), session))
+            .block();
         return new ValueWithId.MessageWithId(entry.getCreationId(), jmapMessage);
     }
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
index 2cc67b1..f92e91c 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
@@ -36,6 +36,7 @@ import javax.mail.MessagingException;
 import javax.mail.Session;
 import javax.mail.internet.MimeMessage;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.core.Username;
 import org.apache.james.jmap.draft.exceptions.InvalidOutboxMoveException;
 import org.apache.james.jmap.draft.model.Keyword;
@@ -70,6 +71,7 @@ import org.apache.james.util.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -333,10 +335,12 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor {
                         .asOptional()
                         .map(Username::fromMailAddress);
                     assertUserCanSendFrom(mailboxSession.getUser(), fromUser);
-                    messageSender.sendMessage(messageId, mail, mailboxSession).block();
-                    referenceUpdater.updateReferences(messageToSend.getHeaders(), mailboxSession);
-                    return SetMessagesResponse.builder();
+                    return Pair.of(messageToSend, mail);
                 }).subscribeOn(Schedulers.elastic()))
+                .flatMap(Throwing.<Pair<MessageResult, MailImpl>, Mono<SetMessagesResponse.Builder>>function(
+                    pair -> messageSender.sendMessage(messageId, pair.getRight(), mailboxSession)
+                        .then(referenceUpdater.updateReferences(pair.getKey().getHeaders(), mailboxSession))
+                        .thenReturn(SetMessagesResponse.builder())).sneakyThrow())
                 .switchIfEmpty(Mono.just(addMessageIdNotFoundToResponse(messageId)));
         }
         return Mono.just(SetMessagesResponse.builder());

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org