You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/28 01:16:19 UTC
[james-project] 04/07: [PERFORMANCE] Reactify ReferenceUpdater
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3048a9f990e3f6f80e22b8790be87681916019bb
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun May 23 10:39:41 2021 +0700
[PERFORMANCE] Reactify ReferenceUpdater
---
.../james/jmap/draft/methods/ReferenceUpdater.java | 55 ++++++++++++----------
.../methods/SetMessagesCreationProcessor.java | 5 +-
.../draft/methods/SetMessagesUpdateProcessor.java | 10 ++--
3 files changed, 39 insertions(+), 31 deletions(-)
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
index 17c946a..a965e67 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
@@ -19,7 +19,6 @@
package org.apache.james.jmap.draft.methods;
-import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
@@ -34,7 +33,6 @@ import org.apache.james.mailbox.MessageManager.FlagsUpdateMode;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.Header;
import org.apache.james.mailbox.model.Headers;
-import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
import org.apache.james.mailbox.model.SearchQuery;
@@ -48,6 +46,7 @@ import com.github.steveash.guavate.Guavate;
import com.google.common.collect.Iterables;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class ReferenceUpdater {
public static final String X_FORWARDED_ID_HEADER = "X-Forwarded-Message-Id";
@@ -64,45 +63,49 @@ public class ReferenceUpdater {
this.mailboxManager = mailboxManager;
}
- public void updateReferences(Headers headers, MailboxSession session) throws MailboxException {
+ public Mono<Void> updateReferences(Headers headers, MailboxSession session) throws MailboxException {
Map<String, String> headersAsMap = Iterators.toStream(headers.headers())
.collect(Guavate.toImmutableMap(Header::getName, Header::getValue));
- updateReferences(headersAsMap, session);
+ return updateReferences(headersAsMap, session);
}
- public void updateReferences(Map<String, String> headers, MailboxSession session) throws MailboxException {
+ public Mono<Void> updateReferences(Map<String, String> headers, MailboxSession session) throws MailboxException {
Optional<String> inReplyToId = Optional.ofNullable(headers.get(RFC2822Headers.IN_REPLY_TO));
Optional<String> forwardedId = Optional.ofNullable(headers.get(X_FORWARDED_ID_HEADER));
- inReplyToId.ifPresent(Throwing.consumer((String id) -> updateAnswered(id, session)).sneakyThrow());
- forwardedId.ifPresent(Throwing.consumer((String id) -> updateForwarded(id, session)).sneakyThrow());
+
+ return inReplyToId.map(Throwing.function((String id) -> updateAnswered(id, session)).sneakyThrow()).orElse(Mono.empty())
+ .then(forwardedId.map((Throwing.function((String id) -> updateForwarded(id, session)).sneakyThrow())).orElse(Mono.empty()));
}
- private void updateAnswered(String messageId, MailboxSession session) throws MailboxException {
- updateFlag(messageId, session, new Flags(Flags.Flag.ANSWERED));
+ private Mono<Void> updateAnswered(String messageId, MailboxSession session) throws MailboxException {
+ return updateFlag(messageId, session, new Flags(Flags.Flag.ANSWERED));
}
- private void updateForwarded(String messageId, MailboxSession session) throws MailboxException {
- updateFlag(messageId, session, FORWARDED_FLAG);
+ private Mono<Void> updateForwarded(String messageId, MailboxSession session) throws MailboxException {
+ return updateFlag(messageId, session, FORWARDED_FLAG);
}
- private void updateFlag(String messageId, MailboxSession session, Flags flag) throws MailboxException {
+ private Mono<Void> updateFlag(String messageId, MailboxSession session, Flags flag) throws MailboxException {
int limit = 2;
MultimailboxesSearchQuery searchByRFC822MessageId = MultimailboxesSearchQuery
.from(SearchQuery.of(SearchQuery.mimeMessageID(messageId)))
.build();
- List<MessageId> references = Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit))
- .collectList().block();
- try {
- MessageId reference = Iterables.getOnlyElement(references);
- List<MailboxId> mailboxIds = Flux.from(messageIdManager.messageMetadata(reference, session))
- .map(metaData -> metaData.getComposedMessageId().getMailboxId())
- .collect(Guavate.toImmutableList())
- .block();
- messageIdManager.setFlags(flag, FlagsUpdateMode.ADD, reference, mailboxIds, session);
- } catch (NoSuchElementException e) {
- logger.info("Unable to find a message with this Mime Message Id: " + messageId);
- } catch (IllegalArgumentException e) {
- logger.info("Too many messages are matching this Mime Message Id: " + messageId);
- }
+ return Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit))
+ .collectList()
+ .flatMap(references -> {
+ MessageId reference = Iterables.getOnlyElement(references);
+ return Flux.from(messageIdManager.messageMetadata(reference, session))
+ .map(metaData -> metaData.getComposedMessageId().getMailboxId())
+ .collect(Guavate.toImmutableList())
+ .flatMap(mailboxIds -> Mono.from(messageIdManager.setFlagsReactive(flag, FlagsUpdateMode.ADD, reference, mailboxIds, session)));
+ })
+ .onErrorResume(NoSuchElementException.class, e -> {
+ logger.info("Unable to find a message with this Mime Message Id: " + messageId);
+ return Mono.empty();
+ })
+ .onErrorResume(IllegalArgumentException.class, e -> {
+ logger.info("Too many messages are matching this Mime Message Id: " + messageId);
+ return Mono.empty();
+ });
}
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
index 1c505fd..fc1ca2b 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
@@ -293,8 +293,9 @@ public class SetMessagesCreationProcessor implements SetMessagesProcessor {
MetaDataWithContent newMessage = messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session);
MessageFullView jmapMessage = messageFullViewFactory.fromMetaDataWithContent(newMessage).block();
Envelope envelope = EnvelopeUtils.fromMessage(jmapMessage);
- messageSender.sendMessage(newMessage, envelope, session).block();
- referenceUpdater.updateReferences(entry.getValue().getHeaders(), session);
+ messageSender.sendMessage(newMessage, envelope, session)
+ .then(referenceUpdater.updateReferences(entry.getValue().getHeaders(), session))
+ .block();
return new ValueWithId.MessageWithId(entry.getCreationId(), jmapMessage);
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
index 2cc67b1..f92e91c 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
@@ -36,6 +36,7 @@ import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.internet.MimeMessage;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.core.Username;
import org.apache.james.jmap.draft.exceptions.InvalidOutboxMoveException;
import org.apache.james.jmap.draft.model.Keyword;
@@ -70,6 +71,7 @@ import org.apache.james.util.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
@@ -333,10 +335,12 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor {
.asOptional()
.map(Username::fromMailAddress);
assertUserCanSendFrom(mailboxSession.getUser(), fromUser);
- messageSender.sendMessage(messageId, mail, mailboxSession).block();
- referenceUpdater.updateReferences(messageToSend.getHeaders(), mailboxSession);
- return SetMessagesResponse.builder();
+ return Pair.of(messageToSend, mail);
}).subscribeOn(Schedulers.elastic()))
+ .flatMap(Throwing.<Pair<MessageResult, MailImpl>, Mono<SetMessagesResponse.Builder>>function(
+ pair -> messageSender.sendMessage(messageId, pair.getRight(), mailboxSession)
+ .then(referenceUpdater.updateReferences(pair.getKey().getHeaders(), mailboxSession))
+ .thenReturn(SetMessagesResponse.builder())).sneakyThrow())
.switchIfEmpty(Mono.just(addMessageIdNotFoundToResponse(messageId)));
}
return Mono.just(SetMessagesResponse.builder());
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org