You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/05/13 02:03:23 UTC

[james-project] 02/09: JAMES-3149 EventBus caller should choose on which scheduler it dispatches

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f2fbe4f0cc378394254be7b7a7e36cda290009e6
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon May 4 18:25:21 2020 +0700

    JAMES-3149 EventBus caller should choose on which scheduler it dispatches
    
    This avoids enforcing a thread context switch upon dispatch,
    optimizing thread usage.
---
 .../apache/james/mailbox/events/GroupContract.java |  5 ++-
 .../james/mailbox/events/EventDispatcher.java      |  7 +---
 .../james/mailbox/store/StoreMailboxManager.java   | 49 ++++++++++++----------
 .../james/mailbox/store/StoreMessageIdManager.java | 28 ++++++++-----
 .../james/mailbox/store/StoreMessageManager.java   | 28 +++++++------
 .../james/mailbox/store/StoreRightManager.java     |  3 ++
 6 files changed, 69 insertions(+), 51 deletions(-)

diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index 83ef2cf..dfe31ac 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -56,6 +56,7 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
+
 import reactor.core.scheduler.Schedulers;
 
 public interface GroupContract {
@@ -158,7 +159,9 @@ public interface GroupContract {
             AtomicBoolean successfulRetry = new AtomicBoolean(false);
             MailboxListener listener = event -> {
                 if (event.getEventId().equals(EVENT_ID)) {
-                    eventBus().dispatch(EVENT_2, NO_KEYS).block();
+                    eventBus().dispatch(EVENT_2, NO_KEYS)
+                        .subscribeOn(Schedulers.elastic())
+                        .block();
                     successfulRetry.set(true);
                 }
             };
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 3de1fe0..c150184 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -46,7 +46,6 @@ import com.rabbitmq.client.AMQP;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.MonoProcessor;
-import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.Sender;
@@ -95,7 +94,6 @@ public class EventDispatcher {
             .concat(
                 dispatchToLocalListeners(event, keys),
                 dispatchToRemoteListeners(event, keys))
-            .subscribeOn(Schedulers.elastic())
             .doOnError(throwable -> LOGGER.error("error while dispatching event", throwable))
             .then()
             .subscribeWith(MonoProcessor.create());
@@ -106,7 +104,7 @@ public class EventDispatcher {
             .flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key)
                 .map(listener -> Tuples.of(key, listener)))
             .filter(pair -> pair.getT2().getExecutionMode() == MailboxListener.ExecutionMode.SYNCHRONOUS)
-            .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1())).subscribeOn(Schedulers.elastic())
+            .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()))
             .then();
     }
 
@@ -161,8 +159,7 @@ public class EventDispatcher {
         if (routingKeys.isEmpty()) {
             return Mono.empty();
         }
-        return sender.send(toMessages(serializedEvent, routingKeys))
-            .subscribeOn(Schedulers.elastic());
+        return sender.send(toMessages(serializedEvent, routingKeys));
     }
 
     private Flux<OutboundMessage> toMessages(byte[] serializedEvent, Collection<RoutingKey> routingKeys) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index 1f72b2c..bb68c0d 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -93,6 +93,7 @@ import com.google.common.collect.Iterables;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * This base class of an {@link MailboxManager} implementation provides a high-level api for writing your own
