You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/06/14 16:32:58 UTC
[james-project] 07/09: JAMES-2786 Use a channel pool for EventBus
This is an automated email from the ASF dual-hosted git repository.
matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d3908cda7d7fa8a767432a7019b883ff1fa49548
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Fri Jun 14 10:52:04 2019 +0200
JAMES-2786 Use a channel pool for EventBus
---
.../james/mailbox/events/EventBusConcurrentTestContract.java | 12 ++++++------
.../org/apache/james/mailbox/events/GroupRegistration.java | 1 +
.../org/apache/james/mailbox/events/RabbitMQEventBus.java | 12 +++++++++++-
3 files changed, 18 insertions(+), 7 deletions(-)
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
index 2f6e9cb..738969f 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
@@ -74,7 +74,7 @@ public interface EventBusConcurrentTestContract {
int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS))
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS).block())
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);
@@ -97,7 +97,7 @@ public interface EventBusConcurrentTestContract {
int totalEventBus = 1;
ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);
@@ -127,7 +127,7 @@ public interface EventBusConcurrentTestContract {
int totalEventDeliveredByKeys = totalKeyListenerRegistrations * TOTAL_DISPATCH_OPERATIONS;
ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);
@@ -159,7 +159,7 @@ public interface EventBusConcurrentTestContract {
int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS))
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS).block())
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);
@@ -187,7 +187,7 @@ public interface EventBusConcurrentTestContract {
int totalEventBus = 2; // eventBus1 + eventBus2
ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);
@@ -223,7 +223,7 @@ public interface EventBusConcurrentTestContract {
int totalEventDeliveredByKeys = totalKeyListenerRegistrations * totalEventBus * TOTAL_DISPATCH_OPERATIONS;
ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 5164c2d..bbdd46b 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -140,6 +140,7 @@ class GroupRegistration implements Registration {
int currentRetryCount = getRetryCount(acknowledgableDelivery);
return delayGenerator.delayIfHaveTo(currentRetryCount)
+ .publishOn(Schedulers.elastic())
.flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event))))
.onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
.then(Mono.fromRunnable(acknowledgableDelivery::ack))
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 8ec96d6..784baab 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -34,11 +34,15 @@ import com.google.common.base.Preconditions;
import com.rabbitmq.client.Connection;
import reactor.core.publisher.Mono;
+import reactor.rabbitmq.ChannelPool;
+import reactor.rabbitmq.ChannelPoolFactory;
+import reactor.rabbitmq.ChannelPoolOptions;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
public class RabbitMQEventBus implements EventBus, Startable {
+ private static final int MAX_CHANNELS_NUMBER = 5;
private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
static final String MAILBOX_EVENT = "mailboxEvent";
static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
@@ -54,6 +58,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
private volatile boolean isRunning;
private volatile boolean isStopping;
+ private ChannelPool channelPool;
private GroupRegistrationHandler groupRegistrationHandler;
private KeyRegistrationHandler keyRegistrationHandler;
EventDispatcher eventDispatcher;
@@ -77,7 +82,11 @@ public class RabbitMQEventBus implements EventBus, Startable {
public void start() {
if (!isRunning && !isStopping) {
- sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono)
+ this.channelPool = ChannelPoolFactory.createChannelPool(
+ connectionMono,
+ new ChannelPoolOptions().maxCacheSize(MAX_CHANNELS_NUMBER)
+ );
+ sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono).channelPool(channelPool)
.resourceManagementChannelMono(connectionMono.map(Throwing.function(Connection::createChannel))));
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
@@ -97,6 +106,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
isRunning = false;
groupRegistrationHandler.stop();
keyRegistrationHandler.stop();
+ channelPool.close();
sender.close();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org