You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/11/04 11:23:37 UTC

[james-project] 01/30: JAMES-2937 RabbitMQMailQueue should rely on ReactorRabbitMQChannelPool

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4b9135ae8e81cc707de1e910bf1de6d7d895dfa1
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Oct 4 11:52:59 2019 +0700

    JAMES-2937 RabbitMQMailQueue should rely on ReactorRabbitMQChannelPool
---
 .../james/modules/rabbitmq/RabbitMQModule.java     |  24 +++++
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  12 ++-
 .../org/apache/james/queue/rabbitmq/Enqueuer.java  |  35 ++++---
 .../apache/james/queue/rabbitmq/RabbitClient.java  |  72 -------------
 .../queue/rabbitmq/RabbitMQMailQueueFactory.java   |  69 +++++++++++--
 .../RabbitMQMailQueueConfigurationChangeTest.java  |  16 ++-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 114 ++++++++++-----------
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java     |  17 ++-
 8 files changed, 191 insertions(+), 168 deletions(-)

diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
index bdb9f70..5eb49df 100644
--- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
@@ -32,6 +32,7 @@ import org.apache.james.backends.rabbitmq.RabbitMQHealthCheck;
 import org.apache.james.backends.rabbitmq.SimpleChannelPool;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
+import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
@@ -49,11 +50,13 @@ import org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDAO;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
+import org.apache.james.utils.InitialisationOperation;
 import org.apache.james.utils.PropertiesProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.TypeLiteral;
@@ -85,6 +88,7 @@ public class RabbitMQModule extends AbstractModule {
         eventDTOModuleBinder.addBinding().toInstance(CassandraMailQueueViewConfigurationModule.MAIL_QUEUE_VIEW_CONFIGURATION);
 
         Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(RabbitMQHealthCheck.class);
+        Multibinder.newSetBinder(binder(), InitialisationOperation.class).addBinding().to(RabbitMQMailQueueFactoryInitialisationOperation.class);
     }
 
     @Provides
@@ -140,4 +144,24 @@ public class RabbitMQModule extends AbstractModule {
     private RabbitMQMailQueueConfiguration getMailQueueSizeConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) {
         return RabbitMQMailQueueConfiguration.from(configuration);
     }
+
+    @Singleton
+    public static class RabbitMQMailQueueFactoryInitialisationOperation implements InitialisationOperation {
+        private final RabbitMQMailQueueFactory rabbitMQMailQueueFactory;
+
+        @Inject
+        public RabbitMQMailQueueFactoryInitialisationOperation(RabbitMQMailQueueFactory rabbitMQMailQueueFactory) {
+            this.rabbitMQMailQueueFactory = rabbitMQMailQueueFactory;
+        }
+
+        @Override
+        public void initModule() {
+            rabbitMQMailQueueFactory.start();
+        }
+
+        @Override
+        public Class<? extends Startable> forClass() {
+            return RabbitMQMailQueueFactory.class;
+        }
+    }
 }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index ae9829e..a6de706 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -33,15 +33,19 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.mailet.Mail;
 
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
+import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Delivery;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.AcknowledgableDelivery;
+import reactor.rabbitmq.ConsumeOptions;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.ReceiverOptions;
 
 class Dequeuer {
-
     private static final boolean REQUEUE = true;
+    private static final int EXECUTION_RATE = 5;
     private final Flux<AcknowledgableDelivery> flux;
 
     private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
@@ -75,15 +79,15 @@ class Dequeuer {
     private final MailReferenceSerializer mailReferenceSerializer;
     private final MailQueueView mailQueueView;
 
-    Dequeuer(MailQueueName name, RabbitClient rabbitClient, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
+    Dequeuer(MailQueueName name, Mono<Connection> connectionMono, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
              MailReferenceSerializer serializer, MetricFactory metricFactory,
              MailQueueView mailQueueView) {
         this.mailLoader = mailLoader;
         this.mailReferenceSerializer = serializer;
         this.mailQueueView = mailQueueView;
         this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
-        this.flux = rabbitClient
-            .receive(name)
+        this.flux = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono))
+            .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE))
             .filter(getResponse -> getResponse.getBody() != null);
     }
 
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index 776a739..ea7d04a 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.queue.rabbitmq;
 
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX;
 
 import java.time.Clock;
