You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/06/14 16:32:58 UTC

[james-project] 07/09: JAMES-2786 Use a channel pool for EventBus

This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d3908cda7d7fa8a767432a7019b883ff1fa49548
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Fri Jun 14 10:52:04 2019 +0200

    JAMES-2786 Use a channel pool for EventBus
---
 .../james/mailbox/events/EventBusConcurrentTestContract.java | 12 ++++++------
 .../org/apache/james/mailbox/events/GroupRegistration.java   |  1 +
 .../org/apache/james/mailbox/events/RabbitMQEventBus.java    | 12 +++++++++++-
 3 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
index 2f6e9cb..738969f 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
@@ -74,7 +74,7 @@ public interface EventBusConcurrentTestContract {
             int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS).block())
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(FIVE_SECONDS);
@@ -97,7 +97,7 @@ public interface EventBusConcurrentTestContract {
             int totalEventBus = 1;
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(FIVE_SECONDS);
@@ -127,7 +127,7 @@ public interface EventBusConcurrentTestContract {
             int totalEventDeliveredByKeys = totalKeyListenerRegistrations * TOTAL_DISPATCH_OPERATIONS;
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(FIVE_SECONDS);
@@ -159,7 +159,7 @@ public interface EventBusConcurrentTestContract {
             int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS).block())
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(FIVE_SECONDS);
@@ -187,7 +187,7 @@ public interface EventBusConcurrentTestContract {
             int totalEventBus = 2; // eventBus1 + eventBus2
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(FIVE_SECONDS);
@@ -223,7 +223,7 @@ public interface EventBusConcurrentTestContract {
             int totalEventDeliveredByKeys = totalKeyListenerRegistrations * totalEventBus * TOTAL_DISPATCH_OPERATIONS;
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(FIVE_SECONDS);
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 5164c2d..bbdd46b 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -140,6 +140,7 @@ class GroupRegistration implements Registration {
         int currentRetryCount = getRetryCount(acknowledgableDelivery);
 
         return delayGenerator.delayIfHaveTo(currentRetryCount)
+            .publishOn(Schedulers.elastic())
             .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event))))
             .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
             .then(Mono.fromRunnable(acknowledgableDelivery::ack))
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 8ec96d6..784baab 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -34,11 +34,15 @@ import com.google.common.base.Preconditions;
 import com.rabbitmq.client.Connection;
 
 import reactor.core.publisher.Mono;
+import reactor.rabbitmq.ChannelPool;
+import reactor.rabbitmq.ChannelPoolFactory;
+import reactor.rabbitmq.ChannelPoolOptions;
 import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
 
 public class RabbitMQEventBus implements EventBus, Startable {
+    private static final int MAX_CHANNELS_NUMBER = 5;
     private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
     static final String MAILBOX_EVENT = "mailboxEvent";
     static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
@@ -54,6 +58,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
 
     private volatile boolean isRunning;
     private volatile boolean isStopping;
+    private ChannelPool channelPool;
     private GroupRegistrationHandler groupRegistrationHandler;
     private KeyRegistrationHandler keyRegistrationHandler;
     EventDispatcher eventDispatcher;
@@ -77,7 +82,11 @@ public class RabbitMQEventBus implements EventBus, Startable {
 
     public void start() {
         if (!isRunning && !isStopping) {
-            sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono)
+            this.channelPool = ChannelPoolFactory.createChannelPool(
+                    connectionMono,
+                    new ChannelPoolOptions().maxCacheSize(MAX_CHANNELS_NUMBER)
+            );
+            sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono).channelPool(channelPool)
                 .resourceManagementChannelMono(connectionMono.map(Throwing.function(Connection::createChannel))));
             LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
             keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
@@ -97,6 +106,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
             isRunning = false;
             groupRegistrationHandler.stop();
             keyRegistrationHandler.stop();
+            channelPool.close();
             sender.close();
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org