You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/04 05:02:43 UTC
[james-project] 17/18: JAMES-3498 Allow alternative namings for
RabbitMQ queues/exchanges names
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 65ca31380b2f84489192b4ef089a90d8ea28f6b8
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Feb 1 15:25:43 2021 +0700
JAMES-3498 Allow alternative namings for RabbitMQ queues/exchanges names
```
~/Documents/james-project/event-bus$ grep -i mailbox * -R | grep -v target | grep -v *.iml | grep -i mailbox
api/src/main/java/org/apache/james/events/EventBus.java: return "mailbox-listener-" + listener.getClass().getSimpleName();
api/src/main/java/org/apache/james/events/Group.java:import org.apache.james.mailbox.events.GenericGroup;
api/src/main/java/org/apache/james/mailbox/events/GenericGroup.java:package org.apache.james.mailbox.events;
api/src/test/java/org/apache/james/events/GroupTest.java:import org.apache.james.mailbox.events.GenericGroup;
api/src/test/java/org/apache/james/events/GroupTest.java: assertThat(new GenericGroup("abc").asString()).isEqualTo("org.apache.james.mailbox.events.GenericGroup-abc");
api/src/test/java/org/apache/james/events/GroupTest.java: assertThat(Group.deserialize("org.apache.james.mailbox.events.GenericGroup-abc"))
api/src/test/java/org/apache/james/events/GroupTest.java: assertThat(Group.deserialize("org.apache.james.mailbox.events.GenericGroup-"))
api/src/test/java/org/apache/james/events/GroupTest.java: assertThatThrownBy(() -> Group.deserialize("org.apache.james.mailbox.events.GenericGroup"))
api/src/test/java/org/apache/james/events/GroupContract.java:import org.apache.james.mailbox.events.GenericGroup;
```
- Keep `api/src/main/java/org/apache/james/events/EventBus.java: return "mailbox-listener-" + listener.getClass().getSimpleName();` to preserve backward compatibility
- `GenericGroup` should stay in the mailbox package so as not to alter its fully qualified class name (FQCN)
---
.../org/apache/james/events/EventDispatcher.java | 20 ++++-----
.../apache/james/events/GroupConsumerRetry.java | 25 ++++-------
.../org/apache/james/events/GroupRegistration.java | 25 +++++------
.../james/events/GroupRegistrationHandler.java | 6 ++-
.../james/events/KeyReconnectionHandler.java | 7 ++--
.../james/events/KeyRegistrationHandler.java | 11 ++---
...RegistrationBinder.java => NamingStrategy.java} | 46 ++++++++++----------
.../org/apache/james/events/RabbitMQEventBus.java | 20 ++++-----
.../apache/james/events/RegistrationBinder.java | 8 ++--
.../org/apache/james/events/NetworkErrorTest.java | 2 +-
...RabbitMQEventBusDeadLetterQueueUpgradeTest.java | 5 ++-
.../apache/james/events/RabbitMQEventBusTest.java | 49 ++++++++++------------
.../rabbitmq/host/RabbitMQEventBusHostSystem.java | 3 +-
.../modules/event/RabbitMQEventBusModule.java | 2 +
14 files changed, 109 insertions(+), 120 deletions(-)
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
index 9a98aeb..ddfa875 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
@@ -27,9 +27,6 @@ import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
import static org.apache.james.events.RabbitMQEventBus.EVENT_BUS_ID;
-import static org.apache.james.events.RabbitMQEventBus.MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME;
-import static org.apache.james.events.RabbitMQEventBus.MAILBOX_EVENT_DEAD_LETTER_QUEUE;
-import static org.apache.james.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
@@ -54,7 +51,6 @@ import reactor.core.publisher.MonoProcessor;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
-import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Sender;
import reactor.util.function.Tuples;
@@ -65,6 +61,7 @@ public class EventDispatcher {
private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
+ private final NamingStrategy namingStrategy;
private final EventSerializer eventSerializer;
private final Sender sender;
private final LocalListenerRegistry localListenerRegistry;
@@ -72,10 +69,11 @@ public class EventDispatcher {
private final ListenerExecutor listenerExecutor;
private final EventDeadLetters deadLetters;
- EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender,
+ EventDispatcher(NamingStrategy namingStrategy, EventBusId eventBusId, EventSerializer eventSerializer, Sender sender,
LocalListenerRegistry localListenerRegistry,
ListenerExecutor listenerExecutor,
EventDeadLetters deadLetters) {
+ this.namingStrategy = namingStrategy;
this.eventSerializer = eventSerializer;
this.sender = sender;
this.localListenerRegistry = localListenerRegistry;
@@ -91,20 +89,20 @@ public class EventDispatcher {
void start() {
Flux.concat(
- sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+ sender.declareExchange(ExchangeSpecification.exchange(namingStrategy.exchange())
.durable(DURABLE)
.type(DIRECT_EXCHANGE)),
- sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME)
+ sender.declareExchange(ExchangeSpecification.exchange(namingStrategy.deadLetterExchange())
.durable(DURABLE)
.type(DIRECT_EXCHANGE)),
- sender.declareQueue(QueueSpecification.queue(MAILBOX_EVENT_DEAD_LETTER_QUEUE)
+ sender.declareQueue(namingStrategy.deadLetterQueue()
.durable(DURABLE)
.exclusive(!EXCLUSIVE)
.autoDelete(!AUTO_DELETE)
.arguments(NO_ARGUMENTS)),
sender.bind(BindingSpecification.binding()
- .exchange(MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME)
- .queue(MAILBOX_EVENT_DEAD_LETTER_QUEUE)
+ .exchange(namingStrategy.deadLetterExchange())
+ .queue(namingStrategy.deadLetterQueue().getName())
.routingKey(EMPTY_ROUTING_KEY)))
.then()
.block();
@@ -185,7 +183,7 @@ public class EventDispatcher {
private Flux<OutboundMessage> toMessages(byte[] serializedEvent, Collection<RoutingKey> routingKeys) {
return Flux.fromIterable(routingKeys)
- .map(routingKey -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), basicProperties, serializedEvent));
+ .map(routingKey -> new OutboundMessage(namingStrategy.exchange(), routingKey.asString(), basicProperties, serializedEvent));
}
private byte[] serializeEvent(Event event) {
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupConsumerRetry.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupConsumerRetry.java
index 5c6fd2e..5982933 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupConsumerRetry.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupConsumerRetry.java
@@ -24,7 +24,6 @@ import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.events.GroupRegistration.RETRY_COUNT;
-import static org.apache.james.events.RabbitMQEventBus.MAILBOX_EVENT;
import java.nio.charset.StandardCharsets;
@@ -33,8 +32,6 @@ import org.apache.james.util.StructuredLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
@@ -46,23 +43,17 @@ import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;
class GroupConsumerRetry {
-
static class RetryExchangeName {
- static RetryExchangeName of(Group group) {
- return new RetryExchangeName(group.asString());
- }
-
- static final String MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX = MAILBOX_EVENT + "-retryExchange-";
-
- private final String name;
+ private final String prefix;
+ private final Group group;
- private RetryExchangeName(String name) {
- Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "Exchange name must be specified");
- this.name = name;
+ RetryExchangeName(String prefix, Group group) {
+ this.prefix = prefix;
+ this.group = group;
}
String asString() {
- return MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX + name;
+ return prefix + "-retryExchange-" + group.asString();
}
}
@@ -75,10 +66,10 @@ class GroupConsumerRetry {
private final Group group;
private final EventSerializer eventSerializer;
- GroupConsumerRetry(Sender sender, Group group, RetryBackoffConfiguration retryBackoff,
+ GroupConsumerRetry(NamingStrategy namingStrategy, Sender sender, Group group, RetryBackoffConfiguration retryBackoff,
EventDeadLetters eventDeadLetters, EventSerializer eventSerializer) {
this.sender = sender;
- this.retryExchangeName = RetryExchangeName.of(group);
+ this.retryExchangeName = namingStrategy.retryExchange(group);
this.retryBackoff = retryBackoff;
this.eventDeadLetters = eventDeadLetters;
this.group = group;
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
index 3b16688..d0013ff 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
@@ -25,7 +25,6 @@ import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
import static org.apache.james.backends.rabbitmq.Constants.deadLetterQueue;
-import static org.apache.james.events.RabbitMQEventBus.MAILBOX_EVENT;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@@ -53,21 +52,17 @@ import reactor.util.retry.Retry;
class GroupRegistration implements Registration {
static class WorkQueueName {
- static WorkQueueName of(Group group) {
- return new WorkQueueName(group);
- }
-
- static final String MAILBOX_EVENT_WORK_QUEUE_PREFIX = MAILBOX_EVENT + "-workQueue-";
-
+ private final String prefix;
private final Group group;
- private WorkQueueName(Group group) {
+ WorkQueueName(String prefix, Group group) {
+ this.prefix = prefix;
Preconditions.checkNotNull(group, "Group must be specified");
this.group = group;
}
String asString() {
- return MAILBOX_EVENT_WORK_QUEUE_PREFIX + group.asString();
+ return prefix + "-workQueue-" + group.asString();
}
}
@@ -75,6 +70,7 @@ class GroupRegistration implements Registration {
static final String RETRY_COUNT = "retry-count";
static final int DEFAULT_RETRY_COUNT = 0;
+ private final NamingStrategy namingStrategy;
private final ReactorRabbitMQChannelPool channelPool;
private final EventListener.ReactiveEventListener listener;
private final WorkQueueName queueName;
@@ -89,21 +85,22 @@ class GroupRegistration implements Registration {
private final ListenerExecutor listenerExecutor;
private Optional<Disposable> receiverSubscriber;
- GroupRegistration(ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer,
+ GroupRegistration(NamingStrategy namingStrategy, ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer,
EventListener.ReactiveEventListener listener, Group group, RetryBackoffConfiguration retryBackoff,
EventDeadLetters eventDeadLetters,
Runnable unregisterGroup, ListenerExecutor listenerExecutor) {
+ this.namingStrategy = namingStrategy;
this.channelPool = channelPool;
this.eventSerializer = eventSerializer;
this.listener = listener;
- this.queueName = WorkQueueName.of(group);
+ this.queueName = namingStrategy.workQueue(group);
this.sender = sender;
this.receiver = receiverProvider.createReceiver();
this.retryBackoff = retryBackoff;
this.listenerExecutor = listenerExecutor;
this.receiverSubscriber = Optional.empty();
this.unregisterGroup = unregisterGroup;
- this.retryHandler = new GroupConsumerRetry(sender, group, retryBackoff, eventDeadLetters, eventSerializer);
+ this.retryHandler = new GroupConsumerRetry(namingStrategy, sender, group, retryBackoff, eventDeadLetters, eventSerializer);
this.delayGenerator = WaitDelayGenerator.of(retryBackoff);
this.group = group;
}
@@ -124,9 +121,9 @@ class GroupRegistration implements Registration {
.durable(DURABLE)
.exclusive(!EXCLUSIVE)
.autoDelete(!AUTO_DELETE)
- .arguments(deadLetterQueue(RabbitMQEventBus.MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME)),
+ .arguments(deadLetterQueue(namingStrategy.deadLetterExchange())),
BindingSpecification.binding()
- .exchange(RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME)
+ .exchange(namingStrategy.exchange())
.queue(queueName.asString())
.routingKey(EMPTY_ROUTING_KEY));
}
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index f1ddb6c..cb224d4 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -29,6 +29,7 @@ import org.apache.james.backends.rabbitmq.ReceiverProvider;
import reactor.rabbitmq.Sender;
class GroupRegistrationHandler {
+ private final NamingStrategy namingStrategy;
private final Map<Group, GroupRegistration> groupRegistrations;
private final EventSerializer eventSerializer;
private final ReactorRabbitMQChannelPool channelPool;
@@ -38,9 +39,10 @@ class GroupRegistrationHandler {
private final EventDeadLetters eventDeadLetters;
private final ListenerExecutor listenerExecutor;
- GroupRegistrationHandler(EventSerializer eventSerializer, ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider,
+ GroupRegistrationHandler(NamingStrategy namingStrategy, EventSerializer eventSerializer, ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider,
RetryBackoffConfiguration retryBackoff,
EventDeadLetters eventDeadLetters, ListenerExecutor listenerExecutor) {
+ this.namingStrategy = namingStrategy;
this.eventSerializer = eventSerializer;
this.channelPool = channelPool;
this.sender = sender;
@@ -73,7 +75,7 @@ class GroupRegistrationHandler {
private GroupRegistration newGroupRegistration(EventListener.ReactiveEventListener listener, Group group) {
return new GroupRegistration(
- channelPool, sender,
+ namingStrategy, channelPool, sender,
receiverProvider,
eventSerializer,
listener,
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
index 8b15173..41b3bba 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
@@ -22,7 +22,6 @@ package org.apache.james.events;
import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.events.KeyRegistrationHandler.EVENTBUS_QUEUE_NAME_PREFIX;
import static org.apache.james.events.KeyRegistrationHandler.QUEUE_ARGUMENTS;
import javax.inject.Inject;
@@ -40,10 +39,12 @@ import reactor.core.publisher.Mono;
public class KeyReconnectionHandler implements SimpleConnectionPool.ReconnectionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(KeyReconnectionHandler.class);
+ private final NamingStrategy namingStrategy;
private final EventBusId eventBusId;
@Inject
- public KeyReconnectionHandler(EventBusId eventBusId) {
+ public KeyReconnectionHandler(NamingStrategy namingStrategy, EventBusId eventBusId) {
+ this.namingStrategy = namingStrategy;
this.eventBusId = eventBusId;
}
@@ -51,7 +52,7 @@ public class KeyReconnectionHandler implements SimpleConnectionPool.Reconnection
public Publisher<Void> handleReconnection(Connection connection) {
return Mono.fromRunnable(() -> {
try (Channel channel = connection.createChannel()) {
- channel.queueDeclare(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString(), DURABLE, !EXCLUSIVE, AUTO_DELETE, QUEUE_ARGUMENTS);
+ channel.queueDeclare(namingStrategy.queueName(eventBusId).asString(), DURABLE, !EXCLUSIVE, AUTO_DELETE, QUEUE_ARGUMENTS);
} catch (Exception e) {
LOGGER.error("Error recovering connection", e);
}
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
index 1963d40..a6088be 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
@@ -53,12 +53,12 @@ import reactor.util.retry.Retry;
class KeyRegistrationHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(KeyRegistrationHandler.class);
- static final String EVENTBUS_QUEUE_NAME_PREFIX = "eventbus-";
private static final Duration EXPIRATION_TIMEOUT = Duration.ofMinutes(30);
static final Map<String, Object> QUEUE_ARGUMENTS = ImmutableMap.of("x-expires", EXPIRATION_TIMEOUT.toMillis());
private static final Duration TOPOLOGY_CHANGES_TIMEOUT = Duration.ofMinutes(1);
+ private final NamingStrategy namingStrategy;
private final EventBusId eventBusId;
private final LocalListenerRegistry localListenerRegistry;
private final EventSerializer eventSerializer;
@@ -71,10 +71,11 @@ class KeyRegistrationHandler {
private final RetryBackoffConfiguration retryBackoff;
private Optional<Disposable> receiverSubscriber;
- KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer,
+ KeyRegistrationHandler(NamingStrategy namingStrategy, EventBusId eventBusId, EventSerializer eventSerializer,
Sender sender, ReceiverProvider receiverProvider,
RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry,
ListenerExecutor listenerExecutor, RetryBackoffConfiguration retryBackoff) {
+ this.namingStrategy = namingStrategy;
this.eventBusId = eventBusId;
this.eventSerializer = eventSerializer;
this.sender = sender;
@@ -83,8 +84,8 @@ class KeyRegistrationHandler {
this.receiver = receiverProvider.createReceiver();
this.listenerExecutor = listenerExecutor;
this.retryBackoff = retryBackoff;
- this.registrationQueue = new RegistrationQueueName(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString());
- this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
+ this.registrationQueue = namingStrategy.queueName(eventBusId);
+ this.registrationBinder = new RegistrationBinder(namingStrategy, sender, registrationQueue);
this.receiverSubscriber = Optional.empty();
}
@@ -105,7 +106,7 @@ class KeyRegistrationHandler {
private void declareQueue(Sender sender) {
sender.declareQueue(
- QueueSpecification.queue(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString())
+ QueueSpecification.queue(registrationQueue.asString())
.durable(DURABLE)
.exclusive(!EXCLUSIVE)
.autoDelete(AUTO_DELETE)
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RegistrationBinder.java b/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
similarity index 53%
copy from event-bus/distributed/src/main/java/org/apache/james/events/RegistrationBinder.java
copy to event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
index 45fbd49..80dc2d2 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/RegistrationBinder.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
@@ -19,36 +19,36 @@
package org.apache.james.events;
-import static org.apache.james.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+import reactor.rabbitmq.QueueSpecification;
-import reactor.core.publisher.Mono;
-import reactor.rabbitmq.BindingSpecification;
-import reactor.rabbitmq.Sender;
+public class NamingStrategy {
+ private final String prefix;
-class RegistrationBinder {
- private final Sender sender;
- private final RegistrationQueueName registrationQueue;
+ public NamingStrategy(String prefix) {
+ this.prefix = prefix;
+ }
+
+ public RegistrationQueueName queueName(EventBusId eventBusId) {
+ return new RegistrationQueueName(prefix + "-eventbus-" + eventBusId.asString());
+ }
+
+ public QueueSpecification deadLetterQueue() {
+ return QueueSpecification.queue(prefix + "-dead-letter-queue");
+ }
- RegistrationBinder(Sender sender, RegistrationQueueName registrationQueue) {
- this.sender = sender;
- this.registrationQueue = registrationQueue;
+ public String exchange() {
+ return prefix + "-exchange";
}
- Mono<Void> bind(RegistrationKey key) {
- return sender.bind(bindingSpecification(key))
- .then();
+ public String deadLetterExchange() {
+ return prefix + "-dead-letter-exchange";
}
- Mono<Void> unbind(RegistrationKey key) {
- return sender.unbind(bindingSpecification(key))
- .then();
+ public GroupConsumerRetry.RetryExchangeName retryExchange(Group group) {
+ return new GroupConsumerRetry.RetryExchangeName(prefix, group);
}
- private BindingSpecification bindingSpecification(RegistrationKey key) {
- RoutingKeyConverter.RoutingKey routingKey = RoutingKeyConverter.RoutingKey.of(key);
- return BindingSpecification.binding()
- .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
- .queue(registrationQueue.asString())
- .routingKey(routingKey.asString());
+ public GroupRegistration.WorkQueueName workQueue(Group group) {
+ return new GroupRegistration.WorkQueueName(prefix, group);
}
-}
\ No newline at end of file
+}
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index 78aeae7..ad50412 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -39,12 +39,9 @@ import reactor.rabbitmq.Sender;
public class RabbitMQEventBus implements EventBus, Startable {
private static final Set<RegistrationKey> NO_KEY = ImmutableSet.of();
private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
- static final String MAILBOX_EVENT = "mailboxEvent";
- static final String MAILBOX_EVENT_DEAD_LETTER_QUEUE = MAILBOX_EVENT + "-dead-letter-queue";
- static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
- static final String MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME = MAILBOX_EVENT + "-dead-letter-exchange";
static final String EVENT_BUS_ID = "eventBusId";
+ private final NamingStrategy namingStrategy;
private final EventSerializer eventSerializer;
private final RoutingKeyConverter routingKeyConverter;
private final RetryBackoffConfiguration retryBackoff;
@@ -62,11 +59,12 @@ public class RabbitMQEventBus implements EventBus, Startable {
private EventDispatcher eventDispatcher;
@Inject
- public RabbitMQEventBus(Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer,
+ public RabbitMQEventBus(NamingStrategy namingStrategy, Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer,
RetryBackoffConfiguration retryBackoff,
RoutingKeyConverter routingKeyConverter,
EventDeadLetters eventDeadLetters, MetricFactory metricFactory, ReactorRabbitMQChannelPool channelPool,
EventBusId eventBusId) {
+ this.namingStrategy = namingStrategy;
this.sender = sender;
this.receiverProvider = receiverProvider;
this.listenerExecutor = new ListenerExecutor(metricFactory);
@@ -84,9 +82,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
if (!isRunning && !isStopping) {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
- keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff);
- groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor);
- eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters);
+ keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff);
+ groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor);
+ eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters);
eventDispatcher.start();
keyRegistrationHandler.start();
@@ -99,9 +97,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
if (!isRunning && !isStopping) {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
- keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff);
- groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor);
- eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters);
+ keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff);
+ groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor);
+ eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters);
keyRegistrationHandler.declareQueue();
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RegistrationBinder.java b/event-bus/distributed/src/main/java/org/apache/james/events/RegistrationBinder.java
index 45fbd49..acd3475 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/RegistrationBinder.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RegistrationBinder.java
@@ -19,17 +19,17 @@
package org.apache.james.events;
-import static org.apache.james.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
-
import reactor.core.publisher.Mono;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.Sender;
class RegistrationBinder {
+ private final NamingStrategy namingStrategy;
private final Sender sender;
private final RegistrationQueueName registrationQueue;
- RegistrationBinder(Sender sender, RegistrationQueueName registrationQueue) {
+ RegistrationBinder(NamingStrategy namingStrategy, Sender sender, RegistrationQueueName registrationQueue) {
+ this.namingStrategy = namingStrategy;
this.sender = sender;
this.registrationQueue = registrationQueue;
}
@@ -47,7 +47,7 @@ class RegistrationBinder {
private BindingSpecification bindingSpecification(RegistrationKey key) {
RoutingKeyConverter.RoutingKey routingKey = RoutingKeyConverter.RoutingKey.of(key);
return BindingSpecification.binding()
- .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+ .exchange(namingStrategy.exchange())
.queue(registrationQueue.asString())
.routingKey(routingKey.asString());
}
diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java
index 56d903c..fbcc13e 100644
--- a/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java
+++ b/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java
@@ -51,7 +51,7 @@ class NetworkErrorTest {
EventSerializer eventSerializer = new TestEventSerializer();
RoutingKeyConverter routingKeyConverter = RoutingKeyConverter.forFactories(new EventBusTestFixture.TestRegistrationKeyFactory());
- eventBus = new RabbitMQEventBus(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(),
+ eventBus = new RabbitMQEventBus(new NamingStrategy("test"), rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(),
eventSerializer, RETRY_BACKOFF_CONFIGURATION, routingKeyConverter,
memoryEventDeadLetters, new RecordingMetricFactory(), rabbitMQExtension.getRabbitChannelPool(),
EventBusId.random());
diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java
index 137e59c..ea73049 100644
--- a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java
+++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java
@@ -40,7 +40,8 @@ import reactor.rabbitmq.QueueSpecification;
class RabbitMQEventBusDeadLetterQueueUpgradeTest {
private static final GroupA REGISTERED_GROUP = new GroupA();
- private static final WorkQueueName WORK_QUEUE_NAME = WorkQueueName.of(REGISTERED_GROUP);
+ public static final NamingStrategy NAMING_STRATEGY = new NamingStrategy("test");
+ private static final WorkQueueName WORK_QUEUE_NAME = NAMING_STRATEGY.workQueue(REGISTERED_GROUP);
@RegisterExtension
static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ()
@@ -55,7 +56,7 @@ class RabbitMQEventBusDeadLetterQueueUpgradeTest {
EventSerializer eventSerializer = new TestEventSerializer();
RoutingKeyConverter routingKeyConverter = RoutingKeyConverter.forFactories(new TestRegistrationKeyFactory());
- eventBus = new RabbitMQEventBus(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(),
+ eventBus = new RabbitMQEventBus(NAMING_STRATEGY, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(),
eventSerializer, RETRY_BACKOFF_CONFIGURATION, routingKeyConverter,
memoryEventDeadLetters, new RecordingMetricFactory(), rabbitMQExtension.getRabbitChannelPool(),
EventBusId.random());
diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
index b191547..efc3db4 100644
--- a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
+++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
@@ -35,10 +35,6 @@ import static org.apache.james.events.EventBusTestFixture.KEY_1;
import static org.apache.james.events.EventBusTestFixture.NO_KEYS;
import static org.apache.james.events.EventBusTestFixture.newAsyncListener;
import static org.apache.james.events.EventBusTestFixture.newListener;
-import static org.apache.james.events.GroupRegistration.WorkQueueName.MAILBOX_EVENT_WORK_QUEUE_PREFIX;
-import static org.apache.james.events.RabbitMQEventBus.MAILBOX_EVENT;
-import static org.apache.james.events.RabbitMQEventBus.MAILBOX_EVENT_DEAD_LETTER_QUEUE;
-import static org.apache.james.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -95,6 +91,7 @@ import reactor.rabbitmq.Sender;
class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract,
KeyContract.SingleEventBusKeyContract, KeyContract.MultipleEventBusKeyContract,
ErrorHandlingContract {
+ static NamingStrategy TEST_NAMING_STRATEGY = new NamingStrategy("test");
@RegisterExtension
static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ()
@@ -138,13 +135,13 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBus3.stop();
eventBusWithKeyHandlerNotStarted.stop();
ALL_GROUPS.stream()
- .map(GroupRegistration.WorkQueueName::of)
+ .map(TEST_NAMING_STRATEGY::workQueue)
.forEach(queueName -> rabbitMQExtension.getSender().delete(QueueSpecification.queue(queueName.asString())).block());
rabbitMQExtension.getSender()
- .delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME))
+ .delete(ExchangeSpecification.exchange(TEST_NAMING_STRATEGY.exchange()))
.block();
rabbitMQExtension.getSender()
- .delete(QueueSpecification.queue().name(MAILBOX_EVENT_DEAD_LETTER_QUEUE))
+ .delete(TEST_NAMING_STRATEGY.deadLetterQueue())
.block();
}
@@ -153,7 +150,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
private RabbitMQEventBus newEventBus(Sender sender, ReceiverProvider receiverProvider) {
- return new RabbitMQEventBus(sender, receiverProvider, eventSerializer,
+ return new RabbitMQEventBus(TEST_NAMING_STRATEGY, sender, receiverProvider, eventSerializer,
EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION, routingKeyConverter,
memoryEventDeadLetters, new RecordingMetricFactory(),
rabbitMQExtension.getRabbitChannelPool(), EventBusId.random());
@@ -189,7 +186,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
String emptyRoutingKey = "";
rabbitMQExtension.getSender()
- .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME,
+ .send(Mono.just(new OutboundMessage(TEST_NAMING_STRATEGY.exchange(),
emptyRoutingKey,
"BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
.block();
@@ -208,7 +205,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
String emptyRoutingKey = "";
IntStream.range(0, 10).forEach(i -> rabbitMQExtension.getSender()
- .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME,
+ .send(Mono.just(new OutboundMessage(TEST_NAMING_STRATEGY.exchange(),
emptyRoutingKey,
"BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
.block());
@@ -227,7 +224,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
String emptyRoutingKey = "";
rabbitMQExtension.getSender()
- .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME,
+ .send(Mono.just(new OutboundMessage(TEST_NAMING_STRATEGY.exchange(),
emptyRoutingKey,
"BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
.block();
@@ -235,7 +232,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
AtomicInteger deadLetteredCount = new AtomicInteger();
rabbitMQExtension.getRabbitChannelPool()
.createReceiver()
- .consumeAutoAck(MAILBOX_EVENT_DEAD_LETTER_QUEUE)
+ .consumeAutoAck(TEST_NAMING_STRATEGY.deadLetterQueue().getName())
.doOnNext(next -> deadLetteredCount.incrementAndGet())
.subscribeOn(Schedulers.elastic())
.subscribe();
@@ -250,7 +247,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
Mono.from(eventBus.register(listener, KEY_1)).block();
rabbitMQExtension.getSender()
- .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME,
+ .send(Mono.just(new OutboundMessage(TEST_NAMING_STRATEGY.exchange(),
RoutingKey.of(KEY_1).asString(),
"BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
.block();
@@ -267,7 +264,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
IntStream.range(0, 100)
.forEach(i -> rabbitMQExtension.getSender()
- .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME,
+ .send(Mono.just(new OutboundMessage(TEST_NAMING_STRATEGY.exchange(),
RoutingKey.of(KEY_1).asString(),
"BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
.block());
@@ -289,7 +286,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
GroupA registeredGroup = new GroupA();
eventBus.register(listener, registeredGroup);
- GroupConsumerRetry.RetryExchangeName retryExchangeName = GroupConsumerRetry.RetryExchangeName.of(registeredGroup);
+ GroupConsumerRetry.RetryExchangeName retryExchangeName = TEST_NAMING_STRATEGY.retryExchange(registeredGroup);
assertThat(rabbitMQExtension.managementAPI().listExchanges())
.anyMatch(exchange -> exchange.getName().equals(retryExchangeName.asString()));
}
@@ -378,21 +375,21 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
@Nested
class PublishingTest {
- private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
+ private static final String WORK_QUEUE_NAME = "test-workQueue";
@BeforeEach
void setUp() {
Sender sender = rabbitMQExtension.getSender();
- sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
+ sender.declareQueue(QueueSpecification.queue(WORK_QUEUE_NAME)
.durable(DURABLE)
.exclusive(!EXCLUSIVE)
.autoDelete(!AUTO_DELETE)
.arguments(NO_ARGUMENTS))
.block();
sender.bind(BindingSpecification.binding()
- .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
- .queue(MAILBOX_WORK_QUEUE_NAME)
+ .exchange(TEST_NAMING_STRATEGY.exchange())
+ .queue(WORK_QUEUE_NAME)
.routingKey(EMPTY_ROUTING_KEY))
.block();
}
@@ -413,7 +410,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
private Event dequeueEvent() {
try (Receiver receiver = rabbitMQExtension.getReceiverProvider().createReceiver()) {
- byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
+ byte[] eventInBytes = receiver.consumeAutoAck(WORK_QUEUE_NAME)
.blockFirst()
.getBody();
@@ -480,7 +477,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
void startShouldCreateEventExchange() {
eventBus.start();
assertThat(rabbitManagementAPI.listExchanges())
- .filteredOn(exchange -> exchange.getName().equals(MAILBOX_EVENT_EXCHANGE_NAME))
+ .filteredOn(exchange -> exchange.getName().equals(TEST_NAMING_STRATEGY.exchange()))
.hasOnlyOneElementSatisfying(exchange -> {
assertThat(exchange.isDurable()).isTrue();
assertThat(exchange.getType()).isEqualTo(DIRECT_EXCHANGE);
@@ -659,7 +656,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBus.stop();
assertThat(rabbitManagementAPI.listExchanges())
- .anySatisfy(exchange -> assertThat(exchange.getName()).isEqualTo(MAILBOX_EVENT_EXCHANGE_NAME));
+ .anySatisfy(exchange -> assertThat(exchange.getName()).isEqualTo(TEST_NAMING_STRATEGY.exchange()));
}
@Test
@@ -719,7 +716,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
@Test
void multipleEventBusStartShouldCreateOnlyOneEventExchange() {
assertThat(rabbitManagementAPI.listExchanges())
- .filteredOn(exchange -> exchange.getName().equals(MAILBOX_EVENT_EXCHANGE_NAME))
+ .filteredOn(exchange -> exchange.getName().equals(TEST_NAMING_STRATEGY.exchange()))
.hasSize(1);
}
@@ -752,7 +749,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBusWithKeyHandlerNotStarted.stop();
assertThat(rabbitManagementAPI.listExchanges())
- .anySatisfy(exchange -> assertThat(exchange.getName()).isEqualTo(MAILBOX_EVENT_EXCHANGE_NAME));
+ .anySatisfy(exchange -> assertThat(exchange.getName()).isEqualTo(TEST_NAMING_STRATEGY.exchange()));
}
@Test
@@ -776,8 +773,8 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBusWithKeyHandlerNotStarted.stop();
assertThat(rabbitManagementAPI.listQueues())
- .filteredOn(queue -> !queue.getName().startsWith(MAILBOX_EVENT_WORK_QUEUE_PREFIX)
- && !queue.getName().startsWith(MAILBOX_EVENT_DEAD_LETTER_QUEUE))
+ .filteredOn(queue -> !queue.getName().startsWith("test-")
+ && !queue.getName().startsWith("test-dead-letter-queue"))
.isEmpty();
}
diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
index d6a451d..831183e 100644
--- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
+++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
@@ -33,6 +33,7 @@ import org.apache.james.core.quota.QuotaSizeLimit;
import org.apache.james.event.json.MailboxEventSerializer;
import org.apache.james.events.EventBusId;
import org.apache.james.events.MemoryEventDeadLetters;
+import org.apache.james.events.NamingStrategy;
import org.apache.james.events.RabbitMQEventBus;
import org.apache.james.events.RetryBackoffConfiguration;
import org.apache.james.events.RoutingKeyConverter;
@@ -117,7 +118,7 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();
MailboxEventSerializer eventSerializer = new MailboxEventSerializer(mailboxIdFactory, messageIdFactory, new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer());
RoutingKeyConverter routingKeyConverter = new RoutingKeyConverter(ImmutableSet.of(new MailboxIdRegistrationKey.Factory(mailboxIdFactory)));
- return new RabbitMQEventBus(reactorRabbitMQChannelPool.getSender(), reactorRabbitMQChannelPool::createReceiver,
+ return new RabbitMQEventBus(new NamingStrategy("mailboxEvent-"), reactorRabbitMQChannelPool.getSender(), reactorRabbitMQChannelPool::createReceiver,
eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, new MemoryEventDeadLetters(),
new RecordingMetricFactory(),
reactorRabbitMQChannelPool, EventBusId.random());
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
index 8dc2e5e..fea6b3f 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
@@ -25,6 +25,7 @@ import org.apache.james.events.EventBus;
import org.apache.james.events.EventBusId;
import org.apache.james.events.EventSerializer;
import org.apache.james.events.KeyReconnectionHandler;
+import org.apache.james.events.NamingStrategy;
import org.apache.james.events.RabbitMQEventBus;
import org.apache.james.events.RegistrationKey;
import org.apache.james.events.RetryBackoffConfiguration;
@@ -44,6 +45,7 @@ public class RabbitMQEventBusModule extends AbstractModule {
bind(MailboxEventSerializer.class).in(Scopes.SINGLETON);
bind(EventSerializer.class).to(MailboxEventSerializer.class);
+ bind(NamingStrategy.class).toInstance(new NamingStrategy("mailboxEvent"));
bind(RabbitMQEventBus.class).in(Scopes.SINGLETON);
bind(EventBus.class).to(RabbitMQEventBus.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org