@@ -38,21 +39,23 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.fge.lambdas.Throwing;
 
 import reactor.core.publisher.Mono;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.Sender;
 
 class Enqueuer {
     private final MailQueueName name;
-    private final RabbitClient rabbitClient;
+    private final Sender sender;
     private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
     private final MailReferenceSerializer mailReferenceSerializer;
     private final Metric enqueueMetric;
     private final MailQueueView mailQueueView;
     private final Clock clock;
 
-    Enqueuer(MailQueueName name, RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
+    Enqueuer(MailQueueName name, Sender sender, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
              MailReferenceSerializer serializer, MetricFactory metricFactory,
              MailQueueView mailQueueView, Clock clock) {
         this.name = name;
-        this.rabbitClient = rabbitClient;
+        this.sender = sender;
         this.mimeMessageStore = mimeMessageStore;
         this.mailReferenceSerializer = serializer;
         this.mailQueueView = mailQueueView;
@@ -64,7 +67,7 @@ class Enqueuer {
         EnqueueId enqueueId = EnqueueId.generate();
         saveMail(mail)
             .map(partIds -> new MailReference(enqueueId, mail, partIds))
-            .map(Throwing.function(this::publishReferenceToRabbit).sneakyThrow())
+            .flatMap(Throwing.function(this::publishReferenceToRabbit).sneakyThrow())
             .flatMap(mailQueueView::storeMail)
             .thenEmpty(Mono.fromRunnable(enqueueMetric::increment))
             .block();
@@ -78,16 +81,20 @@ class Enqueuer {
         }
     }
 
-    private EnqueuedItem publishReferenceToRabbit(MailReference mailReference) throws MailQueue.MailQueueException {
-        rabbitClient.publish(name, getMailReferenceBytes(mailReference));
-
-        return EnqueuedItem.builder()
-            .enqueueId(mailReference.getEnqueueId())
-            .mailQueueName(name)
-            .mail(mailReference.getMail())
-            .enqueuedTime(clock.instant())
-            .mimeMessagePartsId(mailReference.getPartsId())
-            .build();
+    private Mono<EnqueuedItem> publishReferenceToRabbit(MailReference mailReference) throws MailQueue.MailQueueException {
+        OutboundMessage data = new OutboundMessage(
+            name.toRabbitExchangeName().asString(),
+            EMPTY_ROUTING_KEY,
+            getMailReferenceBytes(mailReference));
+        return sender.send(Mono.just(data))
+            .then(Mono.just(
+                EnqueuedItem.builder()
+                    .enqueueId(mailReference.getEnqueueId())
+                    .mailQueueName(name)
+                    .mail(mailReference.getMail())
+                    .enqueuedTime(clock.instant())
+                    .mimeMessagePartsId(mailReference.getPartsId())
+                    .build()));
     }
 
     private byte[] getMailReferenceBytes(MailReference mailReference) throws MailQueue.MailQueueException {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
deleted file mode 100644
index 779e71e..0000000
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.queue.rabbitmq;
-
-import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
-import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
-import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
-
-import java.io.IOException;
-
-import javax.inject.Inject;
-
-import org.apache.james.backends.rabbitmq.RabbitMQChannelPool;
-import org.apache.james.queue.api.MailQueue;
-
-import com.rabbitmq.client.AMQP;
-import reactor.core.publisher.Flux;
-import reactor.rabbitmq.AcknowledgableDelivery;
-
-class RabbitClient {
-    private final RabbitMQChannelPool channelPool;
-
-    @Inject
-    RabbitClient(RabbitMQChannelPool channelPool) {
-        this.channelPool = channelPool;
-    }
-
-    void attemptQueueCreation(MailQueueName name) {
-        channelPool.execute(channel -> {
-            try {
-                channel.exchangeDeclare(name.toRabbitExchangeName().asString(), "direct", DURABLE);
-                channel.queueDeclare(name.toWorkQueueName().asString(), DURABLE, !EXCLUSIVE, !AUTO_DELETE, NO_ARGUMENTS);
-                channel.queueBind(name.toWorkQueueName().asString(), name.toRabbitExchangeName().asString(), EMPTY_ROUTING_KEY);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        });
-    }
-
-    void publish(MailQueueName name, byte[] message) throws MailQueue.MailQueueException {
-        channelPool.execute(channel -> {
-            try {
-                channel.basicPublish(name.toRabbitExchangeName().asString(), EMPTY_ROUTING_KEY, new AMQP.BasicProperties(), message);
-            } catch (IOException e) {
-                throw new MailQueue.MailQueueException("Unable to publish to RabbitMQ", e);
-            }
-        });
-    }
-
-    Flux<AcknowledgableDelivery> receive(MailQueueName name) {
-        return channelPool.receive(name.toWorkQueueName().asString());
-    }
-}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index 0e1bf95..b0748c0 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -19,6 +19,11 @@
 
 package org.apache.james.queue.rabbitmq;
 
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
 import static org.apache.james.queue.api.MailQueue.QUEUE_SIZE_METRIC_NAME_PREFIX;
 
 import java.time.Clock;
@@ -27,13 +32,17 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 
+import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.mail.internet.MimeMessage;
 
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.blob.mail.MimeMessageStore;
+import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueueFactory;
@@ -44,13 +53,23 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
+import com.rabbitmq.client.Connection;
 
-public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue> {
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.Sender;
+
+public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue>, Startable {
+    private static final int MAX_CHANNELS_NUMBER = 5;
 
     @VisibleForTesting static class PrivateFactory {
         private final MetricFactory metricFactory;
         private final GaugeRegistry gaugeRegistry;
-        private final RabbitClient rabbitClient;
+        private final Mono<Connection> connectionMono;
+        private final Sender sender;
         private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
         private final MailReferenceSerializer mailReferenceSerializer;
         private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
@@ -62,8 +81,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
         @Inject
         @VisibleForTesting PrivateFactory(MetricFactory metricFactory,
                                           GaugeRegistry gaugeRegistry,
-                                          RabbitClient rabbitClient,
-                                          MimeMessageStore.Factory mimeMessageStoreFactory,
+                                          Mono<Connection> connectionMono, Sender sender, MimeMessageStore.Factory mimeMessageStoreFactory,
                                           BlobId.Factory blobIdFactory,
                                           MailQueueView.Factory mailQueueViewFactory,
                                           Clock clock,
@@ -71,7 +89,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
                                           RabbitMQMailQueueConfiguration configuration) {
             this.metricFactory = metricFactory;
             this.gaugeRegistry = gaugeRegistry;
-            this.rabbitClient = rabbitClient;
+            this.connectionMono = connectionMono;
+            this.sender = sender;
             this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
             this.mailQueueViewFactory = mailQueueViewFactory;
             this.clock = clock;
@@ -88,9 +107,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
             RabbitMQMailQueue rabbitMQMailQueue = new RabbitMQMailQueue(
                 metricFactory,
                 mailQueueName,
-                new Enqueuer(mailQueueName, rabbitClient, mimeMessageStore, mailReferenceSerializer,
+                new Enqueuer(mailQueueName, sender, mimeMessageStore, mailReferenceSerializer,
                     metricFactory, mailQueueView, clock),
-                new Dequeuer(mailQueueName, rabbitClient, mailLoader, mailReferenceSerializer,
+                new Dequeuer(mailQueueName, connectionMono, mailLoader, mailReferenceSerializer,
                     metricFactory, mailQueueView),
                 mailQueueView,
                 decoratorFactory);
@@ -123,22 +142,29 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
         }
     }
 
-    private final RabbitClient rabbitClient;
     private final RabbitMQMailQueueManagement mqManagementApi;
     private final PrivateFactory privateFactory;
     private final RabbitMQMailQueueObjectPool mailQueueObjectPool;
+    private final Mono<Connection> connectionMono;
+    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
+    private Sender sender;
 
     @VisibleForTesting
     @Inject
-    RabbitMQMailQueueFactory(RabbitClient rabbitClient,
+    RabbitMQMailQueueFactory(SimpleConnectionPool simpleConnectionPool,
                              RabbitMQMailQueueManagement mqManagementApi,
                              PrivateFactory privateFactory) {
-        this.rabbitClient = rabbitClient;
+        this.connectionMono = simpleConnectionPool.getResilientConnection();
         this.mqManagementApi = mqManagementApi;
         this.privateFactory = privateFactory;
         this.mailQueueObjectPool = new RabbitMQMailQueueObjectPool();
     }
 
+    public void start() {
+        this.reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER);
+        this.sender = reactorRabbitMQChannelPool.createSender();
+    }
+
     @Override
     public Optional<RabbitMQMailQueue> getQueue(String name) {
         return getQueueFromRabbitServer(MailQueueName.fromString(name));
@@ -159,7 +185,22 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
     }
 
     private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName) {
-        rabbitClient.attemptQueueCreation(mailQueueName);
+        String exchangeName = mailQueueName.toRabbitExchangeName().asString();
+        Flux.concat(
+            sender.declareExchange(ExchangeSpecification.exchange(exchangeName)
+                .durable(true)
+                .type("direct")),
+            sender.declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString())
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(!AUTO_DELETE)
+                .arguments(NO_ARGUMENTS)),
+            sender.bind(BindingSpecification.binding()
+                .exchange(mailQueueName.toRabbitExchangeName().asString())
+                .queue(mailQueueName.toWorkQueueName().asString())
+                .routingKey(EMPTY_ROUTING_KEY)))
+            .then()
+            .block();
         return mailQueueObjectPool.retrieveInstanceFor(mailQueueName);
     }
 
@@ -170,4 +211,10 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
             .findFirst();
     }
 
+    @PreDestroy
+    public void stop() {
+        sender.close();
+        reactorRabbitMQChannelPool.close();
+    }
+
 }
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
index 464faf2..7c23f0f 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
@@ -36,6 +36,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.cassandra.CassandraBlobModule;
 import org.apache.james.blob.cassandra.CassandraBlobStore;
@@ -60,6 +61,9 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.github.fge.lambdas.Throwing;
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
 
 class RabbitMQMailQueueConfigurationChangeTest {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
@@ -75,6 +79,7 @@ class RabbitMQMailQueueConfigurationChangeTest {
     private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z");
     private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1, HOURS);
     private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS);
+    public static final int POOL_SIZE = 5;
 
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(
@@ -88,7 +93,6 @@ class RabbitMQMailQueueConfigurationChangeTest {
 
     private UpdatableTickingClock clock;
     private RabbitMQMailQueueManagement mqManagementApi;
-    private RabbitClient rabbitClient;
     private MimeMessageStore.Factory mimeMessageStoreFactory;
 
     @BeforeEach
@@ -96,7 +100,6 @@ class RabbitMQMailQueueConfigurationChangeTest {
         CassandraBlobStore blobsDAO = new CassandraBlobStore(cassandra.getConf());
         mimeMessageStoreFactory = MimeMessageStore.factory(blobsDAO);
         clock = new UpdatableTickingClock(IN_SLICE_1);
-        rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
         mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
     }
 
@@ -111,21 +114,26 @@ class RabbitMQMailQueueConfigurationChangeTest {
             mailQueueViewConfiguration,
             mimeMessageStoreFactory);
 
+
         RabbitMQMailQueueConfiguration mailQueueSizeConfiguration = RabbitMQMailQueueConfiguration.builder()
             .sizeMetricsEnabled(true)
             .build();
 
+        Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
+        ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE);
         RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
             new NoopMetricFactory(),
             new NoopGaugeRegistry(),
-            rabbitClient,
+            connectionMono,
+            reactorRabbitMQChannelPool.createSender(),
             mimeMessageStoreFactory,
             BLOB_ID_FACTORY,
             mailQueueViewFactory,
             clock,
             new RawMailQueueItemDecoratorFactory(),
             mailQueueSizeConfiguration);
-        RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, privateFactory);
+        RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, privateFactory);
+        mailQueueFactory.start();
         return mailQueueFactory.createQueue(SPOOL);
     }
 
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index ea738d9..127ed23 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -38,6 +38,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.cassandra.CassandraBlobModule;
 import org.apache.james.blob.cassandra.CassandraBlobStore;
@@ -67,6 +68,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.ArgumentCaptor;
 
 import com.github.fge.lambdas.Throwing;
+import com.rabbitmq.client.Connection;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -82,6 +84,7 @@ class RabbitMQMailQueueTest {
     private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS);
     private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4, HOURS);
     private static final Instant IN_SLICE_7 = IN_SLICE_1.plus(6, HOURS);
+    private static final int POOL_SIZE = 5;
 
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(
@@ -107,36 +110,11 @@ class RabbitMQMailQueueTest {
     class MailQueueSizeMetricsEnabled implements ManageableMailQueueContract, MailQueueMetricContract {
         @BeforeEach
         void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
-            CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
-            MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);
-            clock = new UpdatableTickingClock(IN_SLICE_1);
-
-            MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(),
-                CassandraMailQueueViewConfiguration.builder()
-                    .bucketCount(THREE_BUCKET_COUNT)
-                    .updateBrowseStartPace(UPDATE_BROWSE_START_PACE)
-                    .sliceWindow(ONE_HOUR_SLICE_WINDOW)
-                    .build(),
-                mimeMessageStoreFactory);
-
-            RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder()
-                .sizeMetricsEnabled(true)
-                .build();
-
-            RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
-            RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
-                metricTestSystem.getMetricFactory(),
-                metricTestSystem.getSpyGaugeRegistry(),
-                rabbitClient,
-                mimeMessageStoreFactory,
-                BLOB_ID_FACTORY,
-                mailQueueViewFactory,
-                clock,
-                new RawMailQueueItemDecoratorFactory(),
-                configuration);
-            mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
-            mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
-            mailQueue = mailQueueFactory.createQueue(SPOOL);
+            setUp(cassandra,
+                metricTestSystem,
+                RabbitMQMailQueueConfiguration.builder()
+                    .sizeMetricsEnabled(true)
+                    .build());
         }
 
         @Override
