You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/08 00:40:05 UTC

[james-project] 05/08: JAMES-3575 Reactive single flag update for JMAP RFC-8621

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 42fc513c5be8547d139b85210268b52f5941dfa2
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat May 1 22:38:48 2021 +0700

    JAMES-3575 Reactive single flag update for JMAP RFC-8621
---
 .../main/java/org/apache/james/mailbox/MessageIdManager.java   |  2 ++
 .../org/apache/james/mailbox/store/StoreMessageIdManager.java  | 10 +++++++---
 .../org/apache/james/jmap/method/EmailSetUpdatePerformer.scala |  5 ++---
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 6fef5d4..abb0f8f 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -52,6 +52,8 @@ public interface MessageIdManager {
 
     void setFlags(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
 
+    Publisher<Void> setFlagsReactive(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
+
     List<MessageResult> getMessages(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) throws MailboxException;
 
     default Publisher<MessageResult> getMessagesReactive(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 5031573..7dfc8ae 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -120,12 +120,17 @@ public class StoreMessageIdManager implements MessageIdManager {
 
     @Override
     public void setFlags(Flags newState, MessageManager.FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException {
+        MailboxReactorUtils.block(setFlagsReactive(newState, replace, messageId, mailboxIds, mailboxSession));
+    }
+
+    @Override
+    public Mono<Void> setFlagsReactive(Flags newState, MessageManager.FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) {
         MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
         MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
 
         int concurrency = 4;
 
-        MailboxReactorUtils.block(Flux.fromIterable(mailboxIds)
+        return Flux.fromIterable(mailboxIds)
             .flatMap(mailboxMapper::findMailboxById, concurrency)
             .collect(Guavate.toImmutableList())
             .flatMap(Throwing.<List<Mailbox>, Mono<Void>>function(targetMailboxes -> {
@@ -135,8 +140,7 @@ public class StoreMessageIdManager implements MessageIdManager {
                     .flatMapIterable(updatedFlags -> updatedFlags.asMap().entrySet())
                     .concatMap(entry -> dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes))
                     .then();
-            }).sneakyThrow())
-            .subscribeOn(Schedulers.elastic()));
+            }).sneakyThrow());
     }
 
     @Override
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
index 3e42e69..c69517f 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -235,10 +235,9 @@ class EmailSetUpdatePerformer @Inject() (serializer: EmailSetSerializer,
     if (newFlags.equals(originalFlags)) {
       SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))
     } else {
-      SMono.fromCallable(() =>
-        messageIdManager.setFlags(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
-        .subscribeOn(Schedulers.elastic())
+      SMono(messageIdManager.setFlagsReactive(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
         .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+        .subscribeOn(Schedulers.elastic())
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org