You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/06 04:13:16 UTC

[james-project] branch master updated: JAMES-3085 Drop direct references to ReactorRabbitMQChannelPool

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new b2354d4  JAMES-3085 Drop direct references to ReactorRabbitMQChannelPool
b2354d4 is described below

commit b2354d440f8104fabd59c57e155ce2005c4e1d20
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Mon Mar 2 12:40:14 2020 +0100

    JAMES-3085 Drop direct references to ReactorRabbitMQChannelPool
    
    ReactorRabbitMQChannelPool was introduced in order to limit our
    RabbitMQ channel usage. Its maximum number of channels was set to 5.
    
    We have 4 kinds of usage of it:
    
     * Ever running Receivers (4)
     * Temporary Senders (4)
     * Permanent Senders (2)
     * Management operations (3)
    
    *Ever running Receivers* hold a channel forever and are allocated at
    startup.
    *Management operations* need a channel but do not need it exclusively.
    The real benefit of ReactorRabbitMQChannelPool is for the *Temporary
    Senders* which request a channel each time.
    
    Before this commit, only one channel was available for *Management
    operations* and *Senders*, leading to starvation and timeouts
    under heavy load.
    
    This commit makes the *Ever running Receivers* rely directly on a
    channel. The other usages rely (indirectly) on
    ReactorRabbitMQChannelPool (with a smaller channel pool).
    
    Now 7 channels are open (4 for the *Ever running Receivers* and
    3 for the ReactorRabbitMQChannelPool) instead of 5.
---
 .../rabbitmq/ReactorRabbitMQChannelPool.java       |  4 +--
 .../james/backends/rabbitmq/ReceiverProvider.java  | 27 +++++++++++++++++
 .../james/backends/rabbitmq/RabbitMQExtension.java | 12 +++++++-
 .../james/mailbox/events/GroupRegistration.java    |  8 ++---
 .../mailbox/events/GroupRegistrationHandler.java   | 15 ++++++----
 .../mailbox/events/KeyRegistrationHandler.java     |  8 ++---
 .../james/mailbox/events/RabbitMQEventBus.java     | 23 +++++++-------
 .../james/mailbox/events/RabbitMQEventBusTest.java | 21 +++++++------
 .../rabbitmq/host/RabbitMQEventBusHostSystem.java  |  5 ++--
 .../james/modules/rabbitmq/RabbitMQModule.java     | 33 ++++++++++++++------
 .../mailets/remote/delivery/DeliveryRunnable.java  |  2 --
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  6 ++--
 .../org/apache/james/queue/rabbitmq/Enqueuer.java  |  5 ++--
 .../queue/rabbitmq/RabbitMQMailQueueFactory.java   | 29 +++++++++---------
 .../RabbitMQMailQueueConfigurationChangeTest.java  |  5 ++--
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      |  4 +--
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java     |  5 ++--
 .../distributed/RabbitMQTerminationSubscriber.java | 17 +++++------
 .../distributed/RabbitMQWorkQueue.java             | 35 +++++++++++-----------
 .../distributed/RabbitMQWorkQueueSupplier.scala    | 24 ++++++++-------
 .../distributed/DistributedTaskManagerTest.java    | 13 ++++----
 .../RabbitMQTerminationSubscriberTest.java         |  2 +-
 .../RabbitMQWorkQueuePersistenceTest.java          |  4 +--
 .../distributed/RabbitMQWorkQueueTest.java         |  6 ++--
 24 files changed, 186 insertions(+), 127 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 2f789bb..db3e900 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.function.BiConsumer;
 
 import javax.annotation.PreDestroy;
-import javax.inject.Inject;
 
 import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
@@ -103,7 +102,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ReactorRabbitMQChannelPool.class);
     private static final long MAXIMUM_BORROW_TIMEOUT_IN_MS = Duration.ofSeconds(5).toMillis();
-    private static final int MAX_CHANNELS_NUMBER = 5;
+    private static final int MAX_CHANNELS_NUMBER = 3;
     private static final int MAX_BORROW_RETRIES = 3;
     private static final Duration MIN_BORROW_DELAY = Duration.ofMillis(50);
     private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
@@ -113,7 +112,6 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
     private final ConcurrentSkipListSet<Channel> borrowedChannels;
     private Sender sender;
 
-    @Inject
     public ReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
         this(simpleConnectionPool.getResilientConnection(), MAX_CHANNELS_NUMBER);
     }
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReceiverProvider.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReceiverProvider.java
new file mode 100644
index 0000000..8d90134
--- /dev/null
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReceiverProvider.java
@@ -0,0 +1,27 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ***************************************************************/
+
+package org.apache.james.backends.rabbitmq;
+
+import reactor.rabbitmq.Receiver;
+
+@FunctionalInterface
+public interface ReceiverProvider {
+    Receiver createReceiver();
+}
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
index a909bb0..2dc8d31 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
@@ -32,6 +32,8 @@ import org.junit.jupiter.api.extension.ParameterContext;
 import org.junit.jupiter.api.extension.ParameterResolutionException;
 import org.junit.jupiter.api.extension.ParameterResolver;
 
