You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/05/13 02:03:23 UTC
[james-project] 02/09: JAMES-3149 EventBus caller should choose on
which scheduler he dispatch
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f2fbe4f0cc378394254be7b7a7e36cda290009e6
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon May 4 18:25:21 2020 +0700
JAMES-3149 EventBus caller should choose on which scheduler he dispatch
This avoid enforcing the thread context switch upon dispatch,
optimizing thread usage.
---
.../apache/james/mailbox/events/GroupContract.java | 5 ++-
.../james/mailbox/events/EventDispatcher.java | 7 +---
.../james/mailbox/store/StoreMailboxManager.java | 49 ++++++++++++----------
.../james/mailbox/store/StoreMessageIdManager.java | 28 ++++++++-----
.../james/mailbox/store/StoreMessageManager.java | 28 +++++++------
.../james/mailbox/store/StoreRightManager.java | 3 ++
6 files changed, 69 insertions(+), 51 deletions(-)
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index 83ef2cf..dfe31ac 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -56,6 +56,7 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
+
import reactor.core.scheduler.Schedulers;
public interface GroupContract {
@@ -158,7 +159,9 @@ public interface GroupContract {
AtomicBoolean successfulRetry = new AtomicBoolean(false);
MailboxListener listener = event -> {
if (event.getEventId().equals(EVENT_ID)) {
- eventBus().dispatch(EVENT_2, NO_KEYS).block();
+ eventBus().dispatch(EVENT_2, NO_KEYS)
+ .subscribeOn(Schedulers.elastic())
+ .block();
successfulRetry.set(true);
}
};
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 3de1fe0..c150184 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -46,7 +46,6 @@ import com.rabbitmq.client.AMQP;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
-import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;
@@ -95,7 +94,6 @@ public class EventDispatcher {
.concat(
dispatchToLocalListeners(event, keys),
dispatchToRemoteListeners(event, keys))
- .subscribeOn(Schedulers.elastic())
.doOnError(throwable -> LOGGER.error("error while dispatching event", throwable))
.then()
.subscribeWith(MonoProcessor.create());
@@ -106,7 +104,7 @@ public class EventDispatcher {
.flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key)
.map(listener -> Tuples.of(key, listener)))
.filter(pair -> pair.getT2().getExecutionMode() == MailboxListener.ExecutionMode.SYNCHRONOUS)
- .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1())).subscribeOn(Schedulers.elastic())
+ .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()))
.then();
}
@@ -161,8 +159,7 @@ public class EventDispatcher {
if (routingKeys.isEmpty()) {
return Mono.empty();
}
- return sender.send(toMessages(serializedEvent, routingKeys))
- .subscribeOn(Schedulers.elastic());
+ return sender.send(toMessages(serializedEvent, routingKeys));
}
private Flux<OutboundMessage> toMessages(byte[] serializedEvent, Collection<RoutingKey> routingKeys) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index 1f72b2c..bb68c0d 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -93,6 +93,7 @@ import com.google.common.collect.Iterables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
/**
* This base class of an {@link MailboxManager} implementation provides a high-level api for writing your own
@@ -382,6 +383,7 @@ public class StoreMailboxManager implements MailboxManager {
.mailbox(mailbox)
.build(),
new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .subscribeOn(Schedulers.elastic())
.block();
}));
} catch (MailboxExistsException e) {
@@ -451,14 +453,15 @@ public class StoreMailboxManager implements MailboxManager {
Mailbox m = new Mailbox(mailbox);
mailboxMapper.delete(mailbox);
eventBus.dispatch(EventFactory.mailboxDeleted()
- .randomEventId()
- .mailboxSession(session)
- .mailbox(mailbox)
- .quotaRoot(quotaRoot)
- .quotaCount(QuotaCountUsage.count(messageCount))
- .quotaSize(QuotaSizeUsage.size(totalSize))
- .build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .randomEventId()
+ .mailboxSession(session)
+ .mailbox(mailbox)
+ .quotaRoot(quotaRoot)
+ .quotaCount(QuotaCountUsage.count(messageCount))
+ .quotaSize(QuotaSizeUsage.size(totalSize))
+ .build(),
+ new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .subscribeOn(Schedulers.elastic())
.block();
return m;
}
@@ -519,13 +522,14 @@ public class StoreMailboxManager implements MailboxManager {
mapper.rename(mailbox);
eventBus.dispatch(EventFactory.mailboxRenamed()
- .randomEventId()
- .mailboxSession(session)
- .mailboxId(mailbox.getMailboxId())
- .oldPath(from)
- .newPath(newMailboxPath)
- .build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .randomEventId()
+ .mailboxSession(session)
+ .mailboxId(mailbox.getMailboxId())
+ .oldPath(from)
+ .newPath(newMailboxPath)
+ .build(),
+ new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .subscribeOn(Schedulers.elastic())
.block();
// rename submailboxes
@@ -543,13 +547,14 @@ public class StoreMailboxManager implements MailboxManager {
sub.setName(subNewName);
mapper.rename(sub);
eventBus.dispatch(EventFactory.mailboxRenamed()
- .randomEventId()
- .mailboxSession(session)
- .mailboxId(sub.getMailboxId())
- .oldPath(fromPath)
- .newPath(sub.generateAssociatedPath())
- .build(),
- new MailboxIdRegistrationKey(sub.getMailboxId()))
+ .randomEventId()
+ .mailboxSession(session)
+ .mailboxId(sub.getMailboxId())
+ .oldPath(fromPath)
+ .newPath(sub.generateAssociatedPath())
+ .build(),
+ new MailboxIdRegistrationKey(sub.getMailboxId()))
+ .subscribeOn(Schedulers.elastic())
.block();
LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 3f87736..17c0477 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -80,6 +80,7 @@ import com.google.common.collect.Sets;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class StoreMessageIdManager implements MessageIdManager {
@@ -235,6 +236,7 @@ public class StoreMessageIdManager implements MessageIdManager {
new MailboxIdRegistrationKey(metadataWithMailboxId.getMailboxId())))
.sneakyThrow())
.then()
+ .subscribeOn(Schedulers.elastic())
.block();
}
@@ -305,6 +307,7 @@ public class StoreMessageIdManager implements MessageIdManager {
messageMoves.impactedMailboxIds()
.map(MailboxIdRegistrationKey::new)
.collect(Guavate.toImmutableSet()))
+ .subscribeOn(Schedulers.elastic())
.block();
}
@@ -322,6 +325,7 @@ public class StoreMessageIdManager implements MessageIdManager {
.addMetaData(eventPayload)
.build(),
new MailboxIdRegistrationKey(mailboxId))
+ .subscribeOn(Schedulers.elastic())
.block();
}
}
@@ -331,12 +335,13 @@ public class StoreMessageIdManager implements MessageIdManager {
Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
eventBus.dispatch(EventFactory.flagsUpdated()
- .randomEventId()
- .mailboxSession(mailboxSession)
- .mailbox(mailbox)
- .updatedFlags(updatedFlags)
- .build(),
+ .randomEventId()
+ .mailboxSession(mailboxSession)
+ .mailbox(mailbox)
+ .updatedFlags(updatedFlags)
+ .build(),
new MailboxIdRegistrationKey(mailboxId))
+ .subscribeOn(Schedulers.elastic())
.block();
}
}
@@ -393,12 +398,13 @@ public class StoreMessageIdManager implements MessageIdManager {
save(messageIdMapper, copy);
eventBus.dispatch(EventFactory.added()
- .randomEventId()
- .mailboxSession(mailboxSession)
- .mailbox(mailboxMapper.findMailboxById(mailboxId))
- .addMetaData(copy.metaData())
- .build(),
- new MailboxIdRegistrationKey(mailboxId))
+ .randomEventId()
+ .mailboxSession(mailboxSession)
+ .mailbox(mailboxMapper.findMailboxById(mailboxId))
+ .addMetaData(copy.metaData())
+ .build(),
+ new MailboxIdRegistrationKey(mailboxId))
+ .subscribeOn(Schedulers.elastic())
.block();
}
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 9fd3ad4..8708344 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -289,6 +289,7 @@ public class StoreMessageManager implements MessageManager {
.metaData(ImmutableSortedMap.copyOf(deletedMessages))
.build(),
new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .subscribeOn(Schedulers.elastic())
.block();
}
@@ -437,12 +438,12 @@ public class StoreMessageManager implements MessageManager {
Mailbox mailbox = getMailboxEntity();
eventBus.dispatch(EventFactory.added()
- .randomEventId()
- .mailboxSession(mailboxSession)
- .mailbox(mailbox)
- .addMetaData(data.getLeft())
- .build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .randomEventId()
+ .mailboxSession(mailboxSession)
+ .mailbox(mailbox)
+ .addMetaData(data.getLeft())
+ .build(),
+ new MailboxIdRegistrationKey(mailbox.getMailboxId()))
.subscribeOn(Schedulers.elastic())
.block();
MessageMetaData messageMetaData = data.getLeft();
@@ -592,12 +593,13 @@ public class StoreMessageManager implements MessageManager {
List<UpdatedFlags> updatedFlags = Iterators.toStream(it).collect(Guavate.toImmutableList());
eventBus.dispatch(EventFactory.flagsUpdated()
- .randomEventId()
- .mailboxSession(mailboxSession)
- .mailbox(getMailboxEntity())
- .updatedFlags(updatedFlags)
- .build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .randomEventId()
+ .mailboxSession(mailboxSession)
+ .mailbox(getMailboxEntity())
+ .updatedFlags(updatedFlags)
+ .build(),
+ new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .subscribeOn(Schedulers.elastic())
.block();
return updatedFlags.stream().collect(Guavate.toImmutableMap(
@@ -759,6 +761,7 @@ public class StoreMessageManager implements MessageManager {
.messageId(messageIds.build())
.build(),
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(Guavate.toImmutableSet())))
+ .subscribeOn(Schedulers.elastic())
.blockLast();
return copiedUids;
@@ -800,6 +803,7 @@ public class StoreMessageManager implements MessageManager {
.session(session)
.build(),
messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(Guavate.toImmutableSet())))
+ .subscribeOn(Schedulers.elastic())
.blockLast();
return moveUids;
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
index 20f4779..6b4c205 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
@@ -54,6 +54,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class StoreRightManager implements RightManager {
public static final boolean GROUP_FOLDER = true;
@@ -156,6 +157,7 @@ public class StoreRightManager implements RightManager {
.aclDiff(aclDiff)
.build(),
new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .subscribeOn(Schedulers.elastic())
.block();
}
@@ -241,6 +243,7 @@ public class StoreRightManager implements RightManager {
.aclDiff(aclDiff)
.build(),
new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .subscribeOn(Schedulers.elastic())
.block();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org