@@ -382,6 +383,7 @@ public class StoreMailboxManager implements MailboxManager {
                                 .mailbox(mailbox)
                                 .build(),
                                 new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+                            .subscribeOn(Schedulers.elastic())
                             .block();
                     }));
                 } catch (MailboxExistsException e) {
@@ -451,14 +453,15 @@ public class StoreMailboxManager implements MailboxManager {
         Mailbox m = new Mailbox(mailbox);
         mailboxMapper.delete(mailbox);
         eventBus.dispatch(EventFactory.mailboxDeleted()
-            .randomEventId()
-            .mailboxSession(session)
-            .mailbox(mailbox)
-            .quotaRoot(quotaRoot)
-            .quotaCount(QuotaCountUsage.count(messageCount))
-            .quotaSize(QuotaSizeUsage.size(totalSize))
-            .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+                .randomEventId()
+                .mailboxSession(session)
+                .mailbox(mailbox)
+                .quotaRoot(quotaRoot)
+                .quotaCount(QuotaCountUsage.count(messageCount))
+                .quotaSize(QuotaSizeUsage.size(totalSize))
+                .build(),
+                new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+            .subscribeOn(Schedulers.elastic())
             .block();
         return m;
     }
@@ -519,13 +522,14 @@ public class StoreMailboxManager implements MailboxManager {
         mapper.rename(mailbox);
 
         eventBus.dispatch(EventFactory.mailboxRenamed()
-            .randomEventId()
-            .mailboxSession(session)
-            .mailboxId(mailbox.getMailboxId())
-            .oldPath(from)
-            .newPath(newMailboxPath)
-            .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+                .randomEventId()
+                .mailboxSession(session)
+                .mailboxId(mailbox.getMailboxId())
+                .oldPath(from)
+                .newPath(newMailboxPath)
+                .build(),
+                new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+            .subscribeOn(Schedulers.elastic())
             .block();
 
         // rename submailboxes
@@ -543,13 +547,14 @@ public class StoreMailboxManager implements MailboxManager {
                 sub.setName(subNewName);
                 mapper.rename(sub);
                 eventBus.dispatch(EventFactory.mailboxRenamed()
-                    .randomEventId()
-                    .mailboxSession(session)
-                    .mailboxId(sub.getMailboxId())
-                    .oldPath(fromPath)
-                    .newPath(sub.generateAssociatedPath())
-                    .build(),
-                    new MailboxIdRegistrationKey(sub.getMailboxId()))
+                        .randomEventId()
+                        .mailboxSession(session)
+                        .mailboxId(sub.getMailboxId())
+                        .oldPath(fromPath)
+                        .newPath(sub.generateAssociatedPath())
+                        .build(),
+                        new MailboxIdRegistrationKey(sub.getMailboxId()))
+                    .subscribeOn(Schedulers.elastic())
                     .block();
 
                 LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 3f87736..17c0477 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -80,6 +80,7 @@ import com.google.common.collect.Sets;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class StoreMessageIdManager implements MessageIdManager {
 
@@ -235,6 +236,7 @@ public class StoreMessageIdManager implements MessageIdManager {
                 new MailboxIdRegistrationKey(metadataWithMailboxId.getMailboxId())))
                 .sneakyThrow())
             .then()
+            .subscribeOn(Schedulers.elastic())
             .block();
     }
 
@@ -305,6 +307,7 @@ public class StoreMessageIdManager implements MessageIdManager {
             messageMoves.impactedMailboxIds()
                 .map(MailboxIdRegistrationKey::new)
                 .collect(Guavate.toImmutableSet()))
+            .subscribeOn(Schedulers.elastic())
             .block();
     }
 
@@ -322,6 +325,7 @@ public class StoreMessageIdManager implements MessageIdManager {
                 .addMetaData(eventPayload)
                 .build(),
                 new MailboxIdRegistrationKey(mailboxId))
+            .subscribeOn(Schedulers.elastic())
             .block();
         }
     }
@@ -331,12 +335,13 @@ public class StoreMessageIdManager implements MessageIdManager {
             Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
 
             eventBus.dispatch(EventFactory.flagsUpdated()
-                .randomEventId()
-                .mailboxSession(mailboxSession)
-                .mailbox(mailbox)
-                .updatedFlags(updatedFlags)
-                .build(),
+                    .randomEventId()
+                    .mailboxSession(mailboxSession)
+                    .mailbox(mailbox)
+                    .updatedFlags(updatedFlags)
+                    .build(),
                 new MailboxIdRegistrationKey(mailboxId))
+                .subscribeOn(Schedulers.elastic())
                 .block();
         }
     }
