You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/11/04 11:23:38 UTC
[james-project] 02/30: JAMES-2937 Inject RabbitMQChannelPool
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit fc2fa8730d79d4a1a7312af94408648afca126cc
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Oct 7 14:48:36 2019 +0700
JAMES-2937 Inject RabbitMQChannelPool
This avoids repeating ourselves regarding Sender and Receiver specification
and simplifies overall wiring.
This also highlighted some 'dirty' initialization performed within the MailSpool
constructor, outside of the startable chain.
---
backends-common/rabbitmq/pom.xml | 4 ++
.../rabbitmq/ReactorRabbitMQChannelPool.java | 36 ++++++++++++++--
.../rabbitmq/ReactorRabbitMQChannelPoolTest.java | 8 ++--
.../james/mailbox/events/GroupRegistration.java | 11 +++--
.../mailbox/events/GroupRegistrationHandler.java | 17 +++-----
.../mailbox/events/KeyRegistrationHandler.java | 10 ++---
.../james/mailbox/events/RabbitMQEventBus.java | 25 ++++-------
.../james/mailbox/events/RabbitMQEventBusTest.java | 29 ++++++++-----
.../rabbitmq/host/RabbitMQEventBusHostSystem.java | 10 +++--
.../modules/DistributedTaskManagerModule.java | 7 ++++
.../apache/james/jmap/draft/JMAPCommonModule.java | 10 +++++
.../james/modules/rabbitmq/RabbitMQModule.java | 32 +++++----------
.../apache/james/jmap/draft/send/MailSpool.java | 10 ++++-
.../james/jmap/draft/send/MailSpoolTest.java | 1 +
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 8 ++--
.../org/apache/james/queue/rabbitmq/Enqueuer.java | 5 ++-
.../queue/rabbitmq/RabbitMQMailQueueFactory.java | 48 ++++++----------------
.../RabbitMQMailQueueConfigurationChangeTest.java | 17 ++++----
.../queue/rabbitmq/RabbitMQMailQueueTest.java | 26 ++++--------
.../rabbitmq/RabbitMqMailQueueFactoryTest.java | 17 +++-----
.../distributed/RabbitMQTerminationSubscriber.java | 4 +-
.../distributed/RabbitMQWorkQueue.java | 34 ++++++---------
.../distributed/RabbitMQWorkQueueSupplier.scala | 5 +--
.../distributed/DistributedTaskManagerTest.java | 12 ++++--
.../distributed/RabbitMQWorkQueueTest.java | 11 +++--
server/task/task-memory/pom.xml | 4 ++
.../main/java/org/apache/james/task/WorkQueue.java | 8 +++-
.../eventsourcing/EventSourcingTaskManager.scala | 9 ++--
28 files changed, 216 insertions(+), 202 deletions(-)
diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml
index fa1c156..7cd59c1 100644
--- a/backends-common/rabbitmq/pom.xml
+++ b/backends-common/rabbitmq/pom.xml
@@ -37,6 +37,10 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>james-server-lifecycle-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>james-server-testing</artifactId>
<scope>test</scope>
</dependency>
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 9e66225..e240b93 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -24,12 +24,15 @@ import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.BiConsumer;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-
+import org.apache.james.lifecycle.api.Startable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,10 +45,13 @@ import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.ChannelPool;
import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
-public class ReactorRabbitMQChannelPool implements ChannelPool {
+public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
+ private static final int MAX_CHANNELS_NUMBER = 5;
static class ChannelFactory extends BasePooledObjectFactory<Channel> {
@@ -95,6 +101,12 @@ public class ReactorRabbitMQChannelPool implements ChannelPool {
private final Mono<Connection> connectionMono;
private final GenericObjectPool<Channel> pool;
private final ConcurrentSkipListSet<Channel> borrowedChannels;
+ private Sender sender;
+
+ @Inject
+ public ReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
+ this(simpleConnectionPool.getResilientConnection(), MAX_CHANNELS_NUMBER);
+ }
public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, int poolSize) {
this.connectionMono = connectionMono;
@@ -106,6 +118,22 @@ public class ReactorRabbitMQChannelPool implements ChannelPool {
this.borrowedChannels = new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode));
}
+ public void start() {
+ sender = createSender();
+ }
+
+ public Sender getSender() {
+ return sender;
+ }
+
+ public Receiver createReceiver() {
+ return RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+ }
+
+ public Mono<Connection> getConnectionMono() {
+ return connectionMono;
+ }
+
@Override
public Mono<? extends Channel> getChannelMono() {
return Mono.fromCallable(() -> {
@@ -127,7 +155,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool {
};
}
- public Sender createSender() {
+ private Sender createSender() {
return RabbitFlux.createSender(new SenderOptions()
.connectionMono(connectionMono)
.channelPool(this)
@@ -146,8 +174,10 @@ public class ReactorRabbitMQChannelPool implements ChannelPool {
}
}
+ @PreDestroy
@Override
public void close() {
+ sender.close();
borrowedChannels.forEach(channel -> getChannelCloseHandler().accept(SignalType.ON_NEXT, channel));
borrowedChannels.clear();
pool.close();
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
index 81994c8..21be0bd 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
@@ -66,9 +66,11 @@ class ReactorRabbitMQChannelPoolTest implements ChannelPoolContract {
}
private ReactorRabbitMQChannelPool generateChannelPool(int poolSize) {
- return new ReactorRabbitMQChannelPool(
- rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(),
- poolSize);
+ ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(
+ rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(),
+ poolSize);
+ reactorRabbitMQChannelPool.start();
+ return reactorRabbitMQChannelPool;
}
// Pool wait timeout is an expected exception
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 953a4fb..dac4ae3 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -31,12 +31,13 @@ import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.util.MDCBuilder;
import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
-import com.rabbitmq.client.Connection;
+
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -45,9 +46,7 @@ import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
class GroupRegistration implements Registration {
@@ -85,15 +84,15 @@ class GroupRegistration implements Registration {
private final MailboxListenerExecutor mailboxListenerExecutor;
private Optional<Disposable> receiverSubscriber;
- GroupRegistration(Mono<Connection> connectionSupplier, Sender sender, EventSerializer eventSerializer,
+ GroupRegistration(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, EventSerializer eventSerializer,
MailboxListener mailboxListener, Group group, RetryBackoffConfiguration retryBackoff,
EventDeadLetters eventDeadLetters,
Runnable unregisterGroup, MailboxListenerExecutor mailboxListenerExecutor) {
this.eventSerializer = eventSerializer;
this.mailboxListener = mailboxListener;
this.queueName = WorkQueueName.of(group);
- this.sender = sender;
- this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionSupplier));
+ this.sender = reactorRabbitMQChannelPool.getSender();
+ this.receiver = reactorRabbitMQChannelPool.createReceiver();
this.mailboxListenerExecutor = mailboxListenerExecutor;
this.receiverSubscriber = Optional.empty();
this.unregisterGroup = unregisterGroup;
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
index 2da507f..5337df6 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
@@ -23,28 +23,22 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.event.json.EventSerializer;
-import com.rabbitmq.client.Connection;
-
-import reactor.core.publisher.Mono;
-import reactor.rabbitmq.Sender;
-
class GroupRegistrationHandler {
private final Map<Group, GroupRegistration> groupRegistrations;
private final EventSerializer eventSerializer;
- private final Sender sender;
- private final Mono<Connection> connectionMono;
private final RetryBackoffConfiguration retryBackoff;
private final EventDeadLetters eventDeadLetters;
private final MailboxListenerExecutor mailboxListenerExecutor;
+ private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
- GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono,
+ GroupRegistrationHandler(EventSerializer eventSerializer, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
RetryBackoffConfiguration retryBackoff,
EventDeadLetters eventDeadLetters, MailboxListenerExecutor mailboxListenerExecutor) {
this.eventSerializer = eventSerializer;
- this.sender = sender;
- this.connectionMono = connectionMono;
+ this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool;
this.retryBackoff = retryBackoff;
this.eventDeadLetters = eventDeadLetters;
this.mailboxListenerExecutor = mailboxListenerExecutor;
@@ -73,8 +67,7 @@ class GroupRegistrationHandler {
private GroupRegistration newGroupRegistration(MailboxListener listener, Group group) {
return new GroupRegistration(
- connectionMono,
- sender,
+ reactorRabbitMQChannelPool,
eventSerializer,
listener,
group,
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 1f2fbae..dc968fe 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -28,6 +28,7 @@ import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.util.MDCBuilder;
import org.apache.james.util.MDCStructuredLogger;
@@ -37,7 +38,6 @@ import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Delivery;
import reactor.core.Disposable;
@@ -45,9 +45,7 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
class KeyRegistrationHandler {
@@ -64,13 +62,13 @@ class KeyRegistrationHandler {
private final MailboxListenerExecutor mailboxListenerExecutor;
private Optional<Disposable> receiverSubscriber;
- KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
+ KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
this.eventBusId = eventBusId;
this.eventSerializer = eventSerializer;
- this.sender = sender;
+ this.sender = reactorRabbitMQChannelPool.getSender();
this.routingKeyConverter = routingKeyConverter;
this.localListenerRegistry = localListenerRegistry;
- this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+ this.receiver = reactorRabbitMQChannelPool.createReceiver();
this.mailboxListenerExecutor = mailboxListenerExecutor;
this.registrationQueue = new RegistrationQueueName();
this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index a659062..06d786a 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -25,47 +25,42 @@ import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.metrics.api.MetricFactory;
import com.google.common.base.Preconditions;
-import com.rabbitmq.client.Connection;
+
import reactor.core.publisher.Mono;
-import reactor.rabbitmq.Sender;
public class RabbitMQEventBus implements EventBus, Startable {
- private static final int MAX_CHANNELS_NUMBER = 5;
private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
static final String MAILBOX_EVENT = "mailboxEvent";
static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
static final String EVENT_BUS_ID = "eventBusId";
- private final Mono<Connection> connectionMono;
private final EventSerializer eventSerializer;
private final RoutingKeyConverter routingKeyConverter;
private final RetryBackoffConfiguration retryBackoff;
private final EventBusId eventBusId;
private final EventDeadLetters eventDeadLetters;
+ private final ReactorRabbitMQChannelPool channelPool;
private final MailboxListenerExecutor mailboxListenerExecutor;
private volatile boolean isRunning;
private volatile boolean isStopping;
- private ReactorRabbitMQChannelPool channelPool;
private GroupRegistrationHandler groupRegistrationHandler;
private KeyRegistrationHandler keyRegistrationHandler;
- EventDispatcher eventDispatcher;
- private Sender sender;
+ private EventDispatcher eventDispatcher;
@Inject
- public RabbitMQEventBus(SimpleConnectionPool simpleConnectionPool, EventSerializer eventSerializer,
+ public RabbitMQEventBus(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, EventSerializer eventSerializer,
RetryBackoffConfiguration retryBackoff,
RoutingKeyConverter routingKeyConverter,
EventDeadLetters eventDeadLetters, MetricFactory metricFactory) {
+ this.channelPool = reactorRabbitMQChannelPool;
this.mailboxListenerExecutor = new MailboxListenerExecutor(metricFactory);
this.eventBusId = EventBusId.random();
- this.connectionMono = simpleConnectionPool.getResilientConnection();
this.eventSerializer = eventSerializer;
this.routingKeyConverter = routingKeyConverter;
this.retryBackoff = retryBackoff;
@@ -76,13 +71,11 @@ public class RabbitMQEventBus implements EventBus, Startable {
public void start() {
if (!isRunning && !isStopping) {
- this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER);
- sender = channelPool.createSender();
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
- keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
- groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
- eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor);
+ keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, channelPool, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
+ groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
+ eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, channelPool.getSender(), localListenerRegistry, mailboxListenerExecutor);
eventDispatcher.start();
keyRegistrationHandler.start();
@@ -97,8 +90,6 @@ public class RabbitMQEventBus implements EventBus, Startable {
isRunning = false;
groupRegistrationHandler.stop();
keyRegistrationHandler.stop();
- sender.close();
- channelPool.close();
}
}
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index ab6d885..2b92e20 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -56,7 +56,7 @@ import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.rabbitmq.RabbitMQExtension.DockerRestartPolicy;
import org.apache.james.backends.rabbitmq.RabbitMQFixture;
import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
import org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
@@ -97,11 +97,11 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
private RabbitMQEventBus eventBus;
private RabbitMQEventBus eventBus2;
private RabbitMQEventBus eventBus3;
- private Sender sender;
private EventSerializer eventSerializer;
private RoutingKeyConverter routingKeyConverter;
private MemoryEventDeadLetters memoryEventDeadLetters;
private Mono<Connection> resilientConnection;
+ private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
@BeforeEach
void setUp() {
@@ -110,6 +110,8 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
TestId.Factory mailboxIdFactory = new TestId.Factory();
eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory(), new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer());
routingKeyConverter = RoutingKeyConverter.forFactories(new MailboxIdRegistrationKey.Factory(mailboxIdFactory));
+ reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
+ reactorRabbitMQChannelPool.start();
eventBus = newEventBus();
eventBus2 = newEventBus();
@@ -119,7 +121,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBus2.start();
eventBus3.start();
resilientConnection = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
- sender = RabbitFlux.createSender(new SenderOptions().connectionMono(resilientConnection));
}
@AfterEach
@@ -129,17 +130,17 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBus3.stop();
ALL_GROUPS.stream()
.map(GroupRegistration.WorkQueueName::of)
- .forEach(queueName -> sender.delete(QueueSpecification.queue(queueName.asString())).block());
- sender.delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)).block();
- sender.close();
+ .forEach(queueName -> reactorRabbitMQChannelPool.getSender().delete(QueueSpecification.queue(queueName.asString())).block());
+ reactorRabbitMQChannelPool.getSender().delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)).block();
+ reactorRabbitMQChannelPool.close();
}
private RabbitMQEventBus newEventBus() {
- return newEventBus(rabbitMQExtension.getRabbitConnectionPool());
+ return newEventBus(reactorRabbitMQChannelPool);
}
- private RabbitMQEventBus newEventBus(SimpleConnectionPool connectionPool) {
- return new RabbitMQEventBus(connectionPool, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new NoopMetricFactory());
+ private RabbitMQEventBus newEventBus(ReactorRabbitMQChannelPool rabbitMQChannelPool) {
+ return new RabbitMQEventBus(rabbitMQChannelPool, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new NoopMetricFactory());
}
@Override
@@ -340,10 +341,18 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
.restartPolicy(DockerRestartPolicy.PER_TEST);
private RabbitMQEventBus rabbitMQEventBusWithNetWorkIssue;
+ private ReactorRabbitMQChannelPool reactorRabbitMQChannelPoolWithNetWorkIssue;
@BeforeEach
void beforeEach() {
- rabbitMQEventBusWithNetWorkIssue = newEventBus(rabbitMQNetWorkIssueExtension.getRabbitConnectionPool());
+ reactorRabbitMQChannelPoolWithNetWorkIssue = new ReactorRabbitMQChannelPool(rabbitMQNetWorkIssueExtension.getRabbitConnectionPool());
+ reactorRabbitMQChannelPoolWithNetWorkIssue.start();
+ rabbitMQEventBusWithNetWorkIssue = newEventBus(reactorRabbitMQChannelPoolWithNetWorkIssue);
+ }
+
+ @AfterEach
+ void afterEach() {
+ reactorRabbitMQChannelPoolWithNetWorkIssue.close();
}
@Test
diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
index e93ac4f..25c3635 100644
--- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
+++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
@@ -20,10 +20,10 @@
package org.apache.james.mpt.imapmailbox.rabbitmq.host;
-import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.apache.james.core.quota.QuotaCount;
import org.apache.james.core.quota.QuotaSize;
@@ -63,6 +63,7 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
private RabbitMQEventBus eventBus;
private SimpleConnectionPool connectionPool;
private InMemoryIntegrationResources resources;
+ private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
RabbitMQEventBusHostSystem(DockerRabbitMQ dockerRabbitMQ) {
this.dockerRabbitMQ = dockerRabbitMQ;
@@ -73,6 +74,8 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
super.beforeTest();
connectionPool = new SimpleConnectionPool(dockerRabbitMQ.createRabbitConnectionFactory());
+ reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool);
+ reactorRabbitMQChannelPool.start();
eventBus = createEventBus();
eventBus.start();
@@ -101,18 +104,19 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
defaultImapProcessorFactory);
}
- private RabbitMQEventBus createEventBus() throws URISyntaxException {
+ private RabbitMQEventBus createEventBus() {
InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();
EventSerializer eventSerializer = new EventSerializer(mailboxIdFactory, messageIdFactory, new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer());
RoutingKeyConverter routingKeyConverter = new RoutingKeyConverter(ImmutableSet.of(new MailboxIdRegistrationKey.Factory(mailboxIdFactory)));
- return new RabbitMQEventBus(connectionPool, eventSerializer, RetryBackoffConfiguration.DEFAULT,
+ return new RabbitMQEventBus(reactorRabbitMQChannelPool, eventSerializer, RetryBackoffConfiguration.DEFAULT,
routingKeyConverter, new MemoryEventDeadLetters(), new NoopMetricFactory());
}
@Override
public void afterTest() {
eventBus.stop();
+ reactorRabbitMQChannelPool.close();
connectionPool.close();
}
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/DistributedTaskManagerModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/DistributedTaskManagerModule.java
index b74e6dd..ce0333f 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/DistributedTaskManagerModule.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/DistributedTaskManagerModule.java
@@ -30,6 +30,7 @@ import org.apache.james.task.eventsourcing.WorkQueueSupplier;
import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
import org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber;
+import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue;
import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueSupplier;
import org.apache.james.utils.InitializationOperation;
import org.apache.james.utils.InitilizationOperationBuilder;
@@ -64,5 +65,11 @@ public class DistributedTaskManagerModule extends AbstractModule {
.init(instance::start);
}
+ @ProvidesIntoSet
+ InitializationOperation workQueue(EventSourcingTaskManager instance) {
+ return InitilizationOperationBuilder
+ .forClass(RabbitMQWorkQueue.class)
+ .init(instance::start);
+ }
}
diff --git a/server/container/guice/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/JMAPCommonModule.java b/server/container/guice/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/JMAPCommonModule.java
index ed2c316..dce8120 100644
--- a/server/container/guice/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/JMAPCommonModule.java
+++ b/server/container/guice/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/JMAPCommonModule.java
@@ -42,6 +42,8 @@ import org.apache.james.lifecycle.api.StartUpCheck;
import org.apache.james.util.date.DefaultZonedDateTimeProvider;
import org.apache.james.util.date.ZonedDateTimeProvider;
import org.apache.james.util.mime.MessageContentExtractor;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
import org.apache.mailet.base.AutomaticallySentMailDetector;
import org.apache.mailet.base.AutomaticallySentMailDetectorImpl;
@@ -50,6 +52,7 @@ import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;
+import com.google.inject.multibindings.ProvidesIntoSet;
import com.google.inject.name.Names;
public class JMAPCommonModule extends AbstractModule {
@@ -96,4 +99,11 @@ public class JMAPCommonModule extends AbstractModule {
accessTokenAuthenticationStrategy,
queryParameterAuthenticationStrategy);
}
+
+ @ProvidesIntoSet
+ InitializationOperation workQueue(MailSpool instance) {
+ return InitilizationOperationBuilder
+ .forClass(MailSpool.class)
+ .init(instance::start);
+ }
}
diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
index 5eb49df..4226aea 100644
--- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
@@ -29,10 +29,10 @@ import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.rabbitmq.RabbitMQChannelPool;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.RabbitMQHealthCheck;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.backends.rabbitmq.SimpleChannelPool;
import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
-import org.apache.james.lifecycle.api.Startable;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
@@ -50,17 +50,18 @@ import org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDAO;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
-import org.apache.james.utils.InitialisationOperation;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
import org.apache.james.utils.PropertiesProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
+import com.google.inject.multibindings.ProvidesIntoSet;
public class RabbitMQModule extends AbstractModule {
@@ -70,6 +71,7 @@ public class RabbitMQModule extends AbstractModule {
@Override
protected void configure() {
+ bind(ReactorRabbitMQChannelPool.class).in(Scopes.SINGLETON);
bind(EnqueuedMailsDAO.class).in(Scopes.SINGLETON);
bind(DeletedMailsDAO.class).in(Scopes.SINGLETON);
bind(BrowseStartDAO.class).in(Scopes.SINGLETON);
@@ -88,7 +90,6 @@ public class RabbitMQModule extends AbstractModule {
eventDTOModuleBinder.addBinding().toInstance(CassandraMailQueueViewConfigurationModule.MAIL_QUEUE_VIEW_CONFIGURATION);
Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(RabbitMQHealthCheck.class);
- Multibinder.newSetBinder(binder(), InitialisationOperation.class).addBinding().to(RabbitMQMailQueueFactoryInitialisationOperation.class);
}
@Provides
@@ -145,23 +146,10 @@ public class RabbitMQModule extends AbstractModule {
return RabbitMQMailQueueConfiguration.from(configuration);
}
- @Singleton
- public static class RabbitMQMailQueueFactoryInitialisationOperation implements InitialisationOperation {
- private final RabbitMQMailQueueFactory rabbitMQMailQueueFactory;
-
- @Inject
- public RabbitMQMailQueueFactoryInitialisationOperation(RabbitMQMailQueueFactory rabbitMQMailQueueFactory) {
- this.rabbitMQMailQueueFactory = rabbitMQMailQueueFactory;
- }
-
- @Override
- public void initModule() {
- rabbitMQMailQueueFactory.start();
- }
-
- @Override
- public Class<? extends Startable> forClass() {
- return RabbitMQMailQueueFactory.class;
- }
+ @ProvidesIntoSet
+ InitializationOperation workQueue(ReactorRabbitMQChannelPool instance) {
+ return InitilizationOperationBuilder
+ .forClass(ReactorRabbitMQChannelPool.class)
+ .init(instance::start);
}
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
index d4ebbb6..c652bc9 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
@@ -21,6 +21,7 @@ package org.apache.james.jmap.draft.send;
import javax.inject.Inject;
+import org.apache.james.lifecycle.api.Startable;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueue.MailQueueException;
import org.apache.james.queue.api.MailQueueFactory;
@@ -30,12 +31,17 @@ import org.apache.mailet.Mail;
import com.google.common.annotations.VisibleForTesting;
-public class MailSpool {
+public class MailSpool implements Startable {
- private final MailQueue queue;
+ private final MailQueueFactory<?> queueFactory;
+ private MailQueue queue;
@Inject
@VisibleForTesting MailSpool(MailQueueFactory<?> queueFactory) {
+ this.queueFactory = queueFactory;
+ }
+
+ public void start() {
queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
}
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java
index 147f21e..0552c2f 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java
@@ -49,6 +49,7 @@ public class MailSpoolTest {
myQueue = mailQueueFactory.createQueue(MailQueueFactory.SPOOL);
mailSpool = new MailSpool(mailQueueFactory);
+ mailSpool.start();
}
@Test
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index a6de706..21546b9 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Function;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
@@ -33,15 +34,12 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.mailet.Mail;
import com.github.fge.lambdas.consumers.ThrowingConsumer;
-import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Delivery;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
-import reactor.rabbitmq.RabbitFlux;
-import reactor.rabbitmq.ReceiverOptions;
class Dequeuer {
private static final boolean REQUEUE = true;
@@ -79,14 +77,14 @@ class Dequeuer {
private final MailReferenceSerializer mailReferenceSerializer;
private final MailQueueView mailQueueView;
- Dequeuer(MailQueueName name, Mono<Connection> connectionMono, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
+ Dequeuer(MailQueueName name, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
MailReferenceSerializer serializer, MetricFactory metricFactory,
MailQueueView mailQueueView) {
this.mailLoader = mailLoader;
this.mailReferenceSerializer = serializer;
this.mailQueueView = mailQueueView;
this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
- this.flux = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono))
+ this.flux = reactorRabbitMQChannelPool.createReceiver()
.consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE))
.filter(getResponse -> getResponse.getBody() != null);
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index ea7d04a..4e0485c 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -27,6 +27,7 @@ import java.time.Clock;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.metrics.api.Metric;
@@ -51,11 +52,11 @@ class Enqueuer {
private final MailQueueView mailQueueView;
private final Clock clock;
- Enqueuer(MailQueueName name, Sender sender, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
+ Enqueuer(MailQueueName name, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
MailReferenceSerializer serializer, MetricFactory metricFactory,
MailQueueView mailQueueView, Clock clock) {
this.name = name;
- this.sender = sender;
+ this.sender = reactorRabbitMQChannelPool.getSender();
this.mimeMessageStore = mimeMessageStore;
this.mailReferenceSerializer = serializer;
this.mailQueueView = mailQueueView;
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index b0748c0..b48f60f 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -32,17 +32,14 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
-import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.mail.internet.MimeMessage;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.blob.mail.MimeMessageStore;
-import org.apache.james.lifecycle.api.Startable;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueueFactory;
@@ -53,23 +50,18 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
-import com.rabbitmq.client.Connection;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.Sender;
-public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue>, Startable {
- private static final int MAX_CHANNELS_NUMBER = 5;
+public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue> {
@VisibleForTesting static class PrivateFactory {
private final MetricFactory metricFactory;
private final GaugeRegistry gaugeRegistry;
- private final Mono<Connection> connectionMono;
- private final Sender sender;
+ private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
private final MailReferenceSerializer mailReferenceSerializer;
private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
@@ -81,7 +73,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
@Inject
@VisibleForTesting PrivateFactory(MetricFactory metricFactory,
GaugeRegistry gaugeRegistry,
- Mono<Connection> connectionMono, Sender sender, MimeMessageStore.Factory mimeMessageStoreFactory,
+ ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
+ MimeMessageStore.Factory mimeMessageStoreFactory,
BlobId.Factory blobIdFactory,
MailQueueView.Factory mailQueueViewFactory,
Clock clock,
@@ -89,8 +82,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
RabbitMQMailQueueConfiguration configuration) {
this.metricFactory = metricFactory;
this.gaugeRegistry = gaugeRegistry;
- this.connectionMono = connectionMono;
- this.sender = sender;
+ this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool;
this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
this.mailQueueViewFactory = mailQueueViewFactory;
this.clock = clock;
@@ -107,9 +99,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
RabbitMQMailQueue rabbitMQMailQueue = new RabbitMQMailQueue(
metricFactory,
mailQueueName,
- new Enqueuer(mailQueueName, sender, mimeMessageStore, mailReferenceSerializer,
+ new Enqueuer(mailQueueName, reactorRabbitMQChannelPool, mimeMessageStore, mailReferenceSerializer,
metricFactory, mailQueueView, clock),
- new Dequeuer(mailQueueName, connectionMono, mailLoader, mailReferenceSerializer,
+ new Dequeuer(mailQueueName, reactorRabbitMQChannelPool, mailLoader, mailReferenceSerializer,
metricFactory, mailQueueView),
mailQueueView,
decoratorFactory);
@@ -145,26 +137,19 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
private final RabbitMQMailQueueManagement mqManagementApi;
private final PrivateFactory privateFactory;
private final RabbitMQMailQueueObjectPool mailQueueObjectPool;
- private final Mono<Connection> connectionMono;
- private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
- private Sender sender;
+ private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
@VisibleForTesting
@Inject
- RabbitMQMailQueueFactory(SimpleConnectionPool simpleConnectionPool,
+ RabbitMQMailQueueFactory(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
RabbitMQMailQueueManagement mqManagementApi,
PrivateFactory privateFactory) {
- this.connectionMono = simpleConnectionPool.getResilientConnection();
+ this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool;
this.mqManagementApi = mqManagementApi;
this.privateFactory = privateFactory;
this.mailQueueObjectPool = new RabbitMQMailQueueObjectPool();
}
- public void start() {
- this.reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER);
- this.sender = reactorRabbitMQChannelPool.createSender();
- }
-
@Override
public Optional<RabbitMQMailQueue> getQueue(String name) {
return getQueueFromRabbitServer(MailQueueName.fromString(name));
@@ -187,15 +172,15 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName) {
String exchangeName = mailQueueName.toRabbitExchangeName().asString();
Flux.concat(
- sender.declareExchange(ExchangeSpecification.exchange(exchangeName)
+ reactorRabbitMQChannelPool.getSender().declareExchange(ExchangeSpecification.exchange(exchangeName)
.durable(true)
.type("direct")),
- sender.declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString())
+ reactorRabbitMQChannelPool.getSender().declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString())
.durable(DURABLE)
.exclusive(!EXCLUSIVE)
.autoDelete(!AUTO_DELETE)
.arguments(NO_ARGUMENTS)),
- sender.bind(BindingSpecification.binding()
+ reactorRabbitMQChannelPool.getSender().bind(BindingSpecification.binding()
.exchange(mailQueueName.toRabbitExchangeName().asString())
.queue(mailQueueName.toWorkQueueName().asString())
.routingKey(EMPTY_ROUTING_KEY)))
@@ -210,11 +195,4 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
.map(mailQueueObjectPool::retrieveInstanceFor)
.findFirst();
}
-
- @PreDestroy
- public void stop() {
- sender.close();
- reactorRabbitMQChannelPool.close();
- }
-
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
index 7c23f0f..4ad6f85 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
@@ -61,9 +61,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import com.github.fge.lambdas.Throwing;
-import com.rabbitmq.client.Connection;
-
-import reactor.core.publisher.Mono;
class RabbitMQMailQueueConfigurationChangeTest {
private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
@@ -79,7 +76,6 @@ class RabbitMQMailQueueConfigurationChangeTest {
private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z");
private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1, HOURS);
private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS);
- public static final int POOL_SIZE = 5;
@RegisterExtension
static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(
@@ -94,6 +90,7 @@ class RabbitMQMailQueueConfigurationChangeTest {
private UpdatableTickingClock clock;
private RabbitMQMailQueueManagement mqManagementApi;
private MimeMessageStore.Factory mimeMessageStoreFactory;
+ private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
@BeforeEach
void setup(CassandraCluster cassandra) throws Exception {
@@ -101,11 +98,14 @@ class RabbitMQMailQueueConfigurationChangeTest {
mimeMessageStoreFactory = MimeMessageStore.factory(blobsDAO);
clock = new UpdatableTickingClock(IN_SLICE_1);
mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
+ reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
+ reactorRabbitMQChannelPool.start();
}
@AfterEach
void tearDown() {
mqManagementApi.deleteAllQueues();
+ reactorRabbitMQChannelPool.close();
}
private RabbitMQMailQueue getRabbitMQMailQueue(CassandraCluster cassandra, CassandraMailQueueViewConfiguration mailQueueViewConfiguration) throws Exception {
@@ -119,21 +119,18 @@ class RabbitMQMailQueueConfigurationChangeTest {
.sizeMetricsEnabled(true)
.build();
- Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
- ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE);
+
RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
new NoopMetricFactory(),
new NoopGaugeRegistry(),
- connectionMono,
- reactorRabbitMQChannelPool.createSender(),
+ reactorRabbitMQChannelPool,
mimeMessageStoreFactory,
BLOB_ID_FACTORY,
mailQueueViewFactory,
clock,
new RawMailQueueItemDecoratorFactory(),
mailQueueSizeConfiguration);
- RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, privateFactory);
- mailQueueFactory.start();
+ RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(reactorRabbitMQChannelPool, mqManagementApi, privateFactory);
return mailQueueFactory.createQueue(SPOOL);
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 127ed23..0102a05 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -44,6 +44,7 @@ import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobStore;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
+
import org.apache.james.metrics.api.Gauge;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueMetricContract;
@@ -68,7 +69,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import com.github.fge.lambdas.Throwing;
-import com.rabbitmq.client.Connection;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -84,7 +84,6 @@ class RabbitMQMailQueueTest {
private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS);
private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4, HOURS);
private static final Instant IN_SLICE_7 = IN_SLICE_1.plus(6, HOURS);
- private static final int POOL_SIZE = 5;
@RegisterExtension
static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(
@@ -100,9 +99,11 @@ class RabbitMQMailQueueTest {
private UpdatableTickingClock clock;
private RabbitMQMailQueue mailQueue;
private RabbitMQMailQueueManagement mqManagementApi;
+ private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
@AfterEach
void tearDown() {
+ reactorRabbitMQChannelPool.close();
mqManagementApi.deleteAllQueues();
}
@@ -238,11 +239,6 @@ class RabbitMQMailQueueTest {
}))
.blockLast();
}
-
- @AfterEach
- void tearDown() {
- mqManagementApi.deleteAllQueues();
- }
}
@Nested
@@ -264,11 +260,6 @@ class RabbitMQMailQueueTest {
ArgumentCaptor<Gauge<?>> gaugeCaptor = ArgumentCaptor.forClass(Gauge.class);
verify(metricTestSystem.getSpyGaugeRegistry(), never()).register(any(), gaugeCaptor.capture());
}
-
- @AfterEach
- void tearDown() {
- mqManagementApi.deleteAllQueues();
- }
}
private void setUp(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem, RabbitMQMailQueueConfiguration configuration) throws Exception {
@@ -284,14 +275,12 @@ class RabbitMQMailQueueTest {
.build(),
mimeMessageStoreFactory);
- Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
- ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE);
-
+ reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
+ reactorRabbitMQChannelPool.start();
RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
metricTestSystem.getMetricFactory(),
metricTestSystem.getSpyGaugeRegistry(),
- connectionMono,
- reactorRabbitMQChannelPool.createSender(),
+ reactorRabbitMQChannelPool,
mimeMessageStoreFactory,
BLOB_ID_FACTORY,
mailQueueViewFactory,
@@ -299,8 +288,7 @@ class RabbitMQMailQueueTest {
new RawMailQueueItemDecoratorFactory(),
configuration);
mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
- mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, factory);
- mailQueueFactory.start();
+ mailQueueFactory = new RabbitMQMailQueueFactory(reactorRabbitMQChannelPool, mqManagementApi, factory);
mailQueue = mailQueueFactory.createQueue(SPOOL);
}
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index fb5b8b7..8ac46b3 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -46,19 +46,15 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import com.rabbitmq.client.Connection;
-
-import reactor.core.publisher.Mono;
-
class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQMailQueue> {
private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
- public static final int POOL_SIZE = 5;
@RegisterExtension
static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ();
private RabbitMQMailQueueFactory mailQueueFactory;
private RabbitMQMailQueueManagement mqManagementApi;
+ private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
@BeforeEach
void setup() throws Exception {
@@ -72,13 +68,12 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
.sizeMetricsEnabled(true)
.build();
- Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
- ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE);
+ reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
+ reactorRabbitMQChannelPool.start();
RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
new NoopMetricFactory(),
new NoopGaugeRegistry(),
- connectionMono,
- reactorRabbitMQChannelPool.createSender(),
+ reactorRabbitMQChannelPool,
mimeMessageStoreFactory,
BLOB_ID_FACTORY,
mailQueueViewFactory,
@@ -86,13 +81,13 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
new RawMailQueueItemDecoratorFactory(),
configuration);
mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
- mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, privateFactory);
- mailQueueFactory.start();
+ mailQueueFactory = new RabbitMQMailQueueFactory(reactorRabbitMQChannelPool, mqManagementApi, privateFactory);
}
@AfterEach
void tearDown() {
mqManagementApi.deleteAllQueues();
+ reactorRabbitMQChannelPool.close();
}
@Override
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index d2bdeeb..dbb4b8f 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -42,6 +42,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Delivery;
+
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Mono;
@@ -83,7 +84,8 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
}
public void start() {
- sender = channelPool.createSender();
+ channelPool.start();
+ sender = channelPool.getSender();
sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 21a9880..62a2f1b 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -25,8 +25,6 @@ import java.util.Optional;
import java.util.UUID;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
-import org.apache.james.lifecycle.api.Startable;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.task.Task;
import org.apache.james.task.TaskId;
@@ -39,8 +37,8 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Delivery;
+
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
@@ -51,12 +49,11 @@ import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
-public class RabbitMQWorkQueue implements WorkQueue, Startable {
+public class RabbitMQWorkQueue implements WorkQueue {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueue.class);
// Need at least one by receivers plus a shared one for senders
@@ -71,10 +68,8 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
public static final String TASK_ID = "taskId";
private final TaskManagerWorker worker;
- private final Mono<Connection> connectionMono;
private final ReactorRabbitMQChannelPool channelPool;
private final JsonTaskSerializer taskSerializer;
- private Sender sender;
private RabbitMQExclusiveConsumer receiver;
private UnicastProcessor<TaskId> sendCancelRequestsQueue;
private Disposable sendCancelRequestsQueueHandle;
@@ -83,29 +78,28 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
private Sender cancelRequestSender;
private Receiver cancelRequestListener;
- public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool simpleConnectionPool, JsonTaskSerializer taskSerializer) {
+ public RabbitMQWorkQueue(TaskManagerWorker worker, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, JsonTaskSerializer taskSerializer) {
this.worker = worker;
- this.connectionMono = simpleConnectionPool.getResilientConnection();
+ this.channelPool = reactorRabbitMQChannelPool;
this.taskSerializer = taskSerializer;
- this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER);
}
+ @Override
public void start() {
startWorkqueue();
listenToCancelRequests();
}
private void startWorkqueue() {
- sender = channelPool.createSender();
- sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
- sender.declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block();
- sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
+ channelPool.getSender().declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
+ channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block();
+ channelPool.getSender().bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
consumeWorkqueue();
}
private void consumeWorkqueue() {
- receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono));
+ receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
receiverHandle = receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions())
.subscribeOn(Schedulers.boundedElastic())
.flatMap(this::executeTask)
@@ -143,7 +137,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
}
void listenToCancelRequests() {
- cancelRequestSender = channelPool.createSender();
+ cancelRequestSender = channelPool.getSender();
String queueName = CANCEL_REQUESTS_QUEUE_NAME_PREFIX + UUID.randomUUID().toString();
cancelRequestSender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
@@ -159,8 +153,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
}
private void registerCancelRequestsListener(String queueName) {
- cancelRequestListener = RabbitFlux
- .createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+ cancelRequestListener = channelPool.createReceiver();
cancelRequestListenerHandle = cancelRequestListener
.consumeAutoAck(queueName)
.subscribeOn(Schedulers.boundedElastic())
@@ -188,7 +181,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
.headers(ImmutableMap.of(TASK_ID, taskWithId.getId().asString()))
.build();
OutboundMessage outboundMessage = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, basicProperties, payload);
- sender.send(Mono.just(outboundMessage)).block();
+ channelPool.getSender().send(Mono.just(outboundMessage)).block();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@@ -203,12 +196,13 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
public void close() {
+ // Release only what this work queue created itself: its Disposable handles, the
+ // exclusive consumer and the cancel-request receiver.
+ // NOTE(review): cancelRequestSender is obtained from channelPool.getSender(), presumably
+ // the sender shared by every user of the injected pool (confirm in
+ // ReactorRabbitMQChannelPool), so it is left open here, just like the pool itself.
Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose);
Optional.ofNullable(receiver).ifPresent(RabbitMQExclusiveConsumer::close);
- Optional.ofNullable(sender).ifPresent(Sender::close);
Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
- Optional.ofNullable(sender).ifPresent(Sender::close);
- Optional.ofNullable(cancelRequestSender).ifPresent(Sender::close);
Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close);
- channelPool.close();
}
}
diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
index f86be44..7feed07 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
@@ -22,13 +22,13 @@ import java.time.Duration
import com.google.common.annotations.VisibleForTesting
import javax.inject.Inject
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool
import org.apache.james.eventsourcing.EventSourcingSystem
import org.apache.james.server.task.json.JsonTaskSerializer
import org.apache.james.task.SerialTaskManagerWorker
import org.apache.james.task.eventsourcing.{WorkQueueSupplier, WorkerStatusListener}
-class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: SimpleConnectionPool,
+class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: ReactorRabbitMQChannelPool,
private val jsonTaskSerializer: JsonTaskSerializer) extends WorkQueueSupplier {
val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL = Duration.ofSeconds(30)
@@ -41,7 +41,6 @@ class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: Si
val listener = WorkerStatusListener(eventSourcingSystem)
val worker = new SerialTaskManagerWorker(listener, additionalInformationPollingInterval)
val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, rabbitMQConnectionPool, jsonTaskSerializer)
- rabbitMQWorkQueue.start()
rabbitMQWorkQueue
}
}
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 42f6f25..83c60d8 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -34,7 +34,7 @@ import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.eventsourcing.EventSourcingSystem;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension;
@@ -74,11 +74,13 @@ import com.github.steveash.guavate.Guavate;
class DistributedTaskManagerTest implements TaskManagerContract {
+ private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
+
private static class TrackedRabbitMQWorkQueueSupplier implements WorkQueueSupplier {
private final List<RabbitMQWorkQueue> workQueues;
private final RabbitMQWorkQueueSupplier supplier;
- TrackedRabbitMQWorkQueueSupplier(SimpleConnectionPool rabbitConnectionPool, JsonTaskSerializer taskSerializer) {
+ TrackedRabbitMQWorkQueueSupplier(ReactorRabbitMQChannelPool rabbitConnectionPool, JsonTaskSerializer taskSerializer) {
workQueues = new ArrayList<>();
supplier = new RabbitMQWorkQueueSupplier(rabbitConnectionPool, taskSerializer);
}
@@ -86,6 +88,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
@Override
public WorkQueue apply(EventSourcingSystem eventSourcingSystem) {
RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem, UPDATE_INFORMATION_POLLING_INTERVAL);
+ workQueue.start();
workQueues.add(workQueue);
return workQueue;
}
@@ -138,7 +141,9 @@ class DistributedTaskManagerTest implements TaskManagerContract {
@BeforeEach
void setUp(EventStore eventStore) {
- workQueueSupplier = new TrackedRabbitMQWorkQueueSupplier(rabbitMQExtension.getRabbitConnectionPool(), TASK_SERIALIZER);
+ reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
+ reactorRabbitMQChannelPool.start();
+ workQueueSupplier = new TrackedRabbitMQWorkQueueSupplier(reactorRabbitMQChannelPool, TASK_SERIALIZER);
this.eventStore = eventStore;
terminationSubscribers = new ArrayList<>();
}
@@ -147,6 +152,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
void tearDown() {
terminationSubscribers.forEach(RabbitMQTerminationSubscriber::close);
workQueueSupplier.stopWorkQueues();
+ reactorRabbitMQChannelPool.close();
}
public EventSourcingTaskManager taskManager() {
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 91af0e2..5468af1 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.server.task.json.TestTask;
import org.apache.james.server.task.json.dto.TestTaskDTOModules;
@@ -68,6 +69,7 @@ class RabbitMQWorkQueueTest {
private RabbitMQWorkQueue testee;
private ImmediateWorker worker;
private JsonTaskSerializer serializer;
+ private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
private static class ImmediateWorker implements TaskManagerWorker {
@@ -101,13 +103,16 @@ class RabbitMQWorkQueueTest {
void setUp() {
worker = spy(new ImmediateWorker());
serializer = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE);
- testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitConnectionPool(), serializer);
+ reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
+ reactorRabbitMQChannelPool.start();
+ testee = new RabbitMQWorkQueue(worker, reactorRabbitMQChannelPool, serializer);
testee.start();
}
@AfterEach
void tearDown() {
testee.close();
+ reactorRabbitMQChannelPool.close();
}
@Test
@@ -132,7 +137,7 @@ class RabbitMQWorkQueueTest {
testee.submit(TASK_WITH_ID);
ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
- try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), serializer)) {
+ try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, reactorRabbitMQChannelPool, serializer)) {
otherWorkQueue.start();
IntStream.range(0, 9)
@@ -151,7 +156,7 @@ class RabbitMQWorkQueueTest {
ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
JsonTaskSerializer otherTaskSerializer = new JsonTaskSerializer(TestTaskDTOModules.TEST_TYPE);
- try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), otherTaskSerializer)) {
+ try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, reactorRabbitMQChannelPool, otherTaskSerializer)) {
//wait to be sure that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue.
Thread.sleep(200);
otherWorkQueue.start();
diff --git a/server/task/task-memory/pom.xml b/server/task/task-memory/pom.xml
index 0b60d59..e6dccd7 100644
--- a/server/task/task-memory/pom.xml
+++ b/server/task/task-memory/pom.xml
@@ -44,6 +44,10 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>james-server-lifecycle-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>james-server-util</artifactId>
</dependency>
<dependency>
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java
index ae363ff..8a4f490 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java
@@ -20,7 +20,13 @@ package org.apache.james.task;
import java.io.Closeable;
-public interface WorkQueue extends Closeable {
+import org.apache.james.lifecycle.api.Startable;
+
+public interface WorkQueue extends Closeable, Startable {
+
+ default void start() {
+
+ }
void submit(TaskWithId taskWithId);
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 35856b0..fe2d869 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -22,14 +22,15 @@ import java.io.Closeable
import java.time.Duration
import java.util
+import com.google.common.annotations.VisibleForTesting
+import javax.annotation.PreDestroy
import javax.inject.Inject
import org.apache.james.eventsourcing.eventstore.{EventStore, History}
import org.apache.james.eventsourcing.{AggregateId, Subscriber}
+import org.apache.james.lifecycle.api.Startable
import org.apache.james.task.TaskManager.ReachedTimeoutException
import org.apache.james.task._
import org.apache.james.task.eventsourcing.TaskCommand._
-import com.google.common.annotations.VisibleForTesting
-import javax.annotation.PreDestroy
import reactor.core.publisher.{Flux, Mono}
import reactor.core.scheduler.Schedulers
@@ -38,7 +39,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
val eventStore: EventStore,
val executionDetailsProjection: TaskExecutionDetailsProjection,
val hostname: Hostname,
- val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable {
+ val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable with Startable {
private def workDispatcher: Subscriber = {
case Created(aggregateId, _, task, _) =>
@@ -69,6 +70,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
private val workQueue: WorkQueue = workQueueSupplier(eventSourcingSystem)
+ def start(): Unit = workQueue.start()
+
override def submit(task: Task): TaskId = {
val taskId = TaskId.generateTaskId
val command = Create(taskId, task)
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org