You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/08 00:40:04 UTC
[james-project] 04/08: JAMES-3575 Rewrite StoreMessageIdManager::setFlags in a more reactive way
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d4647112f6e71186f8f039c95e94d3425cfc4962
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat May 1 22:34:04 2021 +0700
JAMES-3575 Rewrite StoreMessageIdManager::setFlags in a more reactive way
---
.../james/mailbox/store/StoreMessageIdManager.java | 32 ++++++++++------------
1 file changed, 14 insertions(+), 18 deletions(-)
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 6ab1ffc..5031573 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -82,7 +82,6 @@ import com.github.fge.lambdas.functions.ThrowingFunction;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import reactor.core.publisher.Flux;
@@ -125,20 +124,19 @@ public class StoreMessageIdManager implements MessageIdManager {
MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
int concurrency = 4;
- List<Mailbox> targetMailboxes = Flux.fromIterable(mailboxIds)
+
+ MailboxReactorUtils.block(Flux.fromIterable(mailboxIds)
.flatMap(mailboxMapper::findMailboxById, concurrency)
.collect(Guavate.toImmutableList())
- .subscribeOn(Schedulers.elastic())
- .block();
-
- assertRightsOnMailboxes(targetMailboxes, mailboxSession, Right.Write);
+ .flatMap(Throwing.<List<Mailbox>, Mono<Void>>function(targetMailboxes -> {
+ assertRightsOnMailboxes(targetMailboxes, mailboxSession, Right.Write);
- Multimap<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace)
- .subscribeOn(Schedulers.elastic())
- .block();
- for (Map.Entry<MailboxId, Collection<UpdatedFlags>> entry : updatedFlags.asMap().entrySet()) {
- dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes);
- }
+ return messageIdMapper.setFlags(messageId, mailboxIds, newState, replace)
+ .flatMapIterable(updatedFlags -> updatedFlags.asMap().entrySet())
+ .concatMap(entry -> dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes))
+ .then();
+ }).sneakyThrow())
+ .subscribeOn(Schedulers.elastic()));
}
@Override
@@ -389,8 +387,7 @@ public class StoreMessageIdManager implements MessageIdManager {
.then());
}
- private void dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags,
- List<Mailbox> knownMailboxes) throws MailboxException {
+ private Mono<Void> dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags, List<Mailbox> knownMailboxes) {
if (updatedFlags.stream().anyMatch(UpdatedFlags::flagsChanged)) {
Mailbox mailbox = knownMailboxes.stream()
.filter(knownMailbox -> knownMailbox.getMailboxId().equals(mailboxId))
@@ -399,16 +396,15 @@ public class StoreMessageIdManager implements MessageIdManager {
.findMailboxById(mailboxId)
.subscribeOn(Schedulers.elastic())))
.sneakyThrow());
- eventBus.dispatch(EventFactory.flagsUpdated()
+ return eventBus.dispatch(EventFactory.flagsUpdated()
.randomEventId()
.mailboxSession(mailboxSession)
.mailbox(mailbox)
.updatedFlags(updatedFlags)
.build(),
- new MailboxIdRegistrationKey(mailboxId))
- .subscribeOn(Schedulers.elastic())
- .block();
+ new MailboxIdRegistrationKey(mailboxId));
}
+ return Mono.empty();
}
private void validateQuota(MessageMovesWithMailbox messageMoves, MailboxMessage mailboxMessage) throws MailboxException {
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org