@@ -393,12 +398,13 @@ public class StoreMessageIdManager implements MessageIdManager {
             save(messageIdMapper, copy);
 
             eventBus.dispatch(EventFactory.added()
-                .randomEventId()
-                .mailboxSession(mailboxSession)
-                .mailbox(mailboxMapper.findMailboxById(mailboxId))
-                .addMetaData(copy.metaData())
-                .build(),
-                new MailboxIdRegistrationKey(mailboxId))
+                    .randomEventId()
+                    .mailboxSession(mailboxSession)
+                    .mailbox(mailboxMapper.findMailboxById(mailboxId))
+                    .addMetaData(copy.metaData())
+                    .build(),
+                    new MailboxIdRegistrationKey(mailboxId))
+                .subscribeOn(Schedulers.elastic())
                 .block();
         }
     }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 9fd3ad4..8708344 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -289,6 +289,7 @@ public class StoreMessageManager implements MessageManager {
                 .metaData(ImmutableSortedMap.copyOf(deletedMessages))
                 .build(),
             new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+            .subscribeOn(Schedulers.elastic())
             .block();
     }
 
@@ -437,12 +438,12 @@ public class StoreMessageManager implements MessageManager {
                 Mailbox mailbox = getMailboxEntity();
 
                 eventBus.dispatch(EventFactory.added()
-                    .randomEventId()
-                    .mailboxSession(mailboxSession)
-                    .mailbox(mailbox)
-                    .addMetaData(data.getLeft())
-                    .build(),
-                    new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+                        .randomEventId()
+                        .mailboxSession(mailboxSession)
+                        .mailbox(mailbox)
+                        .addMetaData(data.getLeft())
+                        .build(),
+                        new MailboxIdRegistrationKey(mailbox.getMailboxId()))
                     .subscribeOn(Schedulers.elastic())
                     .block();
                 MessageMetaData messageMetaData = data.getLeft();
@@ -592,12 +593,13 @@ public class StoreMessageManager implements MessageManager {
         List<UpdatedFlags> updatedFlags = Iterators.toStream(it).collect(Guavate.toImmutableList());
 
         eventBus.dispatch(EventFactory.flagsUpdated()
-            .randomEventId()
-            .mailboxSession(mailboxSession)
-            .mailbox(getMailboxEntity())
-            .updatedFlags(updatedFlags)
-            .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+                .randomEventId()
+                .mailboxSession(mailboxSession)
+                .mailbox(getMailboxEntity())
+                .updatedFlags(updatedFlags)
+                .build(),
+                new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+            .subscribeOn(Schedulers.elastic())
             .block();
 
         return updatedFlags.stream().collect(Guavate.toImmutableMap(
@@ -759,6 +761,7 @@ public class StoreMessageManager implements MessageManager {
                     .messageId(messageIds.build())
                     .build(),
                 messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(Guavate.toImmutableSet())))
+            .subscribeOn(Schedulers.elastic())
             .blockLast();
 
         return copiedUids;
@@ -800,6 +803,7 @@ public class StoreMessageManager implements MessageManager {
                     .session(session)
                     .build(),
                 messageMoves.impactedMailboxIds().map(MailboxIdRegistrationKey::new).collect(Guavate.toImmutableSet())))
+            .subscribeOn(Schedulers.elastic())
             .blockLast();
 
         return moveUids;
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
index 20f4779..6b4c205 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
@@ -54,6 +54,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class StoreRightManager implements RightManager {
     public static final boolean GROUP_FOLDER = true;
@@ -156,6 +157,7 @@ public class StoreRightManager implements RightManager {
             .aclDiff(aclDiff)
             .build(),
             new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+            .subscribeOn(Schedulers.elastic())
             .block();
     }
 
@@ -241,6 +243,7 @@ public class StoreRightManager implements RightManager {
             .aclDiff(aclDiff)
             .build(),
             new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+            .subscribeOn(Schedulers.elastic())
             .block();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org