You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/11/04 11:23:38 UTC

[james-project] 02/30: JAMES-2937 Inject RabbitMQChannelPool

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit fc2fa8730d79d4a1a7312af94408648afca126cc
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Oct 7 14:48:36 2019 +0700

    JAMES-2937 Inject RabbitMQChannelPool
    
    This avoids repeating the Sender and Receiver specifications
    and simplifies the overall wiring.
    
    This also highlighted some 'dirty' initialization performed within the
    MailSpool constructor, outside of the startable chain.
---
 backends-common/rabbitmq/pom.xml                   |  4 ++
 .../rabbitmq/ReactorRabbitMQChannelPool.java       | 36 ++++++++++++++--
 .../rabbitmq/ReactorRabbitMQChannelPoolTest.java   |  8 ++--
 .../james/mailbox/events/GroupRegistration.java    | 11 +++--
 .../mailbox/events/GroupRegistrationHandler.java   | 17 +++-----
 .../mailbox/events/KeyRegistrationHandler.java     | 10 ++---
 .../james/mailbox/events/RabbitMQEventBus.java     | 25 ++++-------
 .../james/mailbox/events/RabbitMQEventBusTest.java | 29 ++++++++-----
 .../rabbitmq/host/RabbitMQEventBusHostSystem.java  | 10 +++--
 .../modules/DistributedTaskManagerModule.java      |  7 ++++
 .../apache/james/jmap/draft/JMAPCommonModule.java  | 10 +++++
 .../james/modules/rabbitmq/RabbitMQModule.java     | 32 +++++----------
 .../apache/james/jmap/draft/send/MailSpool.java    | 10 ++++-
 .../james/jmap/draft/send/MailSpoolTest.java       |  1 +
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  8 ++--
 .../org/apache/james/queue/rabbitmq/Enqueuer.java  |  5 ++-
 .../queue/rabbitmq/RabbitMQMailQueueFactory.java   | 48 ++++++----------------
 .../RabbitMQMailQueueConfigurationChangeTest.java  | 17 ++++----
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 26 ++++--------
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java     | 17 +++-----
 .../distributed/RabbitMQTerminationSubscriber.java |  4 +-
 .../distributed/RabbitMQWorkQueue.java             | 34 ++++++---------
 .../distributed/RabbitMQWorkQueueSupplier.scala    |  5 +--
 .../distributed/DistributedTaskManagerTest.java    | 12 ++++--
 .../distributed/RabbitMQWorkQueueTest.java         | 11 +++--
 server/task/task-memory/pom.xml                    |  4 ++
 .../main/java/org/apache/james/task/WorkQueue.java |  8 +++-
 .../eventsourcing/EventSourcingTaskManager.scala   |  9 ++--
 28 files changed, 216 insertions(+), 202 deletions(-)

diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml
index fa1c156..7cd59c1 100644
--- a/backends-common/rabbitmq/pom.xml
+++ b/backends-common/rabbitmq/pom.xml
@@ -37,6 +37,10 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-lifecycle-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-testing</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 9e66225..e240b93 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -24,12 +24,15 @@ import java.util.Comparator;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.function.BiConsumer;
 
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+
 import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-
+import org.apache.james.lifecycle.api.Startable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,10 +45,13 @@ import reactor.core.publisher.SignalType;
 import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.ChannelPool;
 import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
 
-public class ReactorRabbitMQChannelPool implements ChannelPool {
+public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
+    private static final int MAX_CHANNELS_NUMBER = 5;
 
     static class ChannelFactory extends BasePooledObjectFactory<Channel> {
 
@@ -95,6 +101,12 @@ public class ReactorRabbitMQChannelPool implements ChannelPool {
     private final Mono<Connection> connectionMono;
     private final GenericObjectPool<Channel> pool;
     private final ConcurrentSkipListSet<Channel> borrowedChannels;
+    private Sender sender;
+
+    @Inject
+    public ReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
+        this(simpleConnectionPool.getResilientConnection(), MAX_CHANNELS_NUMBER);
+    }
 
     public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, int poolSize) {
         this.connectionMono = connectionMono;
@@ -106,6 +118,22 @@ public class ReactorRabbitMQChannelPool implements ChannelPool {
         this.borrowedChannels = new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode));
     }
 