@@ -260,6 +238,11 @@ class RabbitMQMailQueueTest {
                 }))
                 .blockLast();
         }
+
+        @AfterEach
+        void tearDown() {
+            mqManagementApi.deleteAllQueues();
+        }
     }
 
     @Nested
@@ -269,36 +252,11 @@ class RabbitMQMailQueueTest {
 
         @BeforeEach
         void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
-            CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
-            MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);
-            clock = new UpdatableTickingClock(IN_SLICE_1);
-
-            MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(),
-                CassandraMailQueueViewConfiguration.builder()
-                    .bucketCount(THREE_BUCKET_COUNT)
-                    .updateBrowseStartPace(UPDATE_BROWSE_START_PACE)
-                    .sliceWindow(ONE_HOUR_SLICE_WINDOW)
-                    .build(),
-                mimeMessageStoreFactory);
-
-            RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder()
+            setUp(cassandra,
+                metricTestSystem,
+                RabbitMQMailQueueConfiguration.builder()
                 .sizeMetricsEnabled(false)
-                .build();
-
-            RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
-            RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
-                metricTestSystem.getMetricFactory(),
-                metricTestSystem.getSpyGaugeRegistry(),
-                rabbitClient,
-                mimeMessageStoreFactory,
-                BLOB_ID_FACTORY,
-                mailQueueViewFactory,
-                clock,
-                new RawMailQueueItemDecoratorFactory(),
-                configuration);
-            mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
-            mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
-            mailQueue = mailQueueFactory.createQueue(SPOOL);
+                .build());
         }
 
         @Test
