You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/08 00:40:06 UTC

[james-project] 06/08: JAMES-3575 Reactive single message move for JMAP RFC-8621

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bde8a850b1536d992b296d51a855f3e6e5bb73e2
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat May 1 22:48:36 2021 +0700

    JAMES-3575 Reactive single message move for JMAP RFC-8621
---
 .../org/apache/james/mailbox/MessageIdManager.java |  2 +
 .../james/mailbox/store/StoreMessageIdManager.java | 44 ++++++++++++----------
 .../jmap/method/EmailSetUpdatePerformer.scala      |  4 +-
 3 files changed, 28 insertions(+), 22 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index abb0f8f..218819a 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -70,6 +70,8 @@ public interface MessageIdManager {
 
     void setInMailboxes(MessageId messageId, Collection<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
 
+    Publisher<Void> setInMailboxesReactive(MessageId messageId, Collection<MailboxId> mailboxIds, MailboxSession mailboxSession);
+
     default List<MessageResult> getMessage(MessageId messageId, FetchGroup fetchGroup, MailboxSession mailboxSession) throws MailboxException {
         return getMessages(ImmutableList.of(messageId), fetchGroup, mailboxSession);
     }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 7dfc8ae..a7fa33b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -286,25 +286,29 @@ public class StoreMessageIdManager implements MessageIdManager {
 
     @Override
     public void setInMailboxes(MessageId messageId, Collection<MailboxId> targetMailboxIds, MailboxSession mailboxSession) throws MailboxException {
-        List<MailboxMessage> currentMailboxMessages = findRelatedMailboxMessages(messageId, mailboxSession);
-
-        MailboxReactorUtils.block(messageMovesWithMailbox(MessageMoves.builder()
-            .targetMailboxIds(targetMailboxIds)
-            .previousMailboxIds(toMailboxIds(currentMailboxMessages))
-            .build(), mailboxSession)
-            .flatMap(Throwing.<MessageMovesWithMailbox, Mono<Void>>function(messageMove -> {
-                MessageMovesWithMailbox refined = messageMove.filterPrevious(hasRightsOnMailbox(mailboxSession, Right.Read));
+        MailboxReactorUtils.block(setInMailboxesReactive(messageId, targetMailboxIds, mailboxSession)
+            .subscribeOn(Schedulers.elastic()));
+    }
 
-                if (messageMove.getPreviousMailboxes().isEmpty()) {
-                    LOGGER.info("Tried to access {} not accessible for {}", messageId, mailboxSession.getUser().asString());
+    @Override
+    public Mono<Void> setInMailboxesReactive(MessageId messageId, Collection<MailboxId> targetMailboxIds, MailboxSession mailboxSession) {
+        return findRelatedMailboxMessages(messageId, mailboxSession)
+            .flatMap(currentMailboxMessages -> messageMovesWithMailbox(MessageMoves.builder()
+                .targetMailboxIds(targetMailboxIds)
+                .previousMailboxIds(toMailboxIds(currentMailboxMessages))
+                .build(), mailboxSession)
+                .flatMap(Throwing.<MessageMovesWithMailbox, Mono<Void>>function(messageMove -> {
+                    MessageMovesWithMailbox refined = messageMove.filterPrevious(hasRightsOnMailbox(mailboxSession, Right.Read));
+
+                    if (messageMove.getPreviousMailboxes().isEmpty()) {
+                        LOGGER.info("Tried to access {} not accessible for {}", messageId, mailboxSession.getUser().asString());
+                        return Mono.empty();
+                    }
+                    if (refined.isChange()) {
+                        return applyMessageMoves(mailboxSession, currentMailboxMessages, refined);
+                    }
                     return Mono.empty();
-                }
-                if (refined.isChange()) {
-                    return applyMessageMoves(mailboxSession, currentMailboxMessages, refined);
-                }
-                return Mono.empty();
-            }).sneakyThrow())
-            .subscribeOn(Schedulers.elastic()));
+                }).sneakyThrow()));
     }
 
     public void setInMailboxesNoCheck(MessageId messageId, MailboxId targetMailboxId, MailboxSession mailboxSession) throws MailboxException {
@@ -325,11 +329,11 @@ public class StoreMessageIdManager implements MessageIdManager {
             .subscribeOn(Schedulers.elastic()));
     }
 
-    private List<MailboxMessage> findRelatedMailboxMessages(MessageId messageId, MailboxSession mailboxSession) throws MailboxException {
+    private Mono<List<MailboxMessage>> findRelatedMailboxMessages(MessageId messageId, MailboxSession mailboxSession) {
         MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
 
-        return MailboxReactorUtils.block(messageIdMapper.findReactive(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata)
-            .collect(Guavate.toImmutableList()));
+        return messageIdMapper.findReactive(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata)
+            .collect(Guavate.toImmutableList());
     }
 
     private Mono<Void> applyMessageMoves(MailboxSession mailboxSession, List<MailboxMessage> currentMailboxMessages, MessageMovesWithMailbox messageMoves) throws MailboxNotFoundException {
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
index c69517f..409ecaf 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -219,11 +219,11 @@ class EmailSetUpdatePerformer @Inject() (serializer: EmailSetSerializer,
     if (targetIds.equals(mailboxIds)) {
       SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))
     } else {
-      SMono.fromCallable(() => messageIdManager.setInMailboxes(messageId, targetIds.value.asJava, session))
-        .subscribeOn(Schedulers.elastic())
+      SMono(messageIdManager.setInMailboxesReactive(messageId, targetIds.value.asJava, session))
         .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
         .onErrorResume(e => SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), e)))
         .switchIfEmpty(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+        .subscribeOn(Schedulers.elastic())
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org