You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/08 00:40:05 UTC
[james-project] 05/08: JAMES-3575 Reactive single flag update for JMAP RFC-8621
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 42fc513c5be8547d139b85210268b52f5941dfa2
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat May 1 22:38:48 2021 +0700
JAMES-3575 Reactive single flag update for JMAP RFC-8621
---
.../main/java/org/apache/james/mailbox/MessageIdManager.java | 2 ++
.../org/apache/james/mailbox/store/StoreMessageIdManager.java | 10 +++++++---
.../org/apache/james/jmap/method/EmailSetUpdatePerformer.scala | 5 ++---
3 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 6fef5d4..abb0f8f 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -52,6 +52,8 @@ public interface MessageIdManager {
void setFlags(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
+ Publisher<Void> setFlagsReactive(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
+
List<MessageResult> getMessages(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) throws MailboxException;
default Publisher<MessageResult> getMessagesReactive(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 5031573..7dfc8ae 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -120,12 +120,17 @@ public class StoreMessageIdManager implements MessageIdManager {
@Override
public void setFlags(Flags newState, MessageManager.FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException {
+ MailboxReactorUtils.block(setFlagsReactive(newState, replace, messageId, mailboxIds, mailboxSession));
+ }
+
+ @Override
+ public Mono<Void> setFlagsReactive(Flags newState, MessageManager.FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) {
MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
int concurrency = 4;
- MailboxReactorUtils.block(Flux.fromIterable(mailboxIds)
+ return Flux.fromIterable(mailboxIds)
.flatMap(mailboxMapper::findMailboxById, concurrency)
.collect(Guavate.toImmutableList())
.flatMap(Throwing.<List<Mailbox>, Mono<Void>>function(targetMailboxes -> {
@@ -135,8 +140,7 @@ public class StoreMessageIdManager implements MessageIdManager {
.flatMapIterable(updatedFlags -> updatedFlags.asMap().entrySet())
.concatMap(entry -> dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes))
.then();
- }).sneakyThrow())
- .subscribeOn(Schedulers.elastic()));
+ }).sneakyThrow());
}
@Override
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
index 3e42e69..c69517f 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -235,10 +235,9 @@ class EmailSetUpdatePerformer @Inject() (serializer: EmailSetSerializer,
if (newFlags.equals(originalFlags)) {
SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))
} else {
- SMono.fromCallable(() =>
- messageIdManager.setFlags(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
- .subscribeOn(Schedulers.elastic())
+ SMono(messageIdManager.setFlagsReactive(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
.`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+ .subscribeOn(Schedulers.elastic())
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org