@@ -306,5 +264,43 @@ class RabbitMQMailQueueTest {
             ArgumentCaptor<Gauge<?>> gaugeCaptor = ArgumentCaptor.forClass(Gauge.class);
             verify(metricTestSystem.getSpyGaugeRegistry(), never()).register(any(), gaugeCaptor.capture());
         }
+
+        @AfterEach
+        void tearDown() {
+            mqManagementApi.deleteAllQueues();
+        }
+    }
+
+    private void setUp(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem, RabbitMQMailQueueConfiguration configuration) throws Exception {
+        CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
+        MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);
+        clock = new UpdatableTickingClock(IN_SLICE_1);
+
+        MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(),
+            CassandraMailQueueViewConfiguration.builder()
+                .bucketCount(THREE_BUCKET_COUNT)
+                .updateBrowseStartPace(UPDATE_BROWSE_START_PACE)
+                .sliceWindow(ONE_HOUR_SLICE_WINDOW)
+                .build(),
+            mimeMessageStoreFactory);
+
+        Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
+        ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE);
+
+        RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
+            metricTestSystem.getMetricFactory(),
+            metricTestSystem.getSpyGaugeRegistry(),
+            connectionMono,
+            reactorRabbitMQChannelPool.createSender(),
+            mimeMessageStoreFactory,
+            BLOB_ID_FACTORY,
+            mailQueueViewFactory,
+            clock,
+            new RawMailQueueItemDecoratorFactory(),
+            configuration);
+        mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
+        mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, factory);
+        mailQueueFactory.start();
+        mailQueue = mailQueueFactory.createQueue(SPOOL);
     }
 }
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index 625f988..fb5b8b7 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.metrics.api.NoopGaugeRegistry;
@@ -45,8 +46,13 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+
 class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQMailQueue> {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+    public static final int POOL_SIZE = 5;
 
     @RegisterExtension
     static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ();
@@ -66,11 +72,13 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
             .sizeMetricsEnabled(true)
             .build();
 
-        RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
-        RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
+        Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
+        ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE);
+        RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
             new NoopMetricFactory(),
             new NoopGaugeRegistry(),
-            rabbitClient,
+            connectionMono,
+            reactorRabbitMQChannelPool.createSender(),
             mimeMessageStoreFactory,
             BLOB_ID_FACTORY,
             mailQueueViewFactory,
@@ -78,7 +86,8 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
             new RawMailQueueItemDecoratorFactory(),
             configuration);
         mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
-        mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
+        mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, privateFactory);
+        mailQueueFactory.start();
     }
 
     @AfterEach


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org