+import reactor.rabbitmq.Sender;
+
 public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback, ParameterResolver {
 
     private static final Consumer<DockerRabbitMQ> DO_NOTHING = dockerRabbitMQ -> {};
@@ -113,7 +115,7 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
 
         RabbitMQConnectionFactory connectionFactory = createRabbitConnectionFactory();
         connectionPool = new SimpleConnectionPool(connectionFactory);
-        channelPool = new ReactorRabbitMQChannelPool(connectionPool);
+        channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(), 5);
         channelPool.start();
     }
 
@@ -144,6 +146,14 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
         return channelPool;
     }
 
+    public Sender getSender() {
+        return channelPool.getSender();
+    }
+
+    public ReceiverProvider getReceiverProvider() {
+        return channelPool::createReceiver;
+    }
+
     public SimpleConnectionPool getConnectionPool() {
         return connectionPool;
     }
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 5622412..da0fe77 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -31,7 +31,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.Optional;
 
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.util.MDCBuilder;
 
@@ -84,15 +84,15 @@ class GroupRegistration implements Registration {
     private final MailboxListenerExecutor mailboxListenerExecutor;
     private Optional<Disposable> receiverSubscriber;
 
-    GroupRegistration(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, EventSerializer eventSerializer,
+    GroupRegistration(Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer,
                       MailboxListener mailboxListener, Group group, RetryBackoffConfiguration retryBackoff,
                       EventDeadLetters eventDeadLetters,
                       Runnable unregisterGroup, MailboxListenerExecutor mailboxListenerExecutor) {
         this.eventSerializer = eventSerializer;
         this.mailboxListener = mailboxListener;
         this.queueName = WorkQueueName.of(group);
-        this.sender = reactorRabbitMQChannelPool.getSender();
-        this.receiver = reactorRabbitMQChannelPool.createReceiver();
+        this.sender = sender;
+        this.receiver = receiverProvider.createReceiver();
         this.mailboxListenerExecutor = mailboxListenerExecutor;
         this.receiverSubscriber = Optional.empty();
         this.unregisterGroup = unregisterGroup;
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
index 5337df6..201d3c3 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
@@ -23,22 +23,26 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.event.json.EventSerializer;
 
+import reactor.rabbitmq.Sender;
+
 class GroupRegistrationHandler {
     private final Map<Group, GroupRegistration> groupRegistrations;
     private final EventSerializer eventSerializer;
+    private final Sender sender;
+    private final ReceiverProvider receiverProvider;
     private final RetryBackoffConfiguration retryBackoff;
     private final EventDeadLetters eventDeadLetters;
     private final MailboxListenerExecutor mailboxListenerExecutor;
-    private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
 
-    GroupRegistrationHandler(EventSerializer eventSerializer, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
+    GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider,
                              RetryBackoffConfiguration retryBackoff,
                              EventDeadLetters eventDeadLetters, MailboxListenerExecutor mailboxListenerExecutor) {
         this.eventSerializer = eventSerializer;
-        this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool;
+        this.sender = sender;
+        this.receiverProvider = receiverProvider;
         this.retryBackoff = retryBackoff;
         this.eventDeadLetters = eventDeadLetters;
         this.mailboxListenerExecutor = mailboxListenerExecutor;
@@ -67,7 +71,8 @@ class GroupRegistrationHandler {
 
     private GroupRegistration newGroupRegistration(MailboxListener listener, Group group) {
         return new GroupRegistration(
-            reactorRabbitMQChannelPool,
+            sender,
+            receiverProvider,
             eventSerializer,
             listener,
             group,
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 41db3f8..ae49471 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -28,7 +28,7 @@ import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.util.MDCBuilder;
 import org.apache.james.util.MDCStructuredLogger;
@@ -62,13 +62,13 @@ class KeyRegistrationHandler {
     private final MailboxListenerExecutor mailboxListenerExecutor;
     private Optional<Disposable> receiverSubscriber;
 
-    KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
+    KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
         this.eventBusId = eventBusId;
         this.eventSerializer = eventSerializer;
-        this.sender = reactorRabbitMQChannelPool.getSender();
+        this.sender = sender;
         this.routingKeyConverter = routingKeyConverter;
         this.localListenerRegistry = localListenerRegistry;
-        this.receiver = reactorRabbitMQChannelPool.createReceiver();
+        this.receiver = receiverProvider.createReceiver();
         this.mailboxListenerExecutor = mailboxListenerExecutor;
         this.registrationQueue = new RegistrationQueueName();
         this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 06d786a..ecc1b4f 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -24,7 +24,7 @@ import java.util.Set;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.metrics.api.MetricFactory;
@@ -32,6 +32,7 @@ import org.apache.james.metrics.api.MetricFactory;
 import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Mono;
+import reactor.rabbitmq.Sender;
 
 public class RabbitMQEventBus implements EventBus, Startable {
     private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
@@ -44,8 +45,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
     private final RetryBackoffConfiguration retryBackoff;
     private final EventBusId eventBusId;
     private final EventDeadLetters eventDeadLetters;
-    private final ReactorRabbitMQChannelPool channelPool;
     private final MailboxListenerExecutor mailboxListenerExecutor;
+    private final Sender sender;
+    private final ReceiverProvider receiverProvider;
 
     private volatile boolean isRunning;
     private volatile boolean isStopping;
@@ -54,11 +56,12 @@ public class RabbitMQEventBus implements EventBus, Startable {
     private EventDispatcher eventDispatcher;
 
     @Inject
-    public RabbitMQEventBus(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, EventSerializer eventSerializer,
-                     RetryBackoffConfiguration retryBackoff,
-                     RoutingKeyConverter routingKeyConverter,
-                     EventDeadLetters eventDeadLetters, MetricFactory metricFactory) {
-        this.channelPool = reactorRabbitMQChannelPool;
+    public RabbitMQEventBus(Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer,
+                            RetryBackoffConfiguration retryBackoff,
+                            RoutingKeyConverter routingKeyConverter,
+                            EventDeadLetters eventDeadLetters, MetricFactory metricFactory) {
+        this.sender = sender;
+        this.receiverProvider = receiverProvider;
         this.mailboxListenerExecutor = new MailboxListenerExecutor(metricFactory);
         this.eventBusId = EventBusId.random();
         this.eventSerializer = eventSerializer;
@@ -73,9 +76,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
         if (!isRunning && !isStopping) {
 
             LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
-            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, channelPool, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
-            groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
-            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, channelPool.getSender(), localListenerRegistry, mailboxListenerExecutor);
+            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
+            groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
+            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor);
 
             eventDispatcher.start();
             keyRegistrationHandler.start();
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 5436749..35d3581 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -56,7 +56,7 @@ import org.apache.james.backends.rabbitmq.RabbitMQExtension;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension.DockerRestartPolicy;
 import org.apache.james.backends.rabbitmq.RabbitMQFixture;
 import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
 import org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
@@ -119,19 +119,18 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         eventBus3.stop();
         ALL_GROUPS.stream()
             .map(GroupRegistration.WorkQueueName::of)
-            .forEach(queueName -> rabbitMQExtension.getRabbitChannelPool().getSender().delete(QueueSpecification.queue(queueName.asString())).block());
-        rabbitMQExtension.getRabbitChannelPool()
-            .getSender()
+            .forEach(queueName -> rabbitMQExtension.getSender().delete(QueueSpecification.queue(queueName.asString())).block());
+        rabbitMQExtension.getSender()
             .delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME))
             .block();
     }
 
     private RabbitMQEventBus newEventBus() {
-        return newEventBus(rabbitMQExtension.getRabbitChannelPool());
+        return newEventBus(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider());
     }
 
-    private RabbitMQEventBus newEventBus(ReactorRabbitMQChannelPool rabbitMQChannelPool) {
-        return new RabbitMQEventBus(rabbitMQChannelPool, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new RecordingMetricFactory());
+    private RabbitMQEventBus newEventBus(Sender sender, ReceiverProvider receiverProvider) {
+        return new RabbitMQEventBus(sender, receiverProvider, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new RecordingMetricFactory());
     }
 
     @Override
@@ -256,7 +255,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
 
         @BeforeEach
         void setUp() {
-            Sender sender = rabbitMQExtension.getRabbitChannelPool().getSender();
+            Sender sender = rabbitMQExtension.getSender();
 
             sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
                 .durable(DURABLE)
@@ -286,7 +285,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         }
 
         private Event dequeueEvent() {
-            try (Receiver receiver = rabbitMQExtension.getRabbitChannelPool().createReceiver()) {
+            try (Receiver receiver = rabbitMQExtension.getReceiverProvider().createReceiver()) {
                 byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
                     .blockFirst()
                     .getBody();
@@ -328,7 +327,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
 
                 @BeforeEach
                 void beforeEach() {
-                    rabbitMQEventBusWithNetWorkIssue = newEventBus(rabbitMQNetWorkIssueExtension.getRabbitChannelPool());
+                    rabbitMQEventBusWithNetWorkIssue = newEventBus(rabbitMQNetWorkIssueExtension.getSender(), rabbitMQNetWorkIssueExtension.getReceiverProvider());
                 }
 
                 @Test
@@ -676,4 +675,4 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         RabbitMQFixture.awaitAtMostThirtySeconds
             .untilAsserted(() -> verify(listener).event(EVENT));
     }
-}
\ No newline at end of file
+}
diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
index c34510c..08b41e9 100644
--- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
+++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
@@ -109,8 +109,9 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
         InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();
         EventSerializer eventSerializer = new EventSerializer(mailboxIdFactory, messageIdFactory, new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer());
         RoutingKeyConverter routingKeyConverter = new RoutingKeyConverter(ImmutableSet.of(new MailboxIdRegistrationKey.Factory(mailboxIdFactory)));
-        return new RabbitMQEventBus(reactorRabbitMQChannelPool, eventSerializer, RetryBackoffConfiguration.DEFAULT,
-            routingKeyConverter, new MemoryEventDeadLetters(), new RecordingMetricFactory());
+        return new RabbitMQEventBus(reactorRabbitMQChannelPool.getSender(), reactorRabbitMQChannelPool::createReceiver,
+            eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, new MemoryEventDeadLetters(),
+            new RecordingMetricFactory());
     }
 
     @Override
diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
index ea8c317..1d223b3 100644
--- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
@@ -29,6 +29,8 @@ import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.RabbitMQHealthCheck;
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO;
@@ -50,8 +52,6 @@ import org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDAO;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
-import org.apache.james.utils.InitializationOperation;
-import org.apache.james.utils.InitilizationOperationBuilder;
 import org.apache.james.utils.PropertiesProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +61,10 @@ import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.TypeLiteral;
 import com.google.inject.multibindings.Multibinder;
-import com.google.inject.multibindings.ProvidesIntoSet;
+
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
 
 public class RabbitMQModule extends AbstractModule {
 
@@ -71,7 +74,6 @@ public class RabbitMQModule extends AbstractModule {
 
     @Override
     protected void configure() {
-        bind(ReactorRabbitMQChannelPool.class).in(Scopes.SINGLETON);
         bind(EnqueuedMailsDAO.class).in(Scopes.SINGLETON);
         bind(DeletedMailsDAO.class).in(Scopes.SINGLETON);
         bind(BrowseStartDAO.class).in(Scopes.SINGLETON);
@@ -143,10 +145,23 @@ public class RabbitMQModule extends AbstractModule {
         return RabbitMQMailQueueConfiguration.from(configuration);
     }
 
-    @ProvidesIntoSet
-    InitializationOperation workQueue(ReactorRabbitMQChannelPool instance) {
-        return InitilizationOperationBuilder
-            .forClass(ReactorRabbitMQChannelPool.class)
-            .init(instance::start);
+    @Provides
+    @Singleton
+    ReactorRabbitMQChannelPool provideReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
+        ReactorRabbitMQChannelPool channelPool = new ReactorRabbitMQChannelPool(simpleConnectionPool);
+        channelPool.start();
+        return channelPool;
+    }
+
+    @Provides
+    @Singleton
+    public Sender provideRabbitMQSender(ReactorRabbitMQChannelPool channelPool) {
+        return channelPool.getSender();
+    }
+
+    @Provides
+    @Singleton
+    public ReceiverProvider provideRabbitMQReceiver(SimpleConnectionPool simpleConnectionPool) {
+        return () -> RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(simpleConnectionPool.getResilientConnection()));
     }
 }
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
index a6d92d0..0fa2b96 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
@@ -21,7 +21,6 @@ package org.apache.james.transport.mailets.remote.delivery;
 
 import java.time.Duration;
 import java.util.Date;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
 import org.apache.james.dnsservice.api.DNSService;
@@ -48,7 +47,6 @@ public class DeliveryRunnable implements Disposable {
     private static final Logger LOGGER = LoggerFactory.getLogger(DeliveryRunnable.class);
 
     public static final Supplier<Date> CURRENT_DATE_SUPPLIER = Date::new;
-    public static final AtomicBoolean DEFAULT_NOT_STARTED = new AtomicBoolean(false);
     public static final String OUTGOING_MAILS = "outgoingMails";
     public static final String REMOTE_DELIVERY_TRIAL = "RemoteDeliveryTrial";
 
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 21546b9..ece7018 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -25,7 +25,7 @@ import java.io.IOException;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
@@ -77,14 +77,14 @@ class Dequeuer {
     private final MailReferenceSerializer mailReferenceSerializer;
     private final MailQueueView mailQueueView;
 
-    Dequeuer(MailQueueName name, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
+    Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
              MailReferenceSerializer serializer, MetricFactory metricFactory,
              MailQueueView mailQueueView) {
         this.mailLoader = mailLoader;
         this.mailReferenceSerializer = serializer;
         this.mailQueueView = mailQueueView;
         this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
-        this.flux = reactorRabbitMQChannelPool.createReceiver()
+        this.flux = receiverProvider.createReceiver()
             .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE))
             .filter(getResponse -> getResponse.getBody() != null);
     }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index 06bb328..481e380 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -28,7 +28,6 @@ import java.time.Clock;
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
 
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.metrics.api.Metric;
@@ -54,11 +53,11 @@ class Enqueuer {
     private final MailQueueView mailQueueView;
     private final Clock clock;
 
-    Enqueuer(MailQueueName name, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
+    Enqueuer(MailQueueName name, Sender sender, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
              MailReferenceSerializer serializer, MetricFactory metricFactory,
              MailQueueView mailQueueView, Clock clock) {
         this.name = name;
-        this.sender = reactorRabbitMQChannelPool.getSender();
+        this.sender = sender;
         this.mimeMessageStore = mimeMessageStore;
         this.mailReferenceSerializer = serializer;
         this.mailQueueView = mailQueueView;
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index 7c0325a..f771118 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -35,7 +35,7 @@ import java.util.function.Function;
 import javax.inject.Inject;
 import javax.mail.internet.MimeMessage;
 
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
@@ -55,13 +55,15 @@ import reactor.core.publisher.Flux;
 import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.Sender;
 
 public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue> {
 
     @VisibleForTesting static class PrivateFactory {
         private final MetricFactory metricFactory;
         private final GaugeRegistry gaugeRegistry;
-        private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
+        private final ReceiverProvider receiverProvider;
+        private final Sender sender;
         private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
         private final MailReferenceSerializer mailReferenceSerializer;
         private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
@@ -73,8 +75,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
         @Inject
         @VisibleForTesting PrivateFactory(MetricFactory metricFactory,
                                           GaugeRegistry gaugeRegistry,
-                                          ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
-                                          MimeMessageStore.Factory mimeMessageStoreFactory,
+                                          Sender sender, ReceiverProvider receiverProvider, MimeMessageStore.Factory mimeMessageStoreFactory,
                                           BlobId.Factory blobIdFactory,
                                           MailQueueView.Factory mailQueueViewFactory,
                                           Clock clock,
@@ -82,7 +83,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
                                           RabbitMQMailQueueConfiguration configuration) {
             this.metricFactory = metricFactory;
             this.gaugeRegistry = gaugeRegistry;
-            this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool;
+            this.sender = sender;
+            this.receiverProvider = receiverProvider;
             this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
             this.mailQueueViewFactory = mailQueueViewFactory;
             this.clock = clock;
@@ -99,9 +101,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
             RabbitMQMailQueue rabbitMQMailQueue = new RabbitMQMailQueue(
                 metricFactory,
                 mailQueueName,
-                new Enqueuer(mailQueueName, reactorRabbitMQChannelPool, mimeMessageStore, mailReferenceSerializer,
+                new Enqueuer(mailQueueName, sender, mimeMessageStore, mailReferenceSerializer,
                     metricFactory, mailQueueView, clock),
-                new Dequeuer(mailQueueName, reactorRabbitMQChannelPool, mailLoader, mailReferenceSerializer,
+                new Dequeuer(mailQueueName, receiverProvider, mailLoader, mailReferenceSerializer,
                     metricFactory, mailQueueView),
                 mailQueueView,
                 decoratorFactory);
@@ -137,14 +139,14 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
     private final RabbitMQMailQueueManagement mqManagementApi;
     private final PrivateFactory privateFactory;
     private final RabbitMQMailQueueObjectPool mailQueueObjectPool;
-    private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
+    private final Sender sender;
 
     @VisibleForTesting
     @Inject
-    RabbitMQMailQueueFactory(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
+    RabbitMQMailQueueFactory(Sender sender,
                              RabbitMQMailQueueManagement mqManagementApi,
                              PrivateFactory privateFactory) {
-        this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool;
+        this.sender = sender;
         this.mqManagementApi = mqManagementApi;
         this.privateFactory = privateFactory;
         this.mailQueueObjectPool = new RabbitMQMailQueueObjectPool();
@@ -172,16 +174,15 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
     private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName) {
         String exchangeName = mailQueueName.toRabbitExchangeName().asString();
         Flux.concat(
-            reactorRabbitMQChannelPool.getSender().declareExchange(ExchangeSpecification.exchange(exchangeName)
+            sender.declareExchange(ExchangeSpecification.exchange(exchangeName)
                 .durable(true)
                 .type("direct")),
-            reactorRabbitMQChannelPool.getSender().declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString())
+            sender.declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString())
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
                 .arguments(NO_ARGUMENTS)),
-            reactorRabbitMQChannelPool.getSender()
-                .bind(BindingSpecification.binding()
+            sender.bind(BindingSpecification.binding()
                 .exchange(mailQueueName.toRabbitExchangeName().asString())
                 .queue(mailQueueName.toWorkQueueName().asString())
                 .routingKey(EMPTY_ROUTING_KEY)))
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
index e0b9910..35792e7 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
@@ -118,14 +118,15 @@ class RabbitMQMailQueueConfigurationChangeTest {
         RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
             new RecordingMetricFactory(),
             new NoopGaugeRegistry(),
-            rabbitMQExtension.getRabbitChannelPool(),
+            rabbitMQExtension.getSender(),
+            rabbitMQExtension.getReceiverProvider(),
             mimeMessageStoreFactory,
             BLOB_ID_FACTORY,
             mailQueueViewFactory,
             clock,
             new RawMailQueueItemDecoratorFactory(),
             mailQueueSizeConfiguration);
-        RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitChannelPool(), mqManagementApi, privateFactory);
+        RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getSender(), mqManagementApi, privateFactory);
         return mailQueueFactory.createQueue(SPOOL);
     }
 
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index cfad2b0..57783d2 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -336,7 +336,7 @@ class RabbitMQMailQueueTest {
         RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
             metricTestSystem.getMetricFactory(),
             metricTestSystem.getSpyGaugeRegistry(),
-            rabbitMQExtension.getRabbitChannelPool(),
+            rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(),
             mimeMessageStoreFactory,
             BLOB_ID_FACTORY,
             mailQueueViewFactory,
@@ -344,7 +344,7 @@ class RabbitMQMailQueueTest {
             new RawMailQueueItemDecoratorFactory(),
             configuration);
         mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
-        mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitChannelPool(), mqManagementApi, factory);
+        mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getSender(), mqManagementApi, factory);
         mailQueue = mailQueueFactory.createQueue(SPOOL);
     }
 }
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index 16ab46d..a717476 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -69,7 +69,8 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
         RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
             new RecordingMetricFactory(),
             new NoopGaugeRegistry(),
-            rabbitMQExtension.getRabbitChannelPool(),
+            rabbitMQExtension.getSender(),
+            rabbitMQExtension.getReceiverProvider(),
             mimeMessageStoreFactory,
             BLOB_ID_FACTORY,
             mailQueueViewFactory,
@@ -77,7 +78,7 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
             new RawMailQueueItemDecoratorFactory(),
             configuration);
         mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
-        mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitChannelPool(), mqManagementApi, privateFactory);
+        mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getSender(), mqManagementApi, privateFactory);
     }
 
     @AfterEach
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index a949af5..174101a 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -28,7 +28,7 @@ import java.util.UUID;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
 import org.apache.james.lifecycle.api.Startable;
@@ -54,31 +54,29 @@ import reactor.rabbitmq.Sender;
 
 public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable {
     private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTerminationSubscriber.class);
-    private static final Integer MAX_CHANNELS_NUMBER = 1;
     private static final String EXCHANGE_NAME = "terminationSubscriberExchange";
     private static final String QUEUE_NAME_PREFIX = "terminationSubscriber";
     private static final String ROUTING_KEY = "terminationSubscriberRoutingKey";
 
     private final JsonEventSerializer serializer;
-    private final ReactorRabbitMQChannelPool channelPool;
+    private final Sender sender;
+    private final ReceiverProvider receiverProvider;
     private final String queueName;
     private UnicastProcessor<OutboundMessage> sendQueue;
     private DirectProcessor<Event> listener;
     private Disposable sendQueueHandle;
     private Disposable listenQueueHandle;
     private Receiver listenerReceiver;
-    private Sender sender;
 
     @Inject
-    RabbitMQTerminationSubscriber(ReactorRabbitMQChannelPool channelPool, JsonEventSerializer serializer) {
+    RabbitMQTerminationSubscriber(Sender sender, ReceiverProvider receiverProvider, JsonEventSerializer serializer) {
+        this.sender = sender;
+        this.receiverProvider = receiverProvider;
         this.serializer = serializer;
-        this.channelPool = channelPool;
         this.queueName = QUEUE_NAME_PREFIX + UUID.randomUUID().toString();
     }
 
     public void start() {
-        sender = channelPool.getSender();
-
         sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
         sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
         sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, queueName)).block();
@@ -88,7 +86,7 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
             .subscribeOn(Schedulers.elastic())
             .subscribe();
 
-        listenerReceiver = channelPool.createReceiver();
+        listenerReceiver = receiverProvider.createReceiver();
         listener = DirectProcessor.create();
         listenQueueHandle = listenerReceiver
             .consumeAutoAck(queueName)
@@ -132,6 +130,5 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
         Optional.ofNullable(sendQueueHandle).ifPresent(Disposable::dispose);
         Optional.ofNullable(listenQueueHandle).ifPresent(Disposable::dispose);
         Optional.ofNullable(listenerReceiver).ifPresent(Receiver::close);
-        Optional.ofNullable(sender).ifPresent(Sender::close);
     }
 }
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 93208fe..de68b24 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -28,7 +28,7 @@ import java.util.Optional;
 import java.util.UUID;
 
 import org.apache.james.backends.rabbitmq.Constants;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskId;
@@ -55,7 +55,6 @@ import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 
 public class RabbitMQWorkQueue implements WorkQueue {
@@ -75,19 +74,20 @@ public class RabbitMQWorkQueue implements WorkQueue {
     public static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
 
     private final TaskManagerWorker worker;
-    private final ReactorRabbitMQChannelPool channelPool;
     private final JsonTaskSerializer taskSerializer;
+    private final Sender sender;
+    private final ReceiverProvider receiverProvider;
     private Receiver receiver;
     private UnicastProcessor<TaskId> sendCancelRequestsQueue;
     private Disposable sendCancelRequestsQueueHandle;
     private Disposable receiverHandle;
     private Disposable cancelRequestListenerHandle;
-    private Sender cancelRequestSender;
     private Receiver cancelRequestListener;
 
-    public RabbitMQWorkQueue(TaskManagerWorker worker, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, JsonTaskSerializer taskSerializer) {
+    public RabbitMQWorkQueue(TaskManagerWorker worker, Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer) {
         this.worker = worker;
-        this.channelPool = reactorRabbitMQChannelPool;
+        this.receiverProvider = receiverProvider;
+        this.sender = sender;
         this.taskSerializer = taskSerializer;
     }
 
@@ -104,13 +104,13 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     @VisibleForTesting
     void declareQueue() {
-        Mono<AMQP.Exchange.DeclareOk> declareExchange = channelPool.getSender()
+        Mono<AMQP.Exchange.DeclareOk> declareExchange = sender
             .declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME))
             .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
-        Mono<AMQP.Queue.DeclareOk> declareQueue = channelPool.getSender()
+        Mono<AMQP.Queue.DeclareOk> declareQueue = sender
             .declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER))
             .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
-        Mono<AMQP.Queue.BindOk> bindQueueToExchange = channelPool.getSender()
+        Mono<AMQP.Queue.BindOk> bindQueueToExchange = sender
             .bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME))
             .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
 
@@ -121,7 +121,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
     }
 
     private void consumeWorkqueue() {
-        receiver = new Receiver(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
+        receiver = receiverProvider.createReceiver();
         receiverHandle = receiver.consumeManualAck(QUEUE_NAME, new ConsumeOptions())
             .subscribeOn(Schedulers.elastic())
             .concatMap(this::executeTask)
@@ -158,24 +158,23 @@ public class RabbitMQWorkQueue implements WorkQueue {
             .onErrorResume(error -> Mono.empty());
     }
 
-    void listenToCancelRequests() {
-        cancelRequestSender = channelPool.getSender();
+    private void listenToCancelRequests() {
         String queueName = CANCEL_REQUESTS_QUEUE_NAME_PREFIX + UUID.randomUUID().toString();
 
-        cancelRequestSender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
-        cancelRequestSender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
-        cancelRequestSender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, queueName)).block();
+        sender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
+        sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
+        sender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, queueName)).block();
         registerCancelRequestsListener(queueName);
 
         sendCancelRequestsQueue = UnicastProcessor.create();
-        sendCancelRequestsQueueHandle = cancelRequestSender
+        sendCancelRequestsQueueHandle = sender
             .send(sendCancelRequestsQueue.map(this::makeCancelRequestMessage))
             .subscribeOn(Schedulers.elastic())
             .subscribe();
     }
 
     private void registerCancelRequestsListener(String queueName) {
-        cancelRequestListener = channelPool.createReceiver();
+        cancelRequestListener = receiverProvider.createReceiver();
         cancelRequestListenerHandle = cancelRequestListener
             .consumeAutoAck(queueName)
             .subscribeOn(Schedulers.elastic())
@@ -207,7 +206,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
                 .build();
 
             OutboundMessage outboundMessage = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, basicProperties, payload);
-            channelPool.getSender().send(Mono.just(outboundMessage)).block();
+            sender.send(Mono.just(outboundMessage)).block();
         } catch (JsonProcessingException e) {
             throw new RuntimeException(e);
         }
diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
index 7feed07..7ddfc5e 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
@@ -1,4 +1,4 @@
-/** **************************************************************
+/****************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one   *
  * or more contributor license agreements.  See the NOTICE file *
  * distributed with this work for additional information        *
@@ -6,30 +6,34 @@
  * to you under the Apache License, Version 2.0 (the            *
  * "License"); you may not use this file except in compliance   *
  * with the License.  You may obtain a copy of the License at   *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0                 *
- * *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
  * Unless required by applicable law or agreed to in writing,   *
  * software distributed under the License is distributed on an  *
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
  * KIND, either express or implied.  See the License for the    *
  * specific language governing permissions and limitations      *
  * under the License.                                           *
- * ***************************************************************/
+ ****************************************************************/
 package org.apache.james.task.eventsourcing.distributed
 
 import java.time.Duration
 
-import com.google.common.annotations.VisibleForTesting
 import javax.inject.Inject
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool
+
+import org.apache.james.backends.rabbitmq.ReceiverProvider
 import org.apache.james.eventsourcing.EventSourcingSystem
 import org.apache.james.server.task.json.JsonTaskSerializer
 import org.apache.james.task.SerialTaskManagerWorker
 import org.apache.james.task.eventsourcing.{WorkQueueSupplier, WorkerStatusListener}
 
-class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: ReactorRabbitMQChannelPool,
-                                private val jsonTaskSerializer: JsonTaskSerializer) extends WorkQueueSupplier {
+import com.google.common.annotations.VisibleForTesting
+import reactor.rabbitmq.Sender
+
+class RabbitMQWorkQueueSupplier @Inject()(private val sender: Sender,
+                                          private val receiverProvider: ReceiverProvider,
+                                          private val jsonTaskSerializer: JsonTaskSerializer) extends WorkQueueSupplier {
 
   val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL =  Duration.ofSeconds(30)
   override def apply(eventSourcingSystem: EventSourcingSystem): RabbitMQWorkQueue = {
@@ -40,7 +44,7 @@ class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: Re
   def apply(eventSourcingSystem: EventSourcingSystem, additionalInformationPollingInterval: Duration): RabbitMQWorkQueue = {
     val listener = WorkerStatusListener(eventSourcingSystem)
     val worker = new SerialTaskManagerWorker(listener, additionalInformationPollingInterval)
-    val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, rabbitMQConnectionPool, jsonTaskSerializer)
+    val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, sender, receiverProvider, jsonTaskSerializer)
     rabbitMQWorkQueue
   }
 }
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 295d27c..0b1ab59 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -34,7 +34,7 @@ import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.EventSourcingSystem;
 import org.apache.james.eventsourcing.eventstore.EventStore;
@@ -80,6 +80,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import reactor.rabbitmq.Sender;
 
 class DistributedTaskManagerTest implements TaskManagerContract {
 
@@ -87,9 +88,9 @@ class DistributedTaskManagerTest implements TaskManagerContract {
         private final List<RabbitMQWorkQueue> workQueues;
         private final RabbitMQWorkQueueSupplier supplier;
 
-        TrackedRabbitMQWorkQueueSupplier(ReactorRabbitMQChannelPool rabbitConnectionPool, JsonTaskSerializer taskSerializer) {
+        TrackedRabbitMQWorkQueueSupplier(Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer) {
             workQueues = new ArrayList<>();
-            supplier = new RabbitMQWorkQueueSupplier(rabbitConnectionPool, taskSerializer);
+            supplier = new RabbitMQWorkQueueSupplier(sender, receiverProvider, taskSerializer);
         }
 
         @Override
@@ -114,7 +115,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     static final Hostname HOSTNAME_2 = new Hostname("bar");
 
     @RegisterExtension
-    static final RabbitMQExtension RABBIT_MQ_EXTENSION = RabbitMQExtension.singletonRabbitMQ();
+    static final RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ();
 
 
     @RegisterExtension
@@ -165,7 +166,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
         CassandraCluster cassandra = CASSANDRA_CLUSTER.getCassandraCluster();
         CassandraTaskExecutionDetailsProjectionDAO projectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider(), JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER);
         this.executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(projectionDAO);
-        this.workQueueSupplier = new TrackedRabbitMQWorkQueueSupplier(RABBIT_MQ_EXTENSION.getRabbitChannelPool(), taskSerializer);
+        this.workQueueSupplier = new TrackedRabbitMQWorkQueueSupplier(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), taskSerializer);
         this.eventStore = eventStore;
         this.terminationSubscribers = new ArrayList<>();
         this.eventSerializer = JsonEventSerializer.forModules(eventDtoModule).withoutNestedType();
@@ -182,7 +183,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     EventSourcingTaskManager taskManager(Hostname hostname) {
-        RabbitMQTerminationSubscriber terminationSubscriber = new RabbitMQTerminationSubscriber(RABBIT_MQ_EXTENSION.getRabbitChannelPool(), eventSerializer);
+        RabbitMQTerminationSubscriber terminationSubscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), eventSerializer);
         terminationSubscribers.add(terminationSubscriber);
         terminationSubscriber.start();
         return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, terminationSubscriber);
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index 3b7fa9b..77cfeb7 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -52,7 +52,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
 
     @Override
     public TerminationSubscriber subscriber() {
-        RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitChannelPool(), SERIALIZER);
+        RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), SERIALIZER);
         subscriber.start();
         return subscriber;
     }
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
index 298b45d..da91395 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
@@ -52,7 +52,7 @@ class RabbitMQWorkQueuePersistenceTest {
     void setUp() {
         worker = spy(new ImmediateWorker());
         serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
-        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
+        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer);
         //declare the queue but do not start consuming from it
         testee.declareQueue();
     }
@@ -91,7 +91,7 @@ class RabbitMQWorkQueuePersistenceTest {
 
     private void startNewConsumingWorkqueue() {
         worker = spy(new ImmediateWorker());
-        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
+        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer);
         testee.start();
     }
 }
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index bf151e6..4eb0323 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -69,7 +69,7 @@ class RabbitMQWorkQueueTest {
     void setUp() {
         worker = spy(new ImmediateWorker());
         serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
-        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
+        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer);
         testee.start();
     }
 
@@ -100,7 +100,7 @@ class RabbitMQWorkQueueTest {
         testee.submit(TASK_WITH_ID);
 
         ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
-        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitChannelPool(), serializer)) {
+        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer)) {
             otherWorkQueue.start();
 
             IntStream.range(0, 9)
@@ -119,7 +119,7 @@ class RabbitMQWorkQueueTest {
 
         ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
         JsonTaskSerializer otherTaskSerializer = JsonTaskSerializer.of(TestTaskDTOModules.TEST_TYPE);
-        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitChannelPool(), otherTaskSerializer)) {
+        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), otherTaskSerializer)) {
             //wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue.
             Thread.sleep(200);
             otherWorkQueue.start();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org