You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/11/04 11:23:37 UTC
[james-project] 01/30: JAMES-2937 RabbitMQMailQueue should rely on
ReactorRabbitMQChannelPool
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4b9135ae8e81cc707de1e910bf1de6d7d895dfa1
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Oct 4 11:52:59 2019 +0700
JAMES-2937 RabbitMQMailQueue should rely on ReactorRabbitMQChannelPool
---
.../james/modules/rabbitmq/RabbitMQModule.java | 24 +++++
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 12 ++-
.../org/apache/james/queue/rabbitmq/Enqueuer.java | 35 ++++---
.../apache/james/queue/rabbitmq/RabbitClient.java | 72 -------------
.../queue/rabbitmq/RabbitMQMailQueueFactory.java | 69 +++++++++++--
.../RabbitMQMailQueueConfigurationChangeTest.java | 16 ++-
.../queue/rabbitmq/RabbitMQMailQueueTest.java | 114 ++++++++++-----------
.../rabbitmq/RabbitMqMailQueueFactoryTest.java | 17 ++-
8 files changed, 191 insertions(+), 168 deletions(-)
diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
index bdb9f70..5eb49df 100644
--- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
@@ -32,6 +32,7 @@ import org.apache.james.backends.rabbitmq.RabbitMQHealthCheck;
import org.apache.james.backends.rabbitmq.SimpleChannelPool;
import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
+import org.apache.james.lifecycle.api.Startable;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
@@ -49,11 +50,13 @@ import org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDAO;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
+import org.apache.james.utils.InitialisationOperation;
import org.apache.james.utils.PropertiesProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
@@ -85,6 +88,7 @@ public class RabbitMQModule extends AbstractModule {
eventDTOModuleBinder.addBinding().toInstance(CassandraMailQueueViewConfigurationModule.MAIL_QUEUE_VIEW_CONFIGURATION);
Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(RabbitMQHealthCheck.class);
+ Multibinder.newSetBinder(binder(), InitialisationOperation.class).addBinding().to(RabbitMQMailQueueFactoryInitialisationOperation.class);
}
@Provides
@@ -140,4 +144,24 @@ public class RabbitMQModule extends AbstractModule {
private RabbitMQMailQueueConfiguration getMailQueueSizeConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) {
return RabbitMQMailQueueConfiguration.from(configuration);
}
+
+    /**
+     * Initialisation operation registered for {@link RabbitMQMailQueueFactory}:
+     * running it calls {@link RabbitMQMailQueueFactory#start()} on the injected factory.
+     */
+    @Singleton
+    public static class RabbitMQMailQueueFactoryInitialisationOperation implements InitialisationOperation {
+        private final RabbitMQMailQueueFactory factory;
+
+        @Inject
+        public RabbitMQMailQueueFactoryInitialisationOperation(RabbitMQMailQueueFactory rabbitMQMailQueueFactory) {
+            this.factory = rabbitMQMailQueueFactory;
+        }
+
+        /** Declares the component this operation initialises. */
+        @Override
+        public Class<? extends Startable> forClass() {
+            return RabbitMQMailQueueFactory.class;
+        }
+
+        /** Starts the mail queue factory. */
+        @Override
+        public void initModule() {
+            factory.start();
+        }
+    }
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index ae9829e..a6de706 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -33,15 +33,19 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.mailet.Mail;
import com.github.fge.lambdas.consumers.ThrowingConsumer;
+import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Delivery;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
+import reactor.rabbitmq.ConsumeOptions;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.ReceiverOptions;
class Dequeuer {
-
private static final boolean REQUEUE = true;
+ private static final int EXECUTION_RATE = 5;
private final Flux<AcknowledgableDelivery> flux;
private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
@@ -75,15 +79,15 @@ class Dequeuer {
private final MailReferenceSerializer mailReferenceSerializer;
private final MailQueueView mailQueueView;
- Dequeuer(MailQueueName name, RabbitClient rabbitClient, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
+ Dequeuer(MailQueueName name, Mono<Connection> connectionMono, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
MailReferenceSerializer serializer, MetricFactory metricFactory,
MailQueueView mailQueueView) {
this.mailLoader = mailLoader;
this.mailReferenceSerializer = serializer;
this.mailQueueView = mailQueueView;
this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
- this.flux = rabbitClient
- .receive(name)
+ this.flux = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono))
+ .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE))
.filter(getResponse -> getResponse.getBody() != null);
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index 776a739..ea7d04a 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -19,6 +19,7 @@
package org.apache.james.queue.rabbitmq;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX;
import java.time.Clock;
@@ -38,21 +39,23 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.fge.lambdas.Throwing;
import reactor.core.publisher.Mono;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.Sender;
class Enqueuer {
private final MailQueueName name;
- private final RabbitClient rabbitClient;
+ private final Sender sender;
private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
private final MailReferenceSerializer mailReferenceSerializer;
private final Metric enqueueMetric;
private final MailQueueView mailQueueView;
private final Clock clock;
- Enqueuer(MailQueueName name, RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
+ Enqueuer(MailQueueName name, Sender sender, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
MailReferenceSerializer serializer, MetricFactory metricFactory,
MailQueueView mailQueueView, Clock clock) {
this.name = name;
- this.rabbitClient = rabbitClient;
+ this.sender = sender;
this.mimeMessageStore = mimeMessageStore;
this.mailReferenceSerializer = serializer;
this.mailQueueView = mailQueueView;
@@ -64,7 +67,7 @@ class Enqueuer {
EnqueueId enqueueId = EnqueueId.generate();
saveMail(mail)
.map(partIds -> new MailReference(enqueueId, mail, partIds))
- .map(Throwing.function(this::publishReferenceToRabbit).sneakyThrow())
+ .flatMap(Throwing.function(this::publishReferenceToRabbit).sneakyThrow())
.flatMap(mailQueueView::storeMail)
.thenEmpty(Mono.fromRunnable(enqueueMetric::increment))
.block();
@@ -78,16 +81,20 @@ class Enqueuer {
}
}
- private EnqueuedItem publishReferenceToRabbit(MailReference mailReference) throws MailQueue.MailQueueException {
- rabbitClient.publish(name, getMailReferenceBytes(mailReference));
-
- return EnqueuedItem.builder()
- .enqueueId(mailReference.getEnqueueId())
- .mailQueueName(name)
- .mail(mailReference.getMail())
- .enqueuedTime(clock.instant())
- .mimeMessagePartsId(mailReference.getPartsId())
- .build();
+    /**
+     * Sends the serialized mail reference to the queue's exchange and, once the send
+     * completes, emits the {@link EnqueuedItem} describing the enqueued mail.
+     *
+     * @param mailReference reference (enqueue id, mail, blob part ids) to publish
+     * @return a Mono emitting the enqueued item after the message has been sent
+     * @throws MailQueue.MailQueueException when the reference cannot be serialized
+     */
+    private Mono<EnqueuedItem> publishReferenceToRabbit(MailReference mailReference) throws MailQueue.MailQueueException {
+        OutboundMessage data = new OutboundMessage(
+            name.toRabbitExchangeName().asString(),
+            EMPTY_ROUTING_KEY,
+            getMailReferenceBytes(mailReference));
+        // Build the item lazily: Mono.just(...) would evaluate clock.instant() while the
+        // pipeline is assembled, i.e. before the message has actually been sent.
+        return sender.send(Mono.just(data))
+            .then(Mono.fromCallable(() ->
+                EnqueuedItem.builder()
+                    .enqueueId(mailReference.getEnqueueId())
+                    .mailQueueName(name)
+                    .mail(mailReference.getMail())
+                    .enqueuedTime(clock.instant())
+                    .mimeMessagePartsId(mailReference.getPartsId())
+                    .build()));
}
private byte[] getMailReferenceBytes(MailReference mailReference) throws MailQueue.MailQueueException {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
deleted file mode 100644
index 779e71e..0000000
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.queue.rabbitmq;
-
-import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
-import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
-import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
-
-import java.io.IOException;
-
-import javax.inject.Inject;
-
-import org.apache.james.backends.rabbitmq.RabbitMQChannelPool;
-import org.apache.james.queue.api.MailQueue;
-
-import com.rabbitmq.client.AMQP;
-import reactor.core.publisher.Flux;
-import reactor.rabbitmq.AcknowledgableDelivery;
-
-class RabbitClient {
- private final RabbitMQChannelPool channelPool;
-
- @Inject
- RabbitClient(RabbitMQChannelPool channelPool) {
- this.channelPool = channelPool;
- }
-
- void attemptQueueCreation(MailQueueName name) {
- channelPool.execute(channel -> {
- try {
- channel.exchangeDeclare(name.toRabbitExchangeName().asString(), "direct", DURABLE);
- channel.queueDeclare(name.toWorkQueueName().asString(), DURABLE, !EXCLUSIVE, !AUTO_DELETE, NO_ARGUMENTS);
- channel.queueBind(name.toWorkQueueName().asString(), name.toRabbitExchangeName().asString(), EMPTY_ROUTING_KEY);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- }
-
- void publish(MailQueueName name, byte[] message) throws MailQueue.MailQueueException {
- channelPool.execute(channel -> {
- try {
- channel.basicPublish(name.toRabbitExchangeName().asString(), EMPTY_ROUTING_KEY, new AMQP.BasicProperties(), message);
- } catch (IOException e) {
- throw new MailQueue.MailQueueException("Unable to publish to RabbitMQ", e);
- }
- });
- }
-
- Flux<AcknowledgableDelivery> receive(MailQueueName name) {
- return channelPool.receive(name.toWorkQueueName().asString());
- }
-}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index 0e1bf95..b0748c0 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -19,6 +19,11 @@
package org.apache.james.queue.rabbitmq;
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
import static org.apache.james.queue.api.MailQueue.QUEUE_SIZE_METRIC_NAME_PREFIX;
import java.time.Clock;
@@ -27,13 +32,17 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
+import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.mail.internet.MimeMessage;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.blob.mail.MimeMessageStore;
+import org.apache.james.lifecycle.api.Startable;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueueFactory;
@@ -44,13 +53,23 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
+import com.rabbitmq.client.Connection;
-public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue> {
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.Sender;
+
+public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue>, Startable {
+ private static final int MAX_CHANNELS_NUMBER = 5;
@VisibleForTesting static class PrivateFactory {
private final MetricFactory metricFactory;
private final GaugeRegistry gaugeRegistry;
- private final RabbitClient rabbitClient;
+ private final Mono<Connection> connectionMono;
+ private final Sender sender;
private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
private final MailReferenceSerializer mailReferenceSerializer;
private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
@@ -62,8 +81,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
@Inject
@VisibleForTesting PrivateFactory(MetricFactory metricFactory,
GaugeRegistry gaugeRegistry,
- RabbitClient rabbitClient,
- MimeMessageStore.Factory mimeMessageStoreFactory,
+ Mono<Connection> connectionMono, Sender sender, MimeMessageStore.Factory mimeMessageStoreFactory,
BlobId.Factory blobIdFactory,
MailQueueView.Factory mailQueueViewFactory,
Clock clock,
@@ -71,7 +89,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
RabbitMQMailQueueConfiguration configuration) {
this.metricFactory = metricFactory;
this.gaugeRegistry = gaugeRegistry;
- this.rabbitClient = rabbitClient;
+ this.connectionMono = connectionMono;
+ this.sender = sender;
this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
this.mailQueueViewFactory = mailQueueViewFactory;
this.clock = clock;
@@ -88,9 +107,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
RabbitMQMailQueue rabbitMQMailQueue = new RabbitMQMailQueue(
metricFactory,
mailQueueName,
- new Enqueuer(mailQueueName, rabbitClient, mimeMessageStore, mailReferenceSerializer,
+ new Enqueuer(mailQueueName, sender, mimeMessageStore, mailReferenceSerializer,
metricFactory, mailQueueView, clock),
- new Dequeuer(mailQueueName, rabbitClient, mailLoader, mailReferenceSerializer,
+ new Dequeuer(mailQueueName, connectionMono, mailLoader, mailReferenceSerializer,
metricFactory, mailQueueView),
mailQueueView,
decoratorFactory);
@@ -123,22 +142,29 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
}
}
- private final RabbitClient rabbitClient;
private final RabbitMQMailQueueManagement mqManagementApi;
private final PrivateFactory privateFactory;
private final RabbitMQMailQueueObjectPool mailQueueObjectPool;
+ private final Mono<Connection> connectionMono;
+ private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
+ private Sender sender;
@VisibleForTesting
@Inject
- RabbitMQMailQueueFactory(RabbitClient rabbitClient,
+ RabbitMQMailQueueFactory(SimpleConnectionPool simpleConnectionPool,
RabbitMQMailQueueManagement mqManagementApi,
PrivateFactory privateFactory) {
- this.rabbitClient = rabbitClient;
+ this.connectionMono = simpleConnectionPool.getResilientConnection();
this.mqManagementApi = mqManagementApi;
this.privateFactory = privateFactory;
this.mailQueueObjectPool = new RabbitMQMailQueueObjectPool();
}
+    /**
+     * Creates the channel pool (bounded by MAX_CHANNELS_NUMBER) on top of the connection
+     * obtained at construction time, then creates the sender from that pool.
+     */
+    public void start() {
+        ReactorRabbitMQChannelPool channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER);
+        this.reactorRabbitMQChannelPool = channelPool;
+        this.sender = channelPool.createSender();
+    }
+
@Override
public Optional<RabbitMQMailQueue> getQueue(String name) {
return getQueueFromRabbitServer(MailQueueName.fromString(name));
@@ -159,7 +185,22 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
}
private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName) {
- rabbitClient.attemptQueueCreation(mailQueueName);
+        // Declare the exchange, then the work queue, then bind them (Flux.concat subscribes
+        // to each declaration in turn) and wait for completion before handing out the queue.
+        String exchangeName = mailQueueName.toRabbitExchangeName().asString();
+        String workQueueName = mailQueueName.toWorkQueueName().asString();
+        Flux.concat(
+            sender.declareExchange(ExchangeSpecification.exchange(exchangeName)
+                .durable(DURABLE)
+                .type("direct")),
+            sender.declareQueue(QueueSpecification.queue(workQueueName)
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(!AUTO_DELETE)
+                .arguments(NO_ARGUMENTS)),
+            sender.bind(BindingSpecification.binding()
+                .exchange(exchangeName)
+                .queue(workQueueName)
+                .routingKey(EMPTY_ROUTING_KEY)))
+            .then()
+            .block();
return mailQueueObjectPool.retrieveInstanceFor(mailQueueName);
}
@@ -170,4 +211,10 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
.findFirst();
}
+    /**
+     * Releases the RabbitMQ resources created by {@link #start()}.
+     * Does nothing for resources that were never created (start() not called), and still
+     * closes the channel pool when closing the sender fails.
+     */
+    @PreDestroy
+    public void stop() {
+        try {
+            if (sender != null) {
+                sender.close();
+            }
+        } finally {
+            if (reactorRabbitMQChannelPool != null) {
+                reactorRabbitMQChannelPool.close();
+            }
+        }
+    }
+
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
index 464faf2..7c23f0f 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
@@ -36,6 +36,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobStore;
@@ -60,6 +61,9 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import com.github.fge.lambdas.Throwing;
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
class RabbitMQMailQueueConfigurationChangeTest {
private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
@@ -75,6 +79,7 @@ class RabbitMQMailQueueConfigurationChangeTest {
private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z");
private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1, HOURS);
private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS);
+ public static final int POOL_SIZE = 5;
@RegisterExtension
static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(
@@ -88,7 +93,6 @@ class RabbitMQMailQueueConfigurationChangeTest {
private UpdatableTickingClock clock;
private RabbitMQMailQueueManagement mqManagementApi;
- private RabbitClient rabbitClient;
private MimeMessageStore.Factory mimeMessageStoreFactory;
@BeforeEach
@@ -96,7 +100,6 @@ class RabbitMQMailQueueConfigurationChangeTest {
CassandraBlobStore blobsDAO = new CassandraBlobStore(cassandra.getConf());
mimeMessageStoreFactory = MimeMessageStore.factory(blobsDAO);
clock = new UpdatableTickingClock(IN_SLICE_1);
- rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
}
@@ -111,21 +114,26 @@ class RabbitMQMailQueueConfigurationChangeTest {
mailQueueViewConfiguration,
mimeMessageStoreFactory);
+
RabbitMQMailQueueConfiguration mailQueueSizeConfiguration = RabbitMQMailQueueConfiguration.builder()
.sizeMetricsEnabled(true)
.build();
+ Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
+ ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE);
RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
new NoopMetricFactory(),
new NoopGaugeRegistry(),
- rabbitClient,
+ connectionMono,
+ reactorRabbitMQChannelPool.createSender(),
mimeMessageStoreFactory,
BLOB_ID_FACTORY,
mailQueueViewFactory,
clock,
new RawMailQueueItemDecoratorFactory(),
mailQueueSizeConfiguration);
- RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, privateFactory);
+ RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, privateFactory);
+ mailQueueFactory.start();
return mailQueueFactory.createQueue(SPOOL);
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index ea738d9..127ed23 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -38,6 +38,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobStore;
@@ -67,6 +68,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import com.github.fge.lambdas.Throwing;
+import com.rabbitmq.client.Connection;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -82,6 +84,7 @@ class RabbitMQMailQueueTest {
private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS);
private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4, HOURS);
private static final Instant IN_SLICE_7 = IN_SLICE_1.plus(6, HOURS);
+ private static final int POOL_SIZE = 5;
@RegisterExtension
static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(
@@ -107,36 +110,11 @@ class RabbitMQMailQueueTest {
class MailQueueSizeMetricsEnabled implements ManageableMailQueueContract, MailQueueMetricContract {
@BeforeEach
void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
- CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
- MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);
- clock = new UpdatableTickingClock(IN_SLICE_1);
-
- MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(),
- CassandraMailQueueViewConfiguration.builder()
- .bucketCount(THREE_BUCKET_COUNT)
- .updateBrowseStartPace(UPDATE_BROWSE_START_PACE)
- .sliceWindow(ONE_HOUR_SLICE_WINDOW)
- .build(),
- mimeMessageStoreFactory);
-
- RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder()
- .sizeMetricsEnabled(true)
- .build();
-
- RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
- RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
- metricTestSystem.getMetricFactory(),
- metricTestSystem.getSpyGaugeRegistry(),
- rabbitClient,
- mimeMessageStoreFactory,
- BLOB_ID_FACTORY,
- mailQueueViewFactory,
- clock,
- new RawMailQueueItemDecoratorFactory(),
- configuration);
- mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
- mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
- mailQueue = mailQueueFactory.createQueue(SPOOL);
+ setUp(cassandra,
+ metricTestSystem,
+ RabbitMQMailQueueConfiguration.builder()
+ .sizeMetricsEnabled(true)
+ .build());
}
@Override
@@ -260,6 +238,11 @@ class RabbitMQMailQueueTest {
}))
.blockLast();
}
+
+ // After each test, asks the RabbitMQ management API wrapper to delete all queues
+ // (presumably so queues created by one test do not leak into the next — confirm).
+ @AfterEach
+ void tearDown() {
+ mqManagementApi.deleteAllQueues();
+ }
}
@Nested
@@ -269,36 +252,11 @@ class RabbitMQMailQueueTest {
@BeforeEach
void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
- CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
- MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);
- clock = new UpdatableTickingClock(IN_SLICE_1);
-
- MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(),
- CassandraMailQueueViewConfiguration.builder()
- .bucketCount(THREE_BUCKET_COUNT)
- .updateBrowseStartPace(UPDATE_BROWSE_START_PACE)
- .sliceWindow(ONE_HOUR_SLICE_WINDOW)
- .build(),
- mimeMessageStoreFactory);
-
- RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder()
+ setUp(cassandra,
+ metricTestSystem,
+ RabbitMQMailQueueConfiguration.builder()
.sizeMetricsEnabled(false)
- .build();
-
- RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
- RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
- metricTestSystem.getMetricFactory(),
- metricTestSystem.getSpyGaugeRegistry(),
- rabbitClient,
- mimeMessageStoreFactory,
- BLOB_ID_FACTORY,
- mailQueueViewFactory,
- clock,
- new RawMailQueueItemDecoratorFactory(),
- configuration);
- mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
- mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
- mailQueue = mailQueueFactory.createQueue(SPOOL);
+ .build());
}
@Test
@@ -306,5 +264,43 @@ class RabbitMQMailQueueTest {
ArgumentCaptor<Gauge<?>> gaugeCaptor = ArgumentCaptor.forClass(Gauge.class);
verify(metricTestSystem.getSpyGaugeRegistry(), never()).register(any(), gaugeCaptor.capture());
}
+
+ // After each test, asks the RabbitMQ management API wrapper to delete all queues
+ // (presumably so queues created by one test do not leak into the next — confirm).
+ @AfterEach
+ void tearDown() {
+ mqManagementApi.deleteAllQueues();
+ }
+ }
+
+ /**
+ * Shared fixture for the nested test classes: wires a RabbitMQMailQueueFactory against the
+ * RabbitMQ and Cassandra test extensions, starts it and creates the SPOOL queue.
+ * Assigns the clock, mqManagementApi, mailQueueFactory and mailQueue fields.
+ *
+ * @param cassandra Cassandra test cluster providing the session configuration
+ * @param metricTestSystem supplies the metric factory and the spied gauge registry
+ * @param configuration mail queue configuration under test (e.g. size metrics on/off)
+ */
+ private void setUp(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem, RabbitMQMailQueueConfiguration configuration) throws Exception {
+ CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
+ MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);
+ // The clock must exist before the view factory, which receives it as an argument.
+ clock = new UpdatableTickingClock(IN_SLICE_1);
+
+ MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(),
+ CassandraMailQueueViewConfiguration.builder()
+ .bucketCount(THREE_BUCKET_COUNT)
+ .updateBrowseStartPace(UPDATE_BROWSE_START_PACE)
+ .sliceWindow(ONE_HOUR_SLICE_WINDOW)
+ .build(),
+ mimeMessageStoreFactory);
+
+ // NOTE(review): this pool is a local variable and nothing visible here closes it or the
+ // sender created from it — confirm whether the tests should release them.
+ Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
+ ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE);
+
+ RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
+ metricTestSystem.getMetricFactory(),
+ metricTestSystem.getSpyGaugeRegistry(),
+ connectionMono,
+ reactorRabbitMQChannelPool.createSender(),
+ mimeMessageStoreFactory,
+ BLOB_ID_FACTORY,
+ mailQueueViewFactory,
+ clock,
+ new RawMailQueueItemDecoratorFactory(),
+ configuration);
+ mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
+ mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, factory);
+ // start() has to run before createQueue(): it creates the sender the factory uses to declare queues.
+ mailQueueFactory.start();
+ mailQueue = mailQueueFactory.createQueue(SPOOL);
}
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index 625f988..fb5b8b7 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.metrics.api.NoopGaugeRegistry;
@@ -45,8 +46,13 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+
class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQMailQueue> {
private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+ public static final int POOL_SIZE = 5;
@RegisterExtension
static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ();
@@ -66,11 +72,13 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
.sizeMetricsEnabled(true)
.build();
- RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
- RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
+ Mono<Connection> connectionMono = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
+ ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionMono, POOL_SIZE);
+ RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
new NoopMetricFactory(),
new NoopGaugeRegistry(),
- rabbitClient,
+ connectionMono,
+ reactorRabbitMQChannelPool.createSender(),
mimeMessageStoreFactory,
BLOB_ID_FACTORY,
mailQueueViewFactory,
@@ -78,7 +86,8 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
new RawMailQueueItemDecoratorFactory(),
configuration);
mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
- mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
+ mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitConnectionPool(), mqManagementApi, privateFactory);
+ mailQueueFactory.start();
}
@AfterEach
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org