+    public void start() {
+        sender = createSender();
+    }
+
+    public Sender getSender() {
+        return sender;
+    }
+
+    public Receiver createReceiver() {
+        return RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+    }
+
+    public Mono<Connection> getConnectionMono() {
+        return connectionMono;
+    }
+
     @Override
     public Mono<? extends Channel> getChannelMono() {
         return Mono.fromCallable(() -> {
@@ -127,7 +155,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool {
         };
     }
 
-    public Sender createSender() {
+    private Sender createSender() {
        return RabbitFlux.createSender(new SenderOptions()
            .connectionMono(connectionMono)
            .channelPool(this)
@@ -146,8 +174,10 @@ public class ReactorRabbitMQChannelPool implements ChannelPool {
         }
     }
 
+    @PreDestroy
     @Override
     public void close() {
+        sender.close();
         borrowedChannels.forEach(channel -> getChannelCloseHandler().accept(SignalType.ON_NEXT, channel));
         borrowedChannels.clear();
         pool.close();
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
index 81994c8..21be0bd 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
@@ -66,9 +66,11 @@ class ReactorRabbitMQChannelPoolTest implements ChannelPoolContract {
     }
 
     private ReactorRabbitMQChannelPool generateChannelPool(int poolSize) {
-        return new ReactorRabbitMQChannelPool(
-                rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(),
-                poolSize);
+        ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(
+            rabbitMQExtension.getRabbitConnectionPool().getResilientConnection(),
+            poolSize);
+        reactorRabbitMQChannelPool.start();
+        return reactorRabbitMQChannelPool;
     }
 
     // Pool wait timeout is an expected exception
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 953a4fb..dac4ae3 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -31,12 +31,13 @@ import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.Optional;
 
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.util.MDCBuilder;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
-import com.rabbitmq.client.Connection;
+
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -45,9 +46,7 @@ import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 
 class GroupRegistration implements Registration {
@@ -85,15 +84,15 @@ class GroupRegistration implements Registration {
     private final MailboxListenerExecutor mailboxListenerExecutor;
     private Optional<Disposable> receiverSubscriber;
 
-    GroupRegistration(Mono<Connection> connectionSupplier, Sender sender, EventSerializer eventSerializer,
+    GroupRegistration(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, EventSerializer eventSerializer,
                       MailboxListener mailboxListener, Group group, RetryBackoffConfiguration retryBackoff,
                       EventDeadLetters eventDeadLetters,
                       Runnable unregisterGroup, MailboxListenerExecutor mailboxListenerExecutor) {
         this.eventSerializer = eventSerializer;
         this.mailboxListener = mailboxListener;
         this.queueName = WorkQueueName.of(group);
-        this.sender = sender;
-        this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionSupplier));
+        this.sender = reactorRabbitMQChannelPool.getSender();
+        this.receiver = reactorRabbitMQChannelPool.createReceiver();
         this.mailboxListenerExecutor = mailboxListenerExecutor;
         this.receiverSubscriber = Optional.empty();
         this.unregisterGroup = unregisterGroup;
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
index 2da507f..5337df6 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
@@ -23,28 +23,22 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.event.json.EventSerializer;
 
-import com.rabbitmq.client.Connection;
-
-import reactor.core.publisher.Mono;
-import reactor.rabbitmq.Sender;
-
 class GroupRegistrationHandler {
     private final Map<Group, GroupRegistration> groupRegistrations;
     private final EventSerializer eventSerializer;
-    private final Sender sender;
-    private final Mono<Connection> connectionMono;
     private final RetryBackoffConfiguration retryBackoff;
     private final EventDeadLetters eventDeadLetters;
     private final MailboxListenerExecutor mailboxListenerExecutor;
+    private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
-    GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono,
+    GroupRegistrationHandler(EventSerializer eventSerializer, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
                              RetryBackoffConfiguration retryBackoff,
                              EventDeadLetters eventDeadLetters, MailboxListenerExecutor mailboxListenerExecutor) {
         this.eventSerializer = eventSerializer;
-        this.sender = sender;
-        this.connectionMono = connectionMono;
+        this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool;
         this.retryBackoff = retryBackoff;
         this.eventDeadLetters = eventDeadLetters;
         this.mailboxListenerExecutor = mailboxListenerExecutor;
@@ -73,8 +67,7 @@ class GroupRegistrationHandler {
 
     private GroupRegistration newGroupRegistration(MailboxListener listener, Group group) {
         return new GroupRegistration(
-            connectionMono,
-            sender,
+            reactorRabbitMQChannelPool,
             eventSerializer,
             listener,
             group,
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 1f2fbae..dc968fe 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -28,6 +28,7 @@ import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.util.MDCBuilder;
 import org.apache.james.util.MDCStructuredLogger;
@@ -37,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Delivery;
 
 import reactor.core.Disposable;
@@ -45,9 +45,7 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 
 class KeyRegistrationHandler {
@@ -64,13 +62,13 @@ class KeyRegistrationHandler {
     private final MailboxListenerExecutor mailboxListenerExecutor;
     private Optional<Disposable> receiverSubscriber;
 
-    KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
+    KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
         this.eventBusId = eventBusId;
         this.eventSerializer = eventSerializer;
-        this.sender = sender;
+        this.sender = reactorRabbitMQChannelPool.getSender();
         this.routingKeyConverter = routingKeyConverter;
         this.localListenerRegistry = localListenerRegistry;
-        this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+        this.receiver = reactorRabbitMQChannelPool.createReceiver();
         this.mailboxListenerExecutor = mailboxListenerExecutor;
         this.registrationQueue = new RegistrationQueueName();
         this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index a659062..06d786a 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -25,47 +25,42 @@ import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.metrics.api.MetricFactory;
 
 import com.google.common.base.Preconditions;
-import com.rabbitmq.client.Connection;
+
 import reactor.core.publisher.Mono;
-import reactor.rabbitmq.Sender;
 
 public class RabbitMQEventBus implements EventBus, Startable {
-    private static final int MAX_CHANNELS_NUMBER = 5;
     private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
     static final String MAILBOX_EVENT = "mailboxEvent";
     static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
     static final String EVENT_BUS_ID = "eventBusId";
 
-    private final Mono<Connection> connectionMono;
     private final EventSerializer eventSerializer;
     private final RoutingKeyConverter routingKeyConverter;
     private final RetryBackoffConfiguration retryBackoff;
     private final EventBusId eventBusId;
     private final EventDeadLetters eventDeadLetters;
+    private final ReactorRabbitMQChannelPool channelPool;
     private final MailboxListenerExecutor mailboxListenerExecutor;
 
     private volatile boolean isRunning;
     private volatile boolean isStopping;
-    private ReactorRabbitMQChannelPool channelPool;
     private GroupRegistrationHandler groupRegistrationHandler;
     private KeyRegistrationHandler keyRegistrationHandler;
-    EventDispatcher eventDispatcher;
-    private Sender sender;
+    private EventDispatcher eventDispatcher;
 
     @Inject
-    public RabbitMQEventBus(SimpleConnectionPool simpleConnectionPool, EventSerializer eventSerializer,
+    public RabbitMQEventBus(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, EventSerializer eventSerializer,
                      RetryBackoffConfiguration retryBackoff,
                      RoutingKeyConverter routingKeyConverter,
                      EventDeadLetters eventDeadLetters, MetricFactory metricFactory) {
+        this.channelPool = reactorRabbitMQChannelPool;
         this.mailboxListenerExecutor = new MailboxListenerExecutor(metricFactory);
         this.eventBusId = EventBusId.random();
-        this.connectionMono = simpleConnectionPool.getResilientConnection();
         this.eventSerializer = eventSerializer;
         this.routingKeyConverter = routingKeyConverter;
         this.retryBackoff = retryBackoff;
@@ -76,13 +71,11 @@ public class RabbitMQEventBus implements EventBus, Startable {
 
     public void start() {
         if (!isRunning && !isStopping) {
-            this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER);
 
-            sender = channelPool.createSender();
             LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
-            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
-            groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
-            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor);
+            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, channelPool, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
+            groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
+            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, channelPool.getSender(), localListenerRegistry, mailboxListenerExecutor);
 
             eventDispatcher.start();
             keyRegistrationHandler.start();
@@ -97,8 +90,6 @@ public class RabbitMQEventBus implements EventBus, Startable {
             isRunning = false;
             groupRegistrationHandler.stop();
             keyRegistrationHandler.stop();
-            sender.close();
-            channelPool.close();
         }
     }
 
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index ab6d885..2b92e20 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -56,7 +56,7 @@ import org.apache.james.backends.rabbitmq.RabbitMQExtension;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension.DockerRestartPolicy;
 import org.apache.james.backends.rabbitmq.RabbitMQFixture;
 import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
 import org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
@@ -97,11 +97,11 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     private RabbitMQEventBus eventBus;
     private RabbitMQEventBus eventBus2;
     private RabbitMQEventBus eventBus3;
-    private Sender sender;
     private EventSerializer eventSerializer;
     private RoutingKeyConverter routingKeyConverter;
     private MemoryEventDeadLetters memoryEventDeadLetters;
     private Mono<Connection> resilientConnection;
+    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
     @BeforeEach
     void setUp() {
@@ -110,6 +110,8 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         TestId.Factory mailboxIdFactory = new TestId.Factory();
         eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory(), new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer());
         routingKeyConverter = RoutingKeyConverter.forFactories(new MailboxIdRegistrationKey.Factory(mailboxIdFactory));
+        reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
+        reactorRabbitMQChannelPool.start();
 
         eventBus = newEventBus();
         eventBus2 = newEventBus();
@@ -119,7 +121,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         eventBus2.start();
         eventBus3.start();
         resilientConnection = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
-        sender = RabbitFlux.createSender(new SenderOptions().connectionMono(resilientConnection));
     }
 
     @AfterEach
@@ -129,17 +130,17 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         eventBus3.stop();
         ALL_GROUPS.stream()
             .map(GroupRegistration.WorkQueueName::of)
-            .forEach(queueName -> sender.delete(QueueSpecification.queue(queueName.asString())).block());
-        sender.delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)).block();
-        sender.close();
+            .forEach(queueName -> reactorRabbitMQChannelPool.getSender().delete(QueueSpecification.queue(queueName.asString())).block());
+        reactorRabbitMQChannelPool.getSender().delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)).block();
+        reactorRabbitMQChannelPool.close();
     }
 
     private RabbitMQEventBus newEventBus() {
-        return newEventBus(rabbitMQExtension.getRabbitConnectionPool());
+        return newEventBus(reactorRabbitMQChannelPool);
     }
 
-    private RabbitMQEventBus newEventBus(SimpleConnectionPool connectionPool) {
-        return new RabbitMQEventBus(connectionPool, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new NoopMetricFactory());
+    private RabbitMQEventBus newEventBus(ReactorRabbitMQChannelPool rabbitMQChannelPool) {
+        return new RabbitMQEventBus(rabbitMQChannelPool, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new NoopMetricFactory());
     }
 
     @Override
@@ -340,10 +341,18 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
                     .restartPolicy(DockerRestartPolicy.PER_TEST);
 
                 private RabbitMQEventBus rabbitMQEventBusWithNetWorkIssue;
+                private ReactorRabbitMQChannelPool reactorRabbitMQChannelPoolWithNetWorkIssue;
 
                 @BeforeEach
                 void beforeEach() {
-                    rabbitMQEventBusWithNetWorkIssue = newEventBus(rabbitMQNetWorkIssueExtension.getRabbitConnectionPool());
+                    reactorRabbitMQChannelPoolWithNetWorkIssue = new ReactorRabbitMQChannelPool(rabbitMQNetWorkIssueExtension.getRabbitConnectionPool());
+                    reactorRabbitMQChannelPoolWithNetWorkIssue.start();
+                    rabbitMQEventBusWithNetWorkIssue = newEventBus(reactorRabbitMQChannelPoolWithNetWorkIssue);
+                }
+
+                @AfterEach
+                void afterEach() {
+                    reactorRabbitMQChannelPoolWithNetWorkIssue.close();
                 }
 
                 @Test
diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
index e93ac4f..25c3635 100644
--- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
+++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
@@ -20,10 +20,10 @@
 
 package org.apache.james.mpt.imapmailbox.rabbitmq.host;
 
-import java.net.URISyntaxException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.core.quota.QuotaCount;
 import org.apache.james.core.quota.QuotaSize;
@@ -63,6 +63,7 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
     private RabbitMQEventBus eventBus;
     private SimpleConnectionPool connectionPool;
     private InMemoryIntegrationResources resources;
+    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
     RabbitMQEventBusHostSystem(DockerRabbitMQ dockerRabbitMQ) {
         this.dockerRabbitMQ = dockerRabbitMQ;
@@ -73,6 +74,8 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
         super.beforeTest();
 
         connectionPool = new SimpleConnectionPool(dockerRabbitMQ.createRabbitConnectionFactory());
+        reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool);
+        reactorRabbitMQChannelPool.start();
         eventBus = createEventBus();
         eventBus.start();
 
@@ -101,18 +104,19 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
             defaultImapProcessorFactory);
     }
 
-    private RabbitMQEventBus createEventBus() throws URISyntaxException {
+    private RabbitMQEventBus createEventBus() {
         InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
         InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();
         EventSerializer eventSerializer = new EventSerializer(mailboxIdFactory, messageIdFactory, new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer());
         RoutingKeyConverter routingKeyConverter = new RoutingKeyConverter(ImmutableSet.of(new MailboxIdRegistrationKey.Factory(mailboxIdFactory)));
-        return new RabbitMQEventBus(connectionPool, eventSerializer, RetryBackoffConfiguration.DEFAULT,
+        return new RabbitMQEventBus(reactorRabbitMQChannelPool, eventSerializer, RetryBackoffConfiguration.DEFAULT,
             routingKeyConverter, new MemoryEventDeadLetters(), new NoopMetricFactory());
     }
 
     @Override
     public void afterTest() {
         eventBus.stop();
+        reactorRabbitMQChannelPool.close();
         connectionPool.close();
     }
 
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/DistributedTaskManagerModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/DistributedTaskManagerModule.java
index b74e6dd..ce0333f 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/DistributedTaskManagerModule.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/DistributedTaskManagerModule.java
@@ -30,6 +30,7 @@ import org.apache.james.task.eventsourcing.WorkQueueSupplier;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
 import org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber;
+import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue;
 import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueSupplier;
 import org.apache.james.utils.InitializationOperation;
 import org.apache.james.utils.InitilizationOperationBuilder;
@@ -64,5 +65,11 @@ public class DistributedTaskManagerModule extends AbstractModule {
             .init(instance::start);
     }
 
+    @ProvidesIntoSet
+    InitializationOperation workQueue(EventSourcingTaskManager instance) {
+        return InitilizationOperationBuilder
+            .forClass(RabbitMQWorkQueue.class)
+            .init(instance::start);
+    }
 
 }
diff --git a/server/container/guice/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/JMAPCommonModule.java b/server/container/guice/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/JMAPCommonModule.java
index ed2c316..dce8120 100644
--- a/server/container/guice/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/JMAPCommonModule.java
+++ b/server/container/guice/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/JMAPCommonModule.java
@@ -42,6 +42,8 @@ import org.apache.james.lifecycle.api.StartUpCheck;
 import org.apache.james.util.date.DefaultZonedDateTimeProvider;
 import org.apache.james.util.date.ZonedDateTimeProvider;
 import org.apache.james.util.mime.MessageContentExtractor;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
 import org.apache.mailet.base.AutomaticallySentMailDetector;
 import org.apache.mailet.base.AutomaticallySentMailDetectorImpl;
 
@@ -50,6 +52,7 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.multibindings.Multibinder;
+import com.google.inject.multibindings.ProvidesIntoSet;
 import com.google.inject.name.Names;
 
 public class JMAPCommonModule extends AbstractModule {
@@ -96,4 +99,11 @@ public class JMAPCommonModule extends AbstractModule {
                 accessTokenAuthenticationStrategy,
                 queryParameterAuthenticationStrategy);
     }
+
+    @ProvidesIntoSet
+    InitializationOperation workQueue(MailSpool instance) {
+        return InitilizationOperationBuilder
+            .forClass(MailSpool.class)
+            .init(instance::start);
+    }
 }
diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
index 5eb49df..4226aea 100644
--- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
@@ -29,10 +29,10 @@ import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.rabbitmq.RabbitMQChannelPool;
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.RabbitMQHealthCheck;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.backends.rabbitmq.SimpleChannelPool;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
-import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
@@ -50,17 +50,18 @@ import org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDAO;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
-import org.apache.james.utils.InitialisationOperation;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
 import org.apache.james.utils.PropertiesProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.TypeLiteral;
 import com.google.inject.multibindings.Multibinder;
+import com.google.inject.multibindings.ProvidesIntoSet;
 
 public class RabbitMQModule extends AbstractModule {
 
@@ -70,6 +71,7 @@ public class RabbitMQModule extends AbstractModule {
 
     @Override
     protected void configure() {
+        bind(ReactorRabbitMQChannelPool.class).in(Scopes.SINGLETON);
         bind(EnqueuedMailsDAO.class).in(Scopes.SINGLETON);
         bind(DeletedMailsDAO.class).in(Scopes.SINGLETON);
         bind(BrowseStartDAO.class).in(Scopes.SINGLETON);
@@ -88,7 +90,6 @@ public class RabbitMQModule extends AbstractModule {
         eventDTOModuleBinder.addBinding().toInstance(CassandraMailQueueViewConfigurationModule.MAIL_QUEUE_VIEW_CONFIGURATION);
 
         Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(RabbitMQHealthCheck.class);
-        Multibinder.newSetBinder(binder(), InitialisationOperation.class).addBinding().to(RabbitMQMailQueueFactoryInitialisationOperation.class);
     }
 
     @Provides
@@ -145,23 +146,10 @@ public class RabbitMQModule extends AbstractModule {
         return RabbitMQMailQueueConfiguration.from(configuration);
     }
 
-    @Singleton
-    public static class RabbitMQMailQueueFactoryInitialisationOperation implements InitialisationOperation {
-        private final RabbitMQMailQueueFactory rabbitMQMailQueueFactory;
-
-        @Inject
-        public RabbitMQMailQueueFactoryInitialisationOperation(RabbitMQMailQueueFactory rabbitMQMailQueueFactory) {
-            this.rabbitMQMailQueueFactory = rabbitMQMailQueueFactory;
-        }
-
-        @Override
-        public void initModule() {
-            rabbitMQMailQueueFactory.start();
-        }
-
-        @Override
-        public Class<? extends Startable> forClass() {
-            return RabbitMQMailQueueFactory.class;
-        }
+    @ProvidesIntoSet
+    InitializationOperation workQueue(ReactorRabbitMQChannelPool instance) {
+        return InitilizationOperationBuilder
+            .forClass(ReactorRabbitMQChannelPool.class)
+            .init(instance::start);
     }
 }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
index d4ebbb6..c652bc9 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
@@ -21,6 +21,7 @@ package org.apache.james.jmap.draft.send;
 
 import javax.inject.Inject;
 
+import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueue.MailQueueException;
 import org.apache.james.queue.api.MailQueueFactory;
@@ -30,12 +31,17 @@ import org.apache.mailet.Mail;
 
 import com.google.common.annotations.VisibleForTesting;
 
-public class MailSpool {
+public class MailSpool implements Startable {
 
-    private final MailQueue queue;
+    private final MailQueueFactory<?> queueFactory;
+    private MailQueue queue;
 
     @Inject
     @VisibleForTesting MailSpool(MailQueueFactory<?> queueFactory) {
+        this.queueFactory = queueFactory;
+    }
+
+    public void start() {
         queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
     }
 
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java
index 147f21e..0552c2f 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/MailSpoolTest.java
@@ -49,6 +49,7 @@ public class MailSpoolTest {
         myQueue = mailQueueFactory.createQueue(MailQueueFactory.SPOOL);
 
         mailSpool = new MailSpool(mailQueueFactory);
+        mailSpool.start();
     }
 
     @Test
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index a6de706..21546b9 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
@@ -33,15 +34,12 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.mailet.Mail;
 
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
-import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Delivery;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.ConsumeOptions;
-import reactor.rabbitmq.RabbitFlux;
-import reactor.rabbitmq.ReceiverOptions;
 
 class Dequeuer {
     private static final boolean REQUEUE = true;
@@ -79,14 +77,14 @@ class Dequeuer {
     private final MailReferenceSerializer mailReferenceSerializer;
     private final MailQueueView mailQueueView;
 
-    Dequeuer(MailQueueName name, Mono<Connection> connectionMono, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
+    Dequeuer(MailQueueName name, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
              MailReferenceSerializer serializer, MetricFactory metricFactory,
              MailQueueView mailQueueView) {
         this.mailLoader = mailLoader;
         this.mailReferenceSerializer = serializer;
         this.mailQueueView = mailQueueView;
         this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
-        this.flux = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono))
+        this.flux = reactorRabbitMQChannelPool.createReceiver()
             .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE))
             .filter(getResponse -> getResponse.getBody() != null);
     }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index ea7d04a..4e0485c 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -27,6 +27,7 @@ import java.time.Clock;
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
 
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.metrics.api.Metric;
@@ -51,11 +52,11 @@ class Enqueuer {
     private final MailQueueView mailQueueView;
     private final Clock clock;
 
-    Enqueuer(MailQueueName name, Sender sender, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
+    Enqueuer(MailQueueName name, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
              MailReferenceSerializer serializer, MetricFactory metricFactory,
              MailQueueView mailQueueView, Clock clock) {
         this.name = name;
-        this.sender = sender;
+        this.sender = reactorRabbitMQChannelPool.getSender();
         this.mimeMessageStore = mimeMessageStore;
         this.mailReferenceSerializer = serializer;
         this.mailQueueView = mailQueueView;
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index b0748c0..b48f60f 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -32,17 +32,14 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 
-import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.mail.internet.MimeMessage;
 
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.blob.mail.MimeMessageStore;
-import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueueFactory;
@@ -53,23 +50,18 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
-import com.rabbitmq.client.Connection;
 
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.Sender;
 
-public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue>, Startable {
-    private static final int MAX_CHANNELS_NUMBER = 5;
+public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue> {
 
     @VisibleForTesting static class PrivateFactory {
         private final MetricFactory metricFactory;
         private final GaugeRegistry gaugeRegistry;
-        private final Mono<Connection> connectionMono;
-        private final Sender sender;
+        private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
         private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
         private final MailReferenceSerializer mailReferenceSerializer;
         private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
@@ -81,7 +73,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
         @Inject
         @VisibleForTesting PrivateFactory(MetricFactory metricFactory,
                                           GaugeRegistry gaugeRegistry,
-                                          Mono<Connection> connectionMono, Sender sender, MimeMessageStore.Factory mimeMessageStoreFactory,
+                                          ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
+                                          MimeMessageStore.Factory mimeMessageStoreFactory,
                                           BlobId.Factory blobIdFactory,
                                           MailQueueView.Factory mailQueueViewFactory,
                                           Clock clock,
@@ -89,8 +82,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
                                           RabbitMQMailQueueConfiguration configuration) {
             this.metricFactory = metricFactory;
             this.gaugeRegistry = gaugeRegistry;
-            this.connectionMono = connectionMono;
-            this.sender = sender;
+            this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool;
             this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
             this.mailQueueViewFactory = mailQueueViewFactory;
             this.clock = clock;
@@ -107,9 +99,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
             RabbitMQMailQueue rabbitMQMailQueue = new RabbitMQMailQueue(
                 metricFactory,
                 mailQueueName,
-                new Enqueuer(mailQueueName, sender, mimeMessageStore, mailReferenceSerializer,
+                new Enqueuer(mailQueueName, reactorRabbitMQChannelPool, mimeMessageStore, mailReferenceSerializer,
                     metricFactory, mailQueueView, clock),
-                new Dequeuer(mailQueueName, connectionMono, mailLoader, mailReferenceSerializer,
+                new Dequeuer(mailQueueName, reactorRabbitMQChannelPool, mailLoader, mailReferenceSerializer,
                     metricFactory, mailQueueView),
                 mailQueueView,
                 decoratorFactory);
@@ -145,26 +137,19 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
     private final RabbitMQMailQueueManagement mqManagementApi;
     private final PrivateFactory privateFactory;
     private final RabbitMQMailQueueObjectPool mailQueueObjectPool;
-    private final Mono<Connection> connectionMono;
-    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
-    private Sender sender;
+    private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
     @VisibleForTesting
     @Inject
-    RabbitMQMailQueueFactory(SimpleConnectionPool simpleConnectionPool,
+    RabbitMQMailQueueFactory(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
                              RabbitMQMailQueueManagement mqManagementApi,
                              PrivateFactory privateFactory) {
-        this.connectionMono = simpleConnectionPool.getResilientConnection();
+        this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool;
         this.mqManagementApi = mqManagementApi;
         this.privateFactory = privateFactory;
         this.mailQueueObjectPool = new RabbitMQMailQueueObjectPool();
     }
 
-    public void start() {
-        this.reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER);
-        this.sender = reactorRabbitMQChannelPool.createSender();
-    }
-
     @Override
     public Optional<RabbitMQMailQueue> getQueue(String name) {
         return getQueueFromRabbitServer(MailQueueName.fromString(name));
@@ -187,15 +172,15 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
     private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName) {
         String exchangeName = mailQueueName.toRabbitExchangeName().asString();
         Flux.concat(
-            sender.declareExchange(ExchangeSpecification.exchange(exchangeName)
+            reactorRabbitMQChannelPool.getSender().declareExchange(ExchangeSpecification.exchange(exchangeName)
                 .durable(true)
                 .type("direct")),
-            sender.declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString())
+            reactorRabbitMQChannelPool.getSender().declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString())
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
                 .arguments(NO_ARGUMENTS)),
-            sender.bind(BindingSpecification.binding()
+            reactorRabbitMQChannelPool.getSender().bind(BindingSpecification.binding()
                 .exchange(mailQueueName.toRabbitExchangeName().asString())
                 .queue(mailQueueName.toWorkQueueName().asString())
                 .routingKey(EMPTY_ROUTING_KEY)))
@@ -210,11 +195,4 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
             .map(mailQueueObjectPool::retrieveInstanceFor)
             .findFirst();
     }
-
-    @PreDestroy
-    public void stop() {
-        sender.close();
-        reactorRabbitMQChannelPool.close();
-    }
-
 }
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
index 7c23f0f..4ad6f85 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
@@ -61,9 +61,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.github.fge.lambdas.Throwing;
-import com.rabbitmq.client.Connection;
-
-import reactor.core.publisher.Mono;
 
 class RabbitMQMailQueueConfigurationChangeTest {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
@@ -79,7 +76,6 @@ class RabbitMQMailQueueConfigurationChangeTest {
     private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z");
     private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1, HOURS);
     private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS);
-    public static final int POOL_SIZE = 5;
 
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(
@@ -94,6 +90,7 @@ class RabbitMQMailQueueConfigurationChangeTest {
     private UpdatableTickingClock clock;
     private RabbitMQMailQueueManagement mqManagementApi;
     private MimeMessageStore.Factory mimeMessageStoreFactory;
+    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
     @BeforeEach
     void setup(CassandraCluster cassandra) throws Exception {
@@ -101,11 +98,14 @@ class RabbitMQMailQueueConfigurationChangeTest {
         mimeMessageStoreFactory = MimeMessageStore.factory(blobsDAO);
         clock = new UpdatableTickingClock(IN_SLICE_1);
         mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
+        reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
+        reactorRabbitMQChannelPool.start();
     }
 
     @AfterEach
     void tearDown() {
         mqManagementApi.deleteAllQueues();
+        reactorRabbitMQChannelPool.close();
     }
 
     private RabbitMQMailQueue getRabbitMQMailQueue(CassandraCluster cassandra, CassandraMailQueueViewConfiguration mailQueueViewConfiguration) throws Exception {
@@ -119,21 +119,18 @@ class RabbitMQMailQueueConfigurationChangeTest {
             .sizeMetricsEnabled(true)
             .build();
 
-        Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
-        ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE);
+
         RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
             new NoopMetricFactory(),
             new NoopGaugeRegistry(),
-            connectionMono,
-            reactorRabbitMQChannelPool.createSender(),
+            reactorRabbitMQChannelPool,
             mimeMessageStoreFactory,
             BLOB_ID_FACTORY,
             mailQueueViewFactory,
             clock,
             new RawMailQueueItemDecoratorFactory(),
             mailQueueSizeConfiguration);
-        RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, privateFactory);
-        mailQueueFactory.start();
+        RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(reactorRabbitMQChannelPool, mqManagementApi, privateFactory);
         return mailQueueFactory.createQueue(SPOOL);
     }
 
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 127ed23..0102a05 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -44,6 +44,7 @@ import org.apache.james.blob.cassandra.CassandraBlobModule;
 import org.apache.james.blob.cassandra.CassandraBlobStore;
 import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
+
 import org.apache.james.metrics.api.Gauge;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueMetricContract;
@@ -68,7 +69,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.ArgumentCaptor;
 
 import com.github.fge.lambdas.Throwing;
-import com.rabbitmq.client.Connection;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -84,7 +84,6 @@ class RabbitMQMailQueueTest {
     private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS);
     private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4, HOURS);
     private static final Instant IN_SLICE_7 = IN_SLICE_1.plus(6, HOURS);
-    private static final int POOL_SIZE = 5;
 
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(
@@ -100,9 +99,11 @@ class RabbitMQMailQueueTest {
     private UpdatableTickingClock clock;
     private RabbitMQMailQueue mailQueue;
     private RabbitMQMailQueueManagement mqManagementApi;
+    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
     @AfterEach
     void tearDown() {
+        reactorRabbitMQChannelPool.close();
         mqManagementApi.deleteAllQueues();
     }
 
@@ -238,11 +239,6 @@ class RabbitMQMailQueueTest {
                 }))
                 .blockLast();
         }
-
-        @AfterEach
-        void tearDown() {
-            mqManagementApi.deleteAllQueues();
-        }
     }
 
     @Nested
@@ -264,11 +260,6 @@ class RabbitMQMailQueueTest {
             ArgumentCaptor<Gauge<?>> gaugeCaptor = ArgumentCaptor.forClass(Gauge.class);
             verify(metricTestSystem.getSpyGaugeRegistry(), never()).register(any(), gaugeCaptor.capture());
         }
-
-        @AfterEach
-        void tearDown() {
-            mqManagementApi.deleteAllQueues();
-        }
     }
 
     private void setUp(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem, RabbitMQMailQueueConfiguration configuration) throws Exception {
@@ -284,14 +275,12 @@ class RabbitMQMailQueueTest {
                 .build(),
             mimeMessageStoreFactory);
 
-        Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
-        ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE);
-
+        reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
+        reactorRabbitMQChannelPool.start();
         RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
             metricTestSystem.getMetricFactory(),
             metricTestSystem.getSpyGaugeRegistry(),
-            connectionMono,
-            reactorRabbitMQChannelPool.createSender(),
+            reactorRabbitMQChannelPool,
             mimeMessageStoreFactory,
             BLOB_ID_FACTORY,
             mailQueueViewFactory,
@@ -299,8 +288,7 @@ class RabbitMQMailQueueTest {
             new RawMailQueueItemDecoratorFactory(),
             configuration);
         mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
-        mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, factory);
-        mailQueueFactory.start();
+        mailQueueFactory = new RabbitMQMailQueueFactory(reactorRabbitMQChannelPool, mqManagementApi, factory);
         mailQueue = mailQueueFactory.createQueue(SPOOL);
     }
 }
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index fb5b8b7..8ac46b3 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -46,19 +46,15 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import com.rabbitmq.client.Connection;
-
-import reactor.core.publisher.Mono;
-
 class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQMailQueue> {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
-    public static final int POOL_SIZE = 5;
 
     @RegisterExtension
     static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ();
 
     private RabbitMQMailQueueFactory mailQueueFactory;
     private RabbitMQMailQueueManagement mqManagementApi;
+    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
     @BeforeEach
     void setup() throws Exception {
@@ -72,13 +68,12 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
             .sizeMetricsEnabled(true)
             .build();
 
-        Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
-        ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE);
+        reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
+        reactorRabbitMQChannelPool.start();
         RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
             new NoopMetricFactory(),
             new NoopGaugeRegistry(),
-            connectionMono,
-            reactorRabbitMQChannelPool.createSender(),
+            reactorRabbitMQChannelPool,
             mimeMessageStoreFactory,
             BLOB_ID_FACTORY,
             mailQueueViewFactory,
@@ -86,13 +81,13 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
             new RawMailQueueItemDecoratorFactory(),
             configuration);
         mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
-        mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, privateFactory);
-        mailQueueFactory.start();
+        mailQueueFactory = new RabbitMQMailQueueFactory(reactorRabbitMQChannelPool, mqManagementApi, privateFactory);
     }
 
     @AfterEach
     void tearDown() {
         mqManagementApi.deleteAllQueues();
+        reactorRabbitMQChannelPool.close();
     }
 
     @Override
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index d2bdeeb..dbb4b8f 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -42,6 +42,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Delivery;
+
 import reactor.core.Disposable;
 import reactor.core.publisher.DirectProcessor;
 import reactor.core.publisher.Mono;
@@ -83,7 +84,8 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
     }
 
     public void start() {
-        sender = channelPool.createSender();
+        channelPool.start();
+        sender = channelPool.getSender();
 
         sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
         sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 21a9880..62a2f1b 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -25,8 +25,6 @@ import java.util.Optional;
 import java.util.UUID;
 
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
-import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskId;
@@ -39,8 +37,8 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Delivery;
+
 import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.UnicastProcessor;
@@ -51,12 +49,11 @@ import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 
-public class RabbitMQWorkQueue implements WorkQueue, Startable {
+public class RabbitMQWorkQueue implements WorkQueue {
     private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueue.class);
 
     // Need at least one by receivers plus a shared one for senders
@@ -71,10 +68,8 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     public static final String TASK_ID = "taskId";
 
     private final TaskManagerWorker worker;
-    private final Mono<Connection> connectionMono;
     private final ReactorRabbitMQChannelPool channelPool;
     private final JsonTaskSerializer taskSerializer;
-    private Sender sender;
     private RabbitMQExclusiveConsumer receiver;
     private UnicastProcessor<TaskId> sendCancelRequestsQueue;
     private Disposable sendCancelRequestsQueueHandle;
@@ -83,29 +78,28 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     private Sender cancelRequestSender;
     private Receiver cancelRequestListener;
 
-    public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool simpleConnectionPool, JsonTaskSerializer taskSerializer) {
+    public RabbitMQWorkQueue(TaskManagerWorker worker, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, JsonTaskSerializer taskSerializer) {
         this.worker = worker;
-        this.connectionMono = simpleConnectionPool.getResilientConnection();
+        this.channelPool = reactorRabbitMQChannelPool;
         this.taskSerializer = taskSerializer;
-        this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER);
     }
 
+    @Override
     public void start() {
         startWorkqueue();
         listenToCancelRequests();
     }
 
     private void startWorkqueue() {
-        sender = channelPool.createSender();
-        sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
-        sender.declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block();
-        sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
+        channelPool.getSender().declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
+        channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block();
+        channelPool.getSender().bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
 
         consumeWorkqueue();
     }
 
     private void consumeWorkqueue() {
-        receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono));
+        receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
         receiverHandle = receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions())
             .subscribeOn(Schedulers.boundedElastic())
             .flatMap(this::executeTask)
@@ -143,7 +137,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     }
 
     void listenToCancelRequests() {
-        cancelRequestSender = channelPool.createSender();
+        cancelRequestSender = channelPool.getSender();
         String queueName = CANCEL_REQUESTS_QUEUE_NAME_PREFIX + UUID.randomUUID().toString();
 
         cancelRequestSender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
@@ -159,8 +153,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     }
 
     private void registerCancelRequestsListener(String queueName) {
-        cancelRequestListener = RabbitFlux
-            .createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+        cancelRequestListener = channelPool.createReceiver();
         cancelRequestListenerHandle = cancelRequestListener
             .consumeAutoAck(queueName)
             .subscribeOn(Schedulers.boundedElastic())
@@ -188,7 +181,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
                 .headers(ImmutableMap.of(TASK_ID, taskWithId.getId().asString()))
                 .build();
             OutboundMessage outboundMessage = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, basicProperties, payload);
-            sender.send(Mono.just(outboundMessage)).block();
+            channelPool.getSender().send(Mono.just(outboundMessage)).block();
         } catch (JsonProcessingException e) {
             throw new RuntimeException(e);
         }
@@ -203,12 +196,9 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     public void close() {
         Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose);
         Optional.ofNullable(receiver).ifPresent(RabbitMQExclusiveConsumer::close);
-        Optional.ofNullable(sender).ifPresent(Sender::close);
         Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
         Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
-        Optional.ofNullable(sender).ifPresent(Sender::close);
         Optional.ofNullable(cancelRequestSender).ifPresent(Sender::close);
         Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close);
-        channelPool.close();
     }
 }
diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
index f86be44..7feed07 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
@@ -22,13 +22,13 @@ import java.time.Duration
 
 import com.google.common.annotations.VisibleForTesting
 import javax.inject.Inject
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool
 import org.apache.james.eventsourcing.EventSourcingSystem
 import org.apache.james.server.task.json.JsonTaskSerializer
 import org.apache.james.task.SerialTaskManagerWorker
 import org.apache.james.task.eventsourcing.{WorkQueueSupplier, WorkerStatusListener}
 
-class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: SimpleConnectionPool,
+class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: ReactorRabbitMQChannelPool,
                                 private val jsonTaskSerializer: JsonTaskSerializer) extends WorkQueueSupplier {
 
   val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL =  Duration.ofSeconds(30)
@@ -41,7 +41,6 @@ class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: Si
     val listener = WorkerStatusListener(eventSourcingSystem)
     val worker = new SerialTaskManagerWorker(listener, additionalInformationPollingInterval)
     val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, rabbitMQConnectionPool, jsonTaskSerializer)
-    rabbitMQWorkQueue.start()
     rabbitMQWorkQueue
   }
 }
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 42f6f25..83c60d8 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -34,7 +34,7 @@ import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.eventsourcing.EventSourcingSystem;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension;
@@ -74,11 +74,13 @@ import com.github.steveash.guavate.Guavate;
 
 class DistributedTaskManagerTest implements TaskManagerContract {
 
+    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
+
     private static class TrackedRabbitMQWorkQueueSupplier implements WorkQueueSupplier {
         private final List<RabbitMQWorkQueue> workQueues;
         private final RabbitMQWorkQueueSupplier supplier;
 
-        TrackedRabbitMQWorkQueueSupplier(SimpleConnectionPool rabbitConnectionPool, JsonTaskSerializer taskSerializer) {
+        TrackedRabbitMQWorkQueueSupplier(ReactorRabbitMQChannelPool rabbitConnectionPool, JsonTaskSerializer taskSerializer) {
             workQueues = new ArrayList<>();
             supplier = new RabbitMQWorkQueueSupplier(rabbitConnectionPool, taskSerializer);
         }
@@ -86,6 +88,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
         @Override
         public WorkQueue apply(EventSourcingSystem eventSourcingSystem) {
             RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem, UPDATE_INFORMATION_POLLING_INTERVAL);
+            workQueue.start();
             workQueues.add(workQueue);
             return workQueue;
         }
@@ -138,7 +141,9 @@ class DistributedTaskManagerTest implements TaskManagerContract {
 
     @BeforeEach
     void setUp(EventStore eventStore) {
-        workQueueSupplier = new TrackedRabbitMQWorkQueueSupplier(rabbitMQExtension.getRabbitConnectionPool(), TASK_SERIALIZER);
+        reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
+        reactorRabbitMQChannelPool.start();
+        workQueueSupplier = new TrackedRabbitMQWorkQueueSupplier(reactorRabbitMQChannelPool, TASK_SERIALIZER);
         this.eventStore = eventStore;
         terminationSubscribers = new ArrayList<>();
     }
@@ -147,6 +152,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     void tearDown() {
         terminationSubscribers.forEach(RabbitMQTerminationSubscriber::close);
         workQueueSupplier.stopWorkQueues();
+        reactorRabbitMQChannelPool.close();
     }
 
     public EventSourcingTaskManager taskManager() {
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 91af0e2..5468af1 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.server.task.json.TestTask;
 import org.apache.james.server.task.json.dto.TestTaskDTOModules;
@@ -68,6 +69,7 @@ class RabbitMQWorkQueueTest {
     private RabbitMQWorkQueue testee;
     private ImmediateWorker worker;
     private JsonTaskSerializer serializer;
+    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
     private static class ImmediateWorker implements TaskManagerWorker {
 
@@ -101,13 +103,16 @@ class RabbitMQWorkQueueTest {
     void setUp() {
         worker = spy(new ImmediateWorker());
         serializer = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE);
-        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitConnectionPool(), serializer);
+        reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getRabbitConnectionPool());
+        reactorRabbitMQChannelPool.start();
+        testee = new RabbitMQWorkQueue(worker, reactorRabbitMQChannelPool, serializer);
         testee.start();
     }
 
     @AfterEach
     void tearDown() {
         testee.close();
+        reactorRabbitMQChannelPool.close();
     }
 
     @Test
@@ -132,7 +137,7 @@ class RabbitMQWorkQueueTest {
         testee.submit(TASK_WITH_ID);
 
         ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
-        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), serializer)) {
+        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, reactorRabbitMQChannelPool, serializer)) {
             otherWorkQueue.start();
 
             IntStream.range(0, 9)
@@ -151,7 +156,7 @@ class RabbitMQWorkQueueTest {
 
         ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
         JsonTaskSerializer otherTaskSerializer = new JsonTaskSerializer(TestTaskDTOModules.TEST_TYPE);
-        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), otherTaskSerializer)) {
+        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, reactorRabbitMQChannelPool, otherTaskSerializer)) {
             //wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue.
             Thread.sleep(200);
             otherWorkQueue.start();
diff --git a/server/task/task-memory/pom.xml b/server/task/task-memory/pom.xml
index 0b60d59..e6dccd7 100644
--- a/server/task/task-memory/pom.xml
+++ b/server/task/task-memory/pom.xml
@@ -44,6 +44,10 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-lifecycle-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-util</artifactId>
         </dependency>
         <dependency>
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java
index ae363ff..8a4f490 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java
@@ -20,7 +20,13 @@ package org.apache.james.task;
 
 import java.io.Closeable;
 
-public interface WorkQueue extends Closeable {
+import org.apache.james.lifecycle.api.Startable;
+
+public interface WorkQueue extends Closeable, Startable {
+
+    default void start() {
+
+    }
 
     void submit(TaskWithId taskWithId);
 
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 35856b0..fe2d869 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -22,14 +22,15 @@ import java.io.Closeable
 import java.time.Duration
 import java.util
 
+import com.google.common.annotations.VisibleForTesting
+import javax.annotation.PreDestroy
 import javax.inject.Inject
 import org.apache.james.eventsourcing.eventstore.{EventStore, History}
 import org.apache.james.eventsourcing.{AggregateId, Subscriber}
+import org.apache.james.lifecycle.api.Startable
 import org.apache.james.task.TaskManager.ReachedTimeoutException
 import org.apache.james.task._
 import org.apache.james.task.eventsourcing.TaskCommand._
-import com.google.common.annotations.VisibleForTesting
-import javax.annotation.PreDestroy
 import reactor.core.publisher.{Flux, Mono}
 import reactor.core.scheduler.Schedulers
 
@@ -38,7 +39,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
                                                                                   val eventStore: EventStore,
                                                                                   val executionDetailsProjection: TaskExecutionDetailsProjection,
                                                                                   val hostname: Hostname,
-                                                                                  val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable {
+                                                                                  val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable with Startable {
 
   private def workDispatcher: Subscriber = {
     case Created(aggregateId, _, task, _) =>
@@ -69,6 +70,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
 
   private val workQueue: WorkQueue = workQueueSupplier(eventSourcingSystem)
 
+  def start(): Unit = workQueue.start()
+
   override def submit(task: Task): TaskId = {
     val taskId = TaskId.generateTaskId
     val command = Create(taskId, task)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org