You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/06 04:13:16 UTC
[james-project] branch master updated: JAMES-3085 Drop direct
references to ReactorRabbitMQChannelPool
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new b2354d4 JAMES-3085 Drop direct references to ReactorRabbitMQChannelPool
b2354d4 is described below
commit b2354d440f8104fabd59c57e155ce2005c4e1d20
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Mon Mar 2 12:40:14 2020 +0100
JAMES-3085 Drop direct references to ReactorRabbitMQChannelPool
ReactorRabbitMQChannelPool was introduced in order to limit our
RabbitMQ channel usage. Its maximum size was set to 5 channels.
We have 4 kinds of usage of it:
* Ever running Receivers (4)
* Temporary Senders (4)
* Permanent Senders (2)
* Management operations (3)
*Ever running Receivers* hold a channel forever and are allocated at
startup.
*Management operations* need a channel but do not need it exclusively.
The real benefit of ReactorRabbitMQChannelPool is for the *Temporary
Senders* which request a channel each time.
Before this commit, only one channel was available for *Management
operations* and *Senders*, leading to starvation and timeouts
under heavy load.
This commit makes the *Ever running Receivers* rely directly on a
channel. The other usages rely (indirectly) on
ReactorRabbitMQChannelPool (with a smaller channel pool).
Now 7 channels are open (4 for the *Ever running Receivers* and
3 for the ReactorRabbitMQChannelPool) instead of 5.
---
.../rabbitmq/ReactorRabbitMQChannelPool.java | 4 +--
.../james/backends/rabbitmq/ReceiverProvider.java | 27 +++++++++++++++++
.../james/backends/rabbitmq/RabbitMQExtension.java | 12 +++++++-
.../james/mailbox/events/GroupRegistration.java | 8 ++---
.../mailbox/events/GroupRegistrationHandler.java | 15 ++++++----
.../mailbox/events/KeyRegistrationHandler.java | 8 ++---
.../james/mailbox/events/RabbitMQEventBus.java | 23 +++++++-------
.../james/mailbox/events/RabbitMQEventBusTest.java | 21 +++++++------
.../rabbitmq/host/RabbitMQEventBusHostSystem.java | 5 ++--
.../james/modules/rabbitmq/RabbitMQModule.java | 33 ++++++++++++++------
.../mailets/remote/delivery/DeliveryRunnable.java | 2 --
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 6 ++--
.../org/apache/james/queue/rabbitmq/Enqueuer.java | 5 ++--
.../queue/rabbitmq/RabbitMQMailQueueFactory.java | 29 +++++++++---------
.../RabbitMQMailQueueConfigurationChangeTest.java | 5 ++--
.../queue/rabbitmq/RabbitMQMailQueueTest.java | 4 +--
.../rabbitmq/RabbitMqMailQueueFactoryTest.java | 5 ++--
.../distributed/RabbitMQTerminationSubscriber.java | 17 +++++------
.../distributed/RabbitMQWorkQueue.java | 35 +++++++++++-----------
.../distributed/RabbitMQWorkQueueSupplier.scala | 24 ++++++++-------
.../distributed/DistributedTaskManagerTest.java | 13 ++++----
.../RabbitMQTerminationSubscriberTest.java | 2 +-
.../RabbitMQWorkQueuePersistenceTest.java | 4 +--
.../distributed/RabbitMQWorkQueueTest.java | 6 ++--
24 files changed, 186 insertions(+), 127 deletions(-)
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 2f789bb..db3e900 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.BiConsumer;
import javax.annotation.PreDestroy;
-import javax.inject.Inject;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
@@ -103,7 +102,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
private static final Logger LOGGER = LoggerFactory.getLogger(ReactorRabbitMQChannelPool.class);
private static final long MAXIMUM_BORROW_TIMEOUT_IN_MS = Duration.ofSeconds(5).toMillis();
- private static final int MAX_CHANNELS_NUMBER = 5;
+ private static final int MAX_CHANNELS_NUMBER = 3;
private static final int MAX_BORROW_RETRIES = 3;
private static final Duration MIN_BORROW_DELAY = Duration.ofMillis(50);
private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
@@ -113,7 +112,6 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
private final ConcurrentSkipListSet<Channel> borrowedChannels;
private Sender sender;
- @Inject
public ReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
this(simpleConnectionPool.getResilientConnection(), MAX_CHANNELS_NUMBER);
}
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReceiverProvider.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReceiverProvider.java
new file mode 100644
index 0000000..8d90134
--- /dev/null
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReceiverProvider.java
@@ -0,0 +1,27 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ***************************************************************/
+
+package org.apache.james.backends.rabbitmq;
+
+import reactor.rabbitmq.Receiver;
+
+@FunctionalInterface
+public interface ReceiverProvider {
+ Receiver createReceiver();
+}
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
index a909bb0..2dc8d31 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
@@ -32,6 +32,8 @@ import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
+import reactor.rabbitmq.Sender;
+
public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback, ParameterResolver {
private static final Consumer<DockerRabbitMQ> DO_NOTHING = dockerRabbitMQ -> {};
@@ -113,7 +115,7 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
RabbitMQConnectionFactory connectionFactory = createRabbitConnectionFactory();
connectionPool = new SimpleConnectionPool(connectionFactory);
- channelPool = new ReactorRabbitMQChannelPool(connectionPool);
+ channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(), 5);
channelPool.start();
}
@@ -144,6 +146,14 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
return channelPool;
}
+ public Sender getSender() {
+ return channelPool.getSender();
+ }
+
+ public ReceiverProvider getReceiverProvider() {
+ return channelPool::createReceiver;
+ }
+
public SimpleConnectionPool getConnectionPool() {
return connectionPool;
}
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 5622412..da0fe77 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -31,7 +31,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.util.MDCBuilder;
@@ -84,15 +84,15 @@ class GroupRegistration implements Registration {
private final MailboxListenerExecutor mailboxListenerExecutor;
private Optional<Disposable> receiverSubscriber;
- GroupRegistration(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, EventSerializer eventSerializer,
+ GroupRegistration(Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer,
MailboxListener mailboxListener, Group group, RetryBackoffConfiguration retryBackoff,
EventDeadLetters eventDeadLetters,
Runnable unregisterGroup, MailboxListenerExecutor mailboxListenerExecutor) {
this.eventSerializer = eventSerializer;
this.mailboxListener = mailboxListener;
this.queueName = WorkQueueName.of(group);
- this.sender = reactorRabbitMQChannelPool.getSender();
- this.receiver = reactorRabbitMQChannelPool.createReceiver();
+ this.sender = sender;
+ this.receiver = receiverProvider.createReceiver();
this.mailboxListenerExecutor = mailboxListenerExecutor;
this.receiverSubscriber = Optional.empty();
this.unregisterGroup = unregisterGroup;
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
index 5337df6..201d3c3 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
@@ -23,22 +23,26 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.event.json.EventSerializer;
+import reactor.rabbitmq.Sender;
+
class GroupRegistrationHandler {
private final Map<Group, GroupRegistration> groupRegistrations;
private final EventSerializer eventSerializer;
+ private final Sender sender;
+ private final ReceiverProvider receiverProvider;
private final RetryBackoffConfiguration retryBackoff;
private final EventDeadLetters eventDeadLetters;
private final MailboxListenerExecutor mailboxListenerExecutor;
- private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
- GroupRegistrationHandler(EventSerializer eventSerializer, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
+ GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider,
RetryBackoffConfiguration retryBackoff,
EventDeadLetters eventDeadLetters, MailboxListenerExecutor mailboxListenerExecutor) {
this.eventSerializer = eventSerializer;
- this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool;
+ this.sender = sender;
+ this.receiverProvider = receiverProvider;
this.retryBackoff = retryBackoff;
this.eventDeadLetters = eventDeadLetters;
this.mailboxListenerExecutor = mailboxListenerExecutor;
@@ -67,7 +71,8 @@ class GroupRegistrationHandler {
private GroupRegistration newGroupRegistration(MailboxListener listener, Group group) {
return new GroupRegistration(
- reactorRabbitMQChannelPool,
+ sender,
+ receiverProvider,
eventSerializer,
listener,
group,
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 41db3f8..ae49471 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -28,7 +28,7 @@ import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.util.MDCBuilder;
import org.apache.james.util.MDCStructuredLogger;
@@ -62,13 +62,13 @@ class KeyRegistrationHandler {
private final MailboxListenerExecutor mailboxListenerExecutor;
private Optional<Disposable> receiverSubscriber;
- KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
+ KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
this.eventBusId = eventBusId;
this.eventSerializer = eventSerializer;
- this.sender = reactorRabbitMQChannelPool.getSender();
+ this.sender = sender;
this.routingKeyConverter = routingKeyConverter;
this.localListenerRegistry = localListenerRegistry;
- this.receiver = reactorRabbitMQChannelPool.createReceiver();
+ this.receiver = receiverProvider.createReceiver();
this.mailboxListenerExecutor = mailboxListenerExecutor;
this.registrationQueue = new RegistrationQueueName();
this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 06d786a..ecc1b4f 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -24,7 +24,7 @@ import java.util.Set;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.metrics.api.MetricFactory;
@@ -32,6 +32,7 @@ import org.apache.james.metrics.api.MetricFactory;
import com.google.common.base.Preconditions;
import reactor.core.publisher.Mono;
+import reactor.rabbitmq.Sender;
public class RabbitMQEventBus implements EventBus, Startable {
private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
@@ -44,8 +45,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
private final RetryBackoffConfiguration retryBackoff;
private final EventBusId eventBusId;
private final EventDeadLetters eventDeadLetters;
- private final ReactorRabbitMQChannelPool channelPool;
private final MailboxListenerExecutor mailboxListenerExecutor;
+ private final Sender sender;
+ private final ReceiverProvider receiverProvider;
private volatile boolean isRunning;
private volatile boolean isStopping;
@@ -54,11 +56,12 @@ public class RabbitMQEventBus implements EventBus, Startable {
private EventDispatcher eventDispatcher;
@Inject
- public RabbitMQEventBus(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, EventSerializer eventSerializer,
- RetryBackoffConfiguration retryBackoff,
- RoutingKeyConverter routingKeyConverter,
- EventDeadLetters eventDeadLetters, MetricFactory metricFactory) {
- this.channelPool = reactorRabbitMQChannelPool;
+ public RabbitMQEventBus(Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer,
+ RetryBackoffConfiguration retryBackoff,
+ RoutingKeyConverter routingKeyConverter,
+ EventDeadLetters eventDeadLetters, MetricFactory metricFactory) {
+ this.sender = sender;
+ this.receiverProvider = receiverProvider;
this.mailboxListenerExecutor = new MailboxListenerExecutor(metricFactory);
this.eventBusId = EventBusId.random();
this.eventSerializer = eventSerializer;
@@ -73,9 +76,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
if (!isRunning && !isStopping) {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
- keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, channelPool, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
- groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
- eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, channelPool.getSender(), localListenerRegistry, mailboxListenerExecutor);
+ keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
+ groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
+ eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor);
eventDispatcher.start();
keyRegistrationHandler.start();
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 5436749..35d3581 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -56,7 +56,7 @@ import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.rabbitmq.RabbitMQExtension.DockerRestartPolicy;
import org.apache.james.backends.rabbitmq.RabbitMQFixture;
import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
import org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
@@ -119,19 +119,18 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBus3.stop();
ALL_GROUPS.stream()
.map(GroupRegistration.WorkQueueName::of)
- .forEach(queueName -> rabbitMQExtension.getRabbitChannelPool().getSender().delete(QueueSpecification.queue(queueName.asString())).block());
- rabbitMQExtension.getRabbitChannelPool()
- .getSender()
+ .forEach(queueName -> rabbitMQExtension.getSender().delete(QueueSpecification.queue(queueName.asString())).block());
+ rabbitMQExtension.getSender()
.delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME))
.block();
}
private RabbitMQEventBus newEventBus() {
- return newEventBus(rabbitMQExtension.getRabbitChannelPool());
+ return newEventBus(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider());
}
- private RabbitMQEventBus newEventBus(ReactorRabbitMQChannelPool rabbitMQChannelPool) {
- return new RabbitMQEventBus(rabbitMQChannelPool, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new RecordingMetricFactory());
+ private RabbitMQEventBus newEventBus(Sender sender, ReceiverProvider receiverProvider) {
+ return new RabbitMQEventBus(sender, receiverProvider, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new RecordingMetricFactory());
}
@Override
@@ -256,7 +255,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
@BeforeEach
void setUp() {
- Sender sender = rabbitMQExtension.getRabbitChannelPool().getSender();
+ Sender sender = rabbitMQExtension.getSender();
sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
.durable(DURABLE)
@@ -286,7 +285,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
private Event dequeueEvent() {
- try (Receiver receiver = rabbitMQExtension.getRabbitChannelPool().createReceiver()) {
+ try (Receiver receiver = rabbitMQExtension.getReceiverProvider().createReceiver()) {
byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
.blockFirst()
.getBody();
@@ -328,7 +327,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
@BeforeEach
void beforeEach() {
- rabbitMQEventBusWithNetWorkIssue = newEventBus(rabbitMQNetWorkIssueExtension.getRabbitChannelPool());
+ rabbitMQEventBusWithNetWorkIssue = newEventBus(rabbitMQNetWorkIssueExtension.getSender(), rabbitMQNetWorkIssueExtension.getReceiverProvider());
}
@Test
@@ -676,4 +675,4 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
RabbitMQFixture.awaitAtMostThirtySeconds
.untilAsserted(() -> verify(listener).event(EVENT));
}
-}
\ No newline at end of file
+}
diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
index c34510c..08b41e9 100644
--- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
+++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
@@ -109,8 +109,9 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();
EventSerializer eventSerializer = new EventSerializer(mailboxIdFactory, messageIdFactory, new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer());
RoutingKeyConverter routingKeyConverter = new RoutingKeyConverter(ImmutableSet.of(new MailboxIdRegistrationKey.Factory(mailboxIdFactory)));
- return new RabbitMQEventBus(reactorRabbitMQChannelPool, eventSerializer, RetryBackoffConfiguration.DEFAULT,
- routingKeyConverter, new MemoryEventDeadLetters(), new RecordingMetricFactory());
+ return new RabbitMQEventBus(reactorRabbitMQChannelPool.getSender(), reactorRabbitMQChannelPool::createReceiver,
+ eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, new MemoryEventDeadLetters(),
+ new RecordingMetricFactory());
}
@Override
diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
index ea8c317..1d223b3 100644
--- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
@@ -29,6 +29,8 @@ import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.RabbitMQHealthCheck;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO;
@@ -50,8 +52,6 @@ import org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDAO;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
-import org.apache.james.utils.InitializationOperation;
-import org.apache.james.utils.InitilizationOperationBuilder;
import org.apache.james.utils.PropertiesProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +61,10 @@ import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
-import com.google.inject.multibindings.ProvidesIntoSet;
+
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
public class RabbitMQModule extends AbstractModule {
@@ -71,7 +74,6 @@ public class RabbitMQModule extends AbstractModule {
@Override
protected void configure() {
- bind(ReactorRabbitMQChannelPool.class).in(Scopes.SINGLETON);
bind(EnqueuedMailsDAO.class).in(Scopes.SINGLETON);
bind(DeletedMailsDAO.class).in(Scopes.SINGLETON);
bind(BrowseStartDAO.class).in(Scopes.SINGLETON);
@@ -143,10 +145,23 @@ public class RabbitMQModule extends AbstractModule {
return RabbitMQMailQueueConfiguration.from(configuration);
}
- @ProvidesIntoSet
- InitializationOperation workQueue(ReactorRabbitMQChannelPool instance) {
- return InitilizationOperationBuilder
- .forClass(ReactorRabbitMQChannelPool.class)
- .init(instance::start);
+ @Provides
+ @Singleton
+ ReactorRabbitMQChannelPool provideReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
+ ReactorRabbitMQChannelPool channelPool = new ReactorRabbitMQChannelPool(simpleConnectionPool);
+ channelPool.start();
+ return channelPool;
+ }
+
+ @Provides
+ @Singleton
+ public Sender provideRabbitMQSender(ReactorRabbitMQChannelPool channelPool) {
+ return channelPool.getSender();
+ }
+
+ @Provides
+ @Singleton
+ public ReceiverProvider provideRabbitMQReceiver(SimpleConnectionPool simpleConnectionPool) {
+ return () -> RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(simpleConnectionPool.getResilientConnection()));
}
}
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
index a6d92d0..0fa2b96 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
@@ -21,7 +21,6 @@ package org.apache.james.transport.mailets.remote.delivery;
import java.time.Duration;
import java.util.Date;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.james.dnsservice.api.DNSService;
@@ -48,7 +47,6 @@ public class DeliveryRunnable implements Disposable {
private static final Logger LOGGER = LoggerFactory.getLogger(DeliveryRunnable.class);
public static final Supplier<Date> CURRENT_DATE_SUPPLIER = Date::new;
- public static final AtomicBoolean DEFAULT_NOT_STARTED = new AtomicBoolean(false);
public static final String OUTGOING_MAILS = "outgoingMails";
public static final String REMOTE_DELIVERY_TRIAL = "RemoteDeliveryTrial";
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 21546b9..ece7018 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -25,7 +25,7 @@ import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Function;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
@@ -77,14 +77,14 @@ class Dequeuer {
private final MailReferenceSerializer mailReferenceSerializer;
private final MailQueueView mailQueueView;
- Dequeuer(MailQueueName name, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
+ Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
MailReferenceSerializer serializer, MetricFactory metricFactory,
MailQueueView mailQueueView) {
this.mailLoader = mailLoader;
this.mailReferenceSerializer = serializer;
this.mailQueueView = mailQueueView;
this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
- this.flux = reactorRabbitMQChannelPool.createReceiver()
+ this.flux = receiverProvider.createReceiver()
.consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE))
.filter(getResponse -> getResponse.getBody() != null);
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index 06bb328..481e380 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -28,7 +28,6 @@ import java.time.Clock;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.metrics.api.Metric;
@@ -54,11 +53,11 @@ class Enqueuer {
private final MailQueueView mailQueueView;
private final Clock clock;
- Enqueuer(MailQueueName name, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
+ Enqueuer(MailQueueName name, Sender sender, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
MailReferenceSerializer serializer, MetricFactory metricFactory,
MailQueueView mailQueueView, Clock clock) {
this.name = name;
- this.sender = reactorRabbitMQChannelPool.getSender();
+ this.sender = sender;
this.mimeMessageStore = mimeMessageStore;
this.mailReferenceSerializer = serializer;
this.mailQueueView = mailQueueView;
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index 7c0325a..f771118 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -35,7 +35,7 @@ import java.util.function.Function;
import javax.inject.Inject;
import javax.mail.internet.MimeMessage;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
@@ -55,13 +55,15 @@ import reactor.core.publisher.Flux;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.Sender;
public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue> {
@VisibleForTesting static class PrivateFactory {
private final MetricFactory metricFactory;
private final GaugeRegistry gaugeRegistry;
- private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
+ private final ReceiverProvider receiverProvider;
+ private final Sender sender;
private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
private final MailReferenceSerializer mailReferenceSerializer;
private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
@@ -73,8 +75,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
@Inject
@VisibleForTesting PrivateFactory(MetricFactory metricFactory,
GaugeRegistry gaugeRegistry,
- ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
- MimeMessageStore.Factory mimeMessageStoreFactory,
+ Sender sender, ReceiverProvider receiverProvider, MimeMessageStore.Factory mimeMessageStoreFactory,
BlobId.Factory blobIdFactory,
MailQueueView.Factory mailQueueViewFactory,
Clock clock,
@@ -82,7 +83,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
RabbitMQMailQueueConfiguration configuration) {
this.metricFactory = metricFactory;
this.gaugeRegistry = gaugeRegistry;
- this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool;
+ this.sender = sender;
+ this.receiverProvider = receiverProvider;
this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
this.mailQueueViewFactory = mailQueueViewFactory;
this.clock = clock;
@@ -99,9 +101,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
RabbitMQMailQueue rabbitMQMailQueue = new RabbitMQMailQueue(
metricFactory,
mailQueueName,
- new Enqueuer(mailQueueName, reactorRabbitMQChannelPool, mimeMessageStore, mailReferenceSerializer,
+ new Enqueuer(mailQueueName, sender, mimeMessageStore, mailReferenceSerializer,
metricFactory, mailQueueView, clock),
- new Dequeuer(mailQueueName, reactorRabbitMQChannelPool, mailLoader, mailReferenceSerializer,
+ new Dequeuer(mailQueueName, receiverProvider, mailLoader, mailReferenceSerializer,
metricFactory, mailQueueView),
mailQueueView,
decoratorFactory);
@@ -137,14 +139,14 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
private final RabbitMQMailQueueManagement mqManagementApi;
private final PrivateFactory privateFactory;
private final RabbitMQMailQueueObjectPool mailQueueObjectPool;
- private final ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
+ private final Sender sender;
@VisibleForTesting
@Inject
- RabbitMQMailQueueFactory(ReactorRabbitMQChannelPool reactorRabbitMQChannelPool,
+ RabbitMQMailQueueFactory(Sender sender,
RabbitMQMailQueueManagement mqManagementApi,
PrivateFactory privateFactory) {
- this.reactorRabbitMQChannelPool = reactorRabbitMQChannelPool;
+ this.sender = sender;
this.mqManagementApi = mqManagementApi;
this.privateFactory = privateFactory;
this.mailQueueObjectPool = new RabbitMQMailQueueObjectPool();
@@ -172,16 +174,15 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName) {
String exchangeName = mailQueueName.toRabbitExchangeName().asString();
Flux.concat(
- reactorRabbitMQChannelPool.getSender().declareExchange(ExchangeSpecification.exchange(exchangeName)
+ sender.declareExchange(ExchangeSpecification.exchange(exchangeName)
.durable(true)
.type("direct")),
- reactorRabbitMQChannelPool.getSender().declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString())
+ sender.declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString())
.durable(DURABLE)
.exclusive(!EXCLUSIVE)
.autoDelete(!AUTO_DELETE)
.arguments(NO_ARGUMENTS)),
- reactorRabbitMQChannelPool.getSender()
- .bind(BindingSpecification.binding()
+ sender.bind(BindingSpecification.binding()
.exchange(mailQueueName.toRabbitExchangeName().asString())
.queue(mailQueueName.toWorkQueueName().asString())
.routingKey(EMPTY_ROUTING_KEY)))
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
index e0b9910..35792e7 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
@@ -118,14 +118,15 @@ class RabbitMQMailQueueConfigurationChangeTest {
RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
new RecordingMetricFactory(),
new NoopGaugeRegistry(),
- rabbitMQExtension.getRabbitChannelPool(),
+ rabbitMQExtension.getSender(),
+ rabbitMQExtension.getReceiverProvider(),
mimeMessageStoreFactory,
BLOB_ID_FACTORY,
mailQueueViewFactory,
clock,
new RawMailQueueItemDecoratorFactory(),
mailQueueSizeConfiguration);
- RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitChannelPool(), mqManagementApi, privateFactory);
+ RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getSender(), mqManagementApi, privateFactory);
return mailQueueFactory.createQueue(SPOOL);
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index cfad2b0..57783d2 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -336,7 +336,7 @@ class RabbitMQMailQueueTest {
RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
metricTestSystem.getMetricFactory(),
metricTestSystem.getSpyGaugeRegistry(),
- rabbitMQExtension.getRabbitChannelPool(),
+ rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(),
mimeMessageStoreFactory,
BLOB_ID_FACTORY,
mailQueueViewFactory,
@@ -344,7 +344,7 @@ class RabbitMQMailQueueTest {
new RawMailQueueItemDecoratorFactory(),
configuration);
mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
- mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitChannelPool(), mqManagementApi, factory);
+ mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getSender(), mqManagementApi, factory);
mailQueue = mailQueueFactory.createQueue(SPOOL);
}
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index 16ab46d..a717476 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -69,7 +69,8 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
new RecordingMetricFactory(),
new NoopGaugeRegistry(),
- rabbitMQExtension.getRabbitChannelPool(),
+ rabbitMQExtension.getSender(),
+ rabbitMQExtension.getReceiverProvider(),
mimeMessageStoreFactory,
BLOB_ID_FACTORY,
mailQueueViewFactory,
@@ -77,7 +78,7 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
new RawMailQueueItemDecoratorFactory(),
configuration);
mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
- mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getRabbitChannelPool(), mqManagementApi, privateFactory);
+ mailQueueFactory = new RabbitMQMailQueueFactory(rabbitMQExtension.getSender(), mqManagementApi, privateFactory);
}
@AfterEach
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index a949af5..174101a 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -28,7 +28,7 @@ import java.util.UUID;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
import org.apache.james.lifecycle.api.Startable;
@@ -54,31 +54,29 @@ import reactor.rabbitmq.Sender;
public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTerminationSubscriber.class);
- private static final Integer MAX_CHANNELS_NUMBER = 1;
private static final String EXCHANGE_NAME = "terminationSubscriberExchange";
private static final String QUEUE_NAME_PREFIX = "terminationSubscriber";
private static final String ROUTING_KEY = "terminationSubscriberRoutingKey";
private final JsonEventSerializer serializer;
- private final ReactorRabbitMQChannelPool channelPool;
+ private final Sender sender;
+ private final ReceiverProvider receiverProvider;
private final String queueName;
private UnicastProcessor<OutboundMessage> sendQueue;
private DirectProcessor<Event> listener;
private Disposable sendQueueHandle;
private Disposable listenQueueHandle;
private Receiver listenerReceiver;
- private Sender sender;
@Inject
- RabbitMQTerminationSubscriber(ReactorRabbitMQChannelPool channelPool, JsonEventSerializer serializer) {
+ RabbitMQTerminationSubscriber(Sender sender, ReceiverProvider receiverProvider, JsonEventSerializer serializer) {
+ this.sender = sender;
+ this.receiverProvider = receiverProvider;
this.serializer = serializer;
- this.channelPool = channelPool;
this.queueName = QUEUE_NAME_PREFIX + UUID.randomUUID().toString();
}
public void start() {
- sender = channelPool.getSender();
-
sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, queueName)).block();
@@ -88,7 +86,7 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
.subscribeOn(Schedulers.elastic())
.subscribe();
- listenerReceiver = channelPool.createReceiver();
+ listenerReceiver = receiverProvider.createReceiver();
listener = DirectProcessor.create();
listenQueueHandle = listenerReceiver
.consumeAutoAck(queueName)
@@ -132,6 +130,5 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
Optional.ofNullable(sendQueueHandle).ifPresent(Disposable::dispose);
Optional.ofNullable(listenQueueHandle).ifPresent(Disposable::dispose);
Optional.ofNullable(listenerReceiver).ifPresent(Receiver::close);
- Optional.ofNullable(sender).ifPresent(Sender::close);
}
}
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 93208fe..de68b24 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -28,7 +28,7 @@ import java.util.Optional;
import java.util.UUID;
import org.apache.james.backends.rabbitmq.Constants;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.task.Task;
import org.apache.james.task.TaskId;
@@ -55,7 +55,6 @@ import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
public class RabbitMQWorkQueue implements WorkQueue {
@@ -75,19 +74,20 @@ public class RabbitMQWorkQueue implements WorkQueue {
public static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
private final TaskManagerWorker worker;
- private final ReactorRabbitMQChannelPool channelPool;
private final JsonTaskSerializer taskSerializer;
+ private final Sender sender;
+ private final ReceiverProvider receiverProvider;
private Receiver receiver;
private UnicastProcessor<TaskId> sendCancelRequestsQueue;
private Disposable sendCancelRequestsQueueHandle;
private Disposable receiverHandle;
private Disposable cancelRequestListenerHandle;
- private Sender cancelRequestSender;
private Receiver cancelRequestListener;
- public RabbitMQWorkQueue(TaskManagerWorker worker, ReactorRabbitMQChannelPool reactorRabbitMQChannelPool, JsonTaskSerializer taskSerializer) {
+ public RabbitMQWorkQueue(TaskManagerWorker worker, Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer) {
this.worker = worker;
- this.channelPool = reactorRabbitMQChannelPool;
+ this.receiverProvider = receiverProvider;
+ this.sender = sender;
this.taskSerializer = taskSerializer;
}
@@ -104,13 +104,13 @@ public class RabbitMQWorkQueue implements WorkQueue {
@VisibleForTesting
void declareQueue() {
- Mono<AMQP.Exchange.DeclareOk> declareExchange = channelPool.getSender()
+ Mono<AMQP.Exchange.DeclareOk> declareExchange = sender
.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME))
.retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
- Mono<AMQP.Queue.DeclareOk> declareQueue = channelPool.getSender()
+ Mono<AMQP.Queue.DeclareOk> declareQueue = sender
.declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER))
.retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
- Mono<AMQP.Queue.BindOk> bindQueueToExchange = channelPool.getSender()
+ Mono<AMQP.Queue.BindOk> bindQueueToExchange = sender
.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME))
.retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
@@ -121,7 +121,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
}
private void consumeWorkqueue() {
- receiver = new Receiver(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
+ receiver = receiverProvider.createReceiver();
receiverHandle = receiver.consumeManualAck(QUEUE_NAME, new ConsumeOptions())
.subscribeOn(Schedulers.elastic())
.concatMap(this::executeTask)
@@ -158,24 +158,23 @@ public class RabbitMQWorkQueue implements WorkQueue {
.onErrorResume(error -> Mono.empty());
}
- void listenToCancelRequests() {
- cancelRequestSender = channelPool.getSender();
+ private void listenToCancelRequests() {
String queueName = CANCEL_REQUESTS_QUEUE_NAME_PREFIX + UUID.randomUUID().toString();
- cancelRequestSender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
- cancelRequestSender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
- cancelRequestSender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, queueName)).block();
+ sender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
+ sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
+ sender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, queueName)).block();
registerCancelRequestsListener(queueName);
sendCancelRequestsQueue = UnicastProcessor.create();
- sendCancelRequestsQueueHandle = cancelRequestSender
+ sendCancelRequestsQueueHandle = sender
.send(sendCancelRequestsQueue.map(this::makeCancelRequestMessage))
.subscribeOn(Schedulers.elastic())
.subscribe();
}
private void registerCancelRequestsListener(String queueName) {
- cancelRequestListener = channelPool.createReceiver();
+ cancelRequestListener = receiverProvider.createReceiver();
cancelRequestListenerHandle = cancelRequestListener
.consumeAutoAck(queueName)
.subscribeOn(Schedulers.elastic())
@@ -207,7 +206,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
.build();
OutboundMessage outboundMessage = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, basicProperties, payload);
- channelPool.getSender().send(Mono.just(outboundMessage)).block();
+ sender.send(Mono.just(outboundMessage)).block();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
index 7feed07..7ddfc5e 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
@@ -1,4 +1,4 @@
-/** **************************************************************
+/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
@@ -6,30 +6,34 @@
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
- * ***************************************************************/
+ ****************************************************************/
package org.apache.james.task.eventsourcing.distributed
import java.time.Duration
-import com.google.common.annotations.VisibleForTesting
import javax.inject.Inject
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool
+
+import org.apache.james.backends.rabbitmq.ReceiverProvider
import org.apache.james.eventsourcing.EventSourcingSystem
import org.apache.james.server.task.json.JsonTaskSerializer
import org.apache.james.task.SerialTaskManagerWorker
import org.apache.james.task.eventsourcing.{WorkQueueSupplier, WorkerStatusListener}
-class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: ReactorRabbitMQChannelPool,
- private val jsonTaskSerializer: JsonTaskSerializer) extends WorkQueueSupplier {
+import com.google.common.annotations.VisibleForTesting
+import reactor.rabbitmq.Sender
+
+class RabbitMQWorkQueueSupplier @Inject()(private val sender: Sender,
+ private val receiverProvider: ReceiverProvider,
+ private val jsonTaskSerializer: JsonTaskSerializer) extends WorkQueueSupplier {
val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL = Duration.ofSeconds(30)
override def apply(eventSourcingSystem: EventSourcingSystem): RabbitMQWorkQueue = {
@@ -40,7 +44,7 @@ class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: Re
def apply(eventSourcingSystem: EventSourcingSystem, additionalInformationPollingInterval: Duration): RabbitMQWorkQueue = {
val listener = WorkerStatusListener(eventSourcingSystem)
val worker = new SerialTaskManagerWorker(listener, additionalInformationPollingInterval)
- val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, rabbitMQConnectionPool, jsonTaskSerializer)
+ val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, sender, receiverProvider, jsonTaskSerializer)
rabbitMQWorkQueue
}
}
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 295d27c..0b1ab59 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -34,7 +34,7 @@ import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
-import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventSourcingSystem;
import org.apache.james.eventsourcing.eventstore.EventStore;
@@ -80,6 +80,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+import reactor.rabbitmq.Sender;
class DistributedTaskManagerTest implements TaskManagerContract {
@@ -87,9 +88,9 @@ class DistributedTaskManagerTest implements TaskManagerContract {
private final List<RabbitMQWorkQueue> workQueues;
private final RabbitMQWorkQueueSupplier supplier;
- TrackedRabbitMQWorkQueueSupplier(ReactorRabbitMQChannelPool rabbitConnectionPool, JsonTaskSerializer taskSerializer) {
+ TrackedRabbitMQWorkQueueSupplier(Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer) {
workQueues = new ArrayList<>();
- supplier = new RabbitMQWorkQueueSupplier(rabbitConnectionPool, taskSerializer);
+ supplier = new RabbitMQWorkQueueSupplier(sender, receiverProvider, taskSerializer);
}
@Override
@@ -114,7 +115,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
static final Hostname HOSTNAME_2 = new Hostname("bar");
@RegisterExtension
- static final RabbitMQExtension RABBIT_MQ_EXTENSION = RabbitMQExtension.singletonRabbitMQ();
+ static final RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ();
@RegisterExtension
@@ -165,7 +166,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
CassandraCluster cassandra = CASSANDRA_CLUSTER.getCassandraCluster();
CassandraTaskExecutionDetailsProjectionDAO projectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider(), JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER);
this.executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(projectionDAO);
- this.workQueueSupplier = new TrackedRabbitMQWorkQueueSupplier(RABBIT_MQ_EXTENSION.getRabbitChannelPool(), taskSerializer);
+ this.workQueueSupplier = new TrackedRabbitMQWorkQueueSupplier(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), taskSerializer);
this.eventStore = eventStore;
this.terminationSubscribers = new ArrayList<>();
this.eventSerializer = JsonEventSerializer.forModules(eventDtoModule).withoutNestedType();
@@ -182,7 +183,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
EventSourcingTaskManager taskManager(Hostname hostname) {
- RabbitMQTerminationSubscriber terminationSubscriber = new RabbitMQTerminationSubscriber(RABBIT_MQ_EXTENSION.getRabbitChannelPool(), eventSerializer);
+ RabbitMQTerminationSubscriber terminationSubscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), eventSerializer);
terminationSubscribers.add(terminationSubscriber);
terminationSubscriber.start();
return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, terminationSubscriber);
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index 3b7fa9b..77cfeb7 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -52,7 +52,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
@Override
public TerminationSubscriber subscriber() {
- RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getRabbitChannelPool(), SERIALIZER);
+ RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), SERIALIZER);
subscriber.start();
return subscriber;
}
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
index 298b45d..da91395 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
@@ -52,7 +52,7 @@ class RabbitMQWorkQueuePersistenceTest {
void setUp() {
worker = spy(new ImmediateWorker());
serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
- testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
+ testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer);
//declare the queue but do not start consuming from it
testee.declareQueue();
}
@@ -91,7 +91,7 @@ class RabbitMQWorkQueuePersistenceTest {
private void startNewConsumingWorkqueue() {
worker = spy(new ImmediateWorker());
- testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
+ testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer);
testee.start();
}
}
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index bf151e6..4eb0323 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -69,7 +69,7 @@ class RabbitMQWorkQueueTest {
void setUp() {
worker = spy(new ImmediateWorker());
serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
- testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
+ testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer);
testee.start();
}
@@ -100,7 +100,7 @@ class RabbitMQWorkQueueTest {
testee.submit(TASK_WITH_ID);
ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
- try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitChannelPool(), serializer)) {
+ try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer)) {
otherWorkQueue.start();
IntStream.range(0, 9)
@@ -119,7 +119,7 @@ class RabbitMQWorkQueueTest {
ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
JsonTaskSerializer otherTaskSerializer = JsonTaskSerializer.of(TestTaskDTOModules.TEST_TYPE);
- try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitChannelPool(), otherTaskSerializer)) {
+ try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), otherTaskSerializer)) {
//wait to be sure that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue.
Thread.sleep(200);
otherWorkQueue.start();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org