You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/04 05:02:42 UTC
[james-project] 16/18: JAMES-3498 Rename variable/classes to remove
`mailbox` name component
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 206f1df7c0c9c00b4c72181ee4b6e26518876547
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Feb 1 14:46:49 2021 +0700
JAMES-3498 Rename variable/classes to remove `mailbox` name component
Before:
```
$ grep -i mailbox * -R | wc -l
212
```
After:
```
~/Documents/james-project/event-bus$ grep -i mailbox * -R | wc -l
57
```
Remaining usages are mostly due to RabbitMQ queues/exchanges names
---
.../java/org/apache/james/events/EventBus.java | 12 ++++----
.../james/events/EventDeadLettersHealthCheck.java | 2 +-
.../org/apache/james/events/EventListener.java | 4 +--
.../EventDeadLettersHealthCheckContract.java | 2 +-
.../org/apache/james/events/GroupContract.java | 32 +++++++++++-----------
.../java/org/apache/james/events/GroupTest.java | 2 +-
.../java/org/apache/james/events/KeyContract.java | 30 ++++++++++----------
.../org/apache/james/events/EventDispatcher.java | 12 ++++----
.../org/apache/james/events/GroupRegistration.java | 16 +++++------
.../james/events/GroupRegistrationHandler.java | 8 +++---
.../james/events/KeyRegistrationHandler.java | 10 +++----
...ListenerExecutor.java => ListenerExecutor.java} | 6 ++--
.../apache/james/events/LocalListenerRegistry.java | 2 +-
.../org/apache/james/events/RabbitMQEventBus.java | 16 +++++------
.../james/events/LocalListenerRegistryTest.java | 30 ++++++++++----------
.../java/org/apache/james/events/InVMEventBus.java | 6 ++--
.../james/events/delivery/EventDelivery.java | 12 ++++----
.../james/events/delivery/InVmEventDelivery.java | 18 ++++++------
18 files changed, 110 insertions(+), 110 deletions(-)
diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
index 2429107..21246e0 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
@@ -41,8 +41,8 @@ public interface EventBus {
}
interface Metrics {
- static String timerName(EventListener mailboxListener) {
- return "mailbox-listener-" + mailboxListener.getClass().getSimpleName();
+ static String timerName(EventListener listener) {
+ return "mailbox-listener-" + listener.getClass().getSimpleName();
}
}
@@ -66,11 +66,11 @@ public interface EventBus {
return dispatch(event, ImmutableSet.of(key));
}
- default Registration register(EventListener.GroupEventListener groupMailboxListener) {
- return register(EventListener.wrapReactive(groupMailboxListener));
+ default Registration register(EventListener.GroupEventListener groupListener) {
+ return register(EventListener.wrapReactive(groupListener));
}
- default Registration register(EventListener.ReactiveGroupEventListener groupMailboxListener) {
- return register(groupMailboxListener, groupMailboxListener.getDefaultGroup());
+ default Registration register(EventListener.ReactiveGroupEventListener groupListener) {
+ return register(groupListener, groupListener.getDefaultGroup());
}
}
diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventDeadLettersHealthCheck.java b/event-bus/api/src/main/java/org/apache/james/events/EventDeadLettersHealthCheck.java
index 98ccc0f..b398a23 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventDeadLettersHealthCheck.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventDeadLettersHealthCheck.java
@@ -47,7 +47,7 @@ public class EventDeadLettersHealthCheck implements HealthCheck {
return eventDeadLetters.containEvents()
.map(containEvents -> {
if (containEvents) {
- return Result.degraded(COMPONENT_NAME, "EventDeadLetters contain events. This might indicate transient failure on mailbox event processing.");
+ return Result.degraded(COMPONENT_NAME, "EventDeadLetters contain events. This might indicate transient failure on event processing.");
}
return Result.healthy(COMPONENT_NAME);
diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventListener.java b/event-bus/api/src/main/java/org/apache/james/events/EventListener.java
index be50c12..e820283 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventListener.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventListener.java
@@ -120,8 +120,8 @@ public interface EventListener {
return new ReactiveWrapper<>(listener);
}
- static ReactiveGroupEventListener wrapReactive(GroupEventListener groupMailboxListener) {
- return new ReactiveGroupWrapper(groupMailboxListener);
+ static ReactiveGroupEventListener wrapReactive(GroupEventListener groupEventListener) {
+ return new ReactiveGroupWrapper(groupEventListener);
}
default ExecutionMode getExecutionMode() {
diff --git a/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersHealthCheckContract.java b/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersHealthCheckContract.java
index fd1bb22..e0cf4a8 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersHealthCheckContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersHealthCheckContract.java
@@ -31,7 +31,7 @@ import org.junit.jupiter.api.Test;
interface EventDeadLettersHealthCheckContract {
ComponentName COMPONENT_NAME = new ComponentName("EventDeadLettersHealthCheck");
- String EXPECTED_DEGRADED_MESSAGE = "EventDeadLetters contain events. This might indicate transient failure on mailbox event processing.";
+ String EXPECTED_DEGRADED_MESSAGE = "EventDeadLetters contain events. This might indicate transient failure on event processing.";
Username USERNAME = Username.of("user");
diff --git a/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java b/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
index db42ff7..6a927ee 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
@@ -418,20 +418,20 @@ public interface GroupContract {
@Test
default void groupsDefinedOnlyOnSomeNodesShouldBeNotifiedWhenDispatch() throws Exception {
- EventListener mailboxListener = EventBusTestFixture.newListener();
+ EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(mailboxListener, GROUP_A);
+ eventBus().register(listener, GROUP_A);
eventBus2().dispatch(EVENT, NO_KEYS).block();
- verify(mailboxListener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void groupsDefinedOnlyOnSomeNodesShouldNotBeNotifiedWhenRedeliver() {
- EventListener mailboxListener = EventBusTestFixture.newListener();
+ EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(mailboxListener, GROUP_A);
+ eventBus().register(listener, GROUP_A);
assertThatThrownBy(() -> eventBus2().reDeliver(GROUP_A, EVENT).block())
.isInstanceOf(GroupRegistrationNotFound.class);
@@ -439,38 +439,38 @@ public interface GroupContract {
@Test
default void groupListenersShouldBeExecutedOnceWhenRedeliverInADistributedEnvironment() throws Exception {
- EventListener mailboxListener = EventBusTestFixture.newListener();
+ EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(mailboxListener, GROUP_A);
- eventBus2().register(mailboxListener, GROUP_A);
+ eventBus().register(listener, GROUP_A);
+ eventBus2().register(listener, GROUP_A);
eventBus2().reDeliver(GROUP_A, EVENT).block();
- verify(mailboxListener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void groupListenersShouldBeExecutedOnceInAControlledEnvironment() throws Exception {
- EventListener mailboxListener = EventBusTestFixture.newListener();
+ EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(mailboxListener, GROUP_A);
- eventBus2().register(mailboxListener, GROUP_A);
+ eventBus().register(listener, GROUP_A);
+ eventBus2().register(listener, GROUP_A);
eventBus2().dispatch(EVENT, NO_KEYS).block();
- verify(mailboxListener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void unregisterShouldStopNotificationForDistantGroups() throws Exception {
- EventListener mailboxListener = EventBusTestFixture.newListener();
+ EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(mailboxListener, GROUP_A).unregister();
+ eventBus().register(listener, GROUP_A).unregister();
eventBus2().dispatch(EVENT, NO_KEYS).block();
- verify(mailboxListener, after(FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
diff --git a/event-bus/api/src/test/java/org/apache/james/events/GroupTest.java b/event-bus/api/src/test/java/org/apache/james/events/GroupTest.java
index 5326e3f..0d8a663 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/GroupTest.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/GroupTest.java
@@ -104,7 +104,7 @@ class GroupTest {
@Test
void deserializeShouldThrowWhenClassNotFound() {
- assertThatThrownBy(() -> Group.deserialize("org.apache.james.mailbox.events.Noone"))
+ assertThatThrownBy(() -> Group.deserialize("org.apache.james.events.Noone"))
.isInstanceOf(Group.GroupDeserializationException.class);
}
diff --git a/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java b/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java
index 52e5a53..1c1c6bd 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java
@@ -383,13 +383,13 @@ public interface KeyContract extends EventBusContract {
@Test
default void crossEventBusRegistrationShouldBeAllowed() throws Exception {
- EventListener mailboxListener = EventBusTestFixture.newListener();
+ EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(mailboxListener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
eventBus2().dispatch(EVENT, KEY_1).block();
- verify(mailboxListener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
@@ -406,16 +406,16 @@ public interface KeyContract extends EventBusContract {
@Test
default void allRegisteredListenersShouldBeDispatched() throws Exception {
- EventListener mailboxListener1 = EventBusTestFixture.newListener();
- EventListener mailboxListener2 = EventBusTestFixture.newListener();
+ EventListener listener1 = EventBusTestFixture.newListener();
+ EventListener listener2 = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(mailboxListener1, KEY_1)).block();
- Mono.from(eventBus2().register(mailboxListener2, KEY_1)).block();
+ Mono.from(eventBus().register(listener1, KEY_1)).block();
+ Mono.from(eventBus2().register(listener2, KEY_1)).block();
eventBus2().dispatch(EVENT, KEY_1).block();
- verify(mailboxListener1, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
- verify(mailboxListener2, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener1, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener2, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
@@ -432,16 +432,16 @@ public interface KeyContract extends EventBusContract {
@Test
default void localDispatchedListenersShouldBeDispatchedWithoutDelay() throws Exception {
- EventListener mailboxListener1 = EventBusTestFixture.newListener();
- EventListener mailboxListener2 = EventBusTestFixture.newListener();
+ EventListener listener1 = EventBusTestFixture.newListener();
+ EventListener listener2 = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(mailboxListener1, KEY_1)).block();
- Mono.from(eventBus2().register(mailboxListener2, KEY_1)).block();
+ Mono.from(eventBus().register(listener1, KEY_1)).block();
+ Mono.from(eventBus2().register(listener2, KEY_1)).block();
eventBus2().dispatch(EVENT, KEY_1).block();
- verify(mailboxListener2, times(1)).event(any());
- verify(mailboxListener1, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener2, times(1)).event(any());
+ verify(listener1, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
}
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
index 9543179..9a98aeb 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
@@ -69,12 +69,12 @@ public class EventDispatcher {
private final Sender sender;
private final LocalListenerRegistry localListenerRegistry;
private final AMQP.BasicProperties basicProperties;
- private final MailboxListenerExecutor mailboxListenerExecutor;
+ private final ListenerExecutor listenerExecutor;
private final EventDeadLetters deadLetters;
EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender,
LocalListenerRegistry localListenerRegistry,
- MailboxListenerExecutor mailboxListenerExecutor,
+ ListenerExecutor listenerExecutor,
EventDeadLetters deadLetters) {
this.eventSerializer = eventSerializer;
this.sender = sender;
@@ -85,7 +85,7 @@ public class EventDispatcher {
.priority(PERSISTENT_TEXT_PLAIN.getPriority())
.contentType(PERSISTENT_TEXT_PLAIN.getContentType())
.build();
- this.mailboxListenerExecutor = mailboxListenerExecutor;
+ this.listenerExecutor = listenerExecutor;
this.deadLetters = deadLetters;
}
@@ -122,15 +122,15 @@ public class EventDispatcher {
private Mono<Void> dispatchToLocalListeners(Event event, Set<RegistrationKey> keys) {
return Flux.fromIterable(keys)
- .flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key)
+ .flatMap(key -> localListenerRegistry.getLocalListeners(key)
.map(listener -> Tuples.of(key, listener)), EventBus.EXECUTION_RATE)
.filter(pair -> pair.getT2().getExecutionMode() == EventListener.ExecutionMode.SYNCHRONOUS)
.flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()), EventBus.EXECUTION_RATE)
.then();
}
- private Mono<Void> executeListener(Event event, EventListener.ReactiveEventListener mailboxListener, RegistrationKey registrationKey) {
- return mailboxListenerExecutor.execute(mailboxListener,
+ private Mono<Void> executeListener(Event event, EventListener.ReactiveEventListener listener, RegistrationKey registrationKey) {
+ return listenerExecutor.execute(listener,
MDCBuilder.create()
.addContext(EventBus.StructuredLoggingFields.REGISTRATION_KEY, registrationKey),
event)
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
index 07739ac..3b16688 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
@@ -76,7 +76,7 @@ class GroupRegistration implements Registration {
static final int DEFAULT_RETRY_COUNT = 0;
private final ReactorRabbitMQChannelPool channelPool;
- private final EventListener.ReactiveEventListener mailboxListener;
+ private final EventListener.ReactiveEventListener listener;
private final WorkQueueName queueName;
private final Receiver receiver;
private final Runnable unregisterGroup;
@@ -86,21 +86,21 @@ class GroupRegistration implements Registration {
private final WaitDelayGenerator delayGenerator;
private final Group group;
private final RetryBackoffConfiguration retryBackoff;
- private final MailboxListenerExecutor mailboxListenerExecutor;
+ private final ListenerExecutor listenerExecutor;
private Optional<Disposable> receiverSubscriber;
GroupRegistration(ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer,
- EventListener.ReactiveEventListener mailboxListener, Group group, RetryBackoffConfiguration retryBackoff,
+ EventListener.ReactiveEventListener listener, Group group, RetryBackoffConfiguration retryBackoff,
EventDeadLetters eventDeadLetters,
- Runnable unregisterGroup, MailboxListenerExecutor mailboxListenerExecutor) {
+ Runnable unregisterGroup, ListenerExecutor listenerExecutor) {
this.channelPool = channelPool;
this.eventSerializer = eventSerializer;
- this.mailboxListener = mailboxListener;
+ this.listener = listener;
this.queueName = WorkQueueName.of(group);
this.sender = sender;
this.receiver = receiverProvider.createReceiver();
this.retryBackoff = retryBackoff;
- this.mailboxListenerExecutor = mailboxListenerExecutor;
+ this.listenerExecutor = listenerExecutor;
this.receiverSubscriber = Optional.empty();
this.unregisterGroup = unregisterGroup;
this.retryHandler = new GroupConsumerRetry(sender, group, retryBackoff, eventDeadLetters, eventSerializer);
@@ -164,8 +164,8 @@ class GroupRegistration implements Registration {
}
private Mono<Void> runListener(Event event) {
- return mailboxListenerExecutor.execute(
- mailboxListener,
+ return listenerExecutor.execute(
+ listener,
MDCBuilder.create()
.addContext(EventBus.StructuredLoggingFields.GROUP, group),
event);
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index 8e00eaa..f1ddb6c 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -36,18 +36,18 @@ class GroupRegistrationHandler {
private final ReceiverProvider receiverProvider;
private final RetryBackoffConfiguration retryBackoff;
private final EventDeadLetters eventDeadLetters;
- private final MailboxListenerExecutor mailboxListenerExecutor;
+ private final ListenerExecutor listenerExecutor;
GroupRegistrationHandler(EventSerializer eventSerializer, ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider,
RetryBackoffConfiguration retryBackoff,
- EventDeadLetters eventDeadLetters, MailboxListenerExecutor mailboxListenerExecutor) {
+ EventDeadLetters eventDeadLetters, ListenerExecutor listenerExecutor) {
this.eventSerializer = eventSerializer;
this.channelPool = channelPool;
this.sender = sender;
this.receiverProvider = receiverProvider;
this.retryBackoff = retryBackoff;
this.eventDeadLetters = eventDeadLetters;
- this.mailboxListenerExecutor = mailboxListenerExecutor;
+ this.listenerExecutor = listenerExecutor;
this.groupRegistrations = new ConcurrentHashMap<>();
}
@@ -81,6 +81,6 @@ class GroupRegistrationHandler {
retryBackoff,
eventDeadLetters,
() -> groupRegistrations.remove(group),
- mailboxListenerExecutor);
+ listenerExecutor);
}
}
\ No newline at end of file
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
index db1e24b..1963d40 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
@@ -67,21 +67,21 @@ class KeyRegistrationHandler {
private final Receiver receiver;
private final RegistrationQueueName registrationQueue;
private final RegistrationBinder registrationBinder;
- private final MailboxListenerExecutor mailboxListenerExecutor;
+ private final ListenerExecutor listenerExecutor;
private final RetryBackoffConfiguration retryBackoff;
private Optional<Disposable> receiverSubscriber;
KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer,
Sender sender, ReceiverProvider receiverProvider,
RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry,
- MailboxListenerExecutor mailboxListenerExecutor, RetryBackoffConfiguration retryBackoff) {
+ ListenerExecutor listenerExecutor, RetryBackoffConfiguration retryBackoff) {
this.eventBusId = eventBusId;
this.eventSerializer = eventSerializer;
this.sender = sender;
this.routingKeyConverter = routingKeyConverter;
this.localListenerRegistry = localListenerRegistry;
this.receiver = receiverProvider.createReceiver();
- this.mailboxListenerExecutor = mailboxListenerExecutor;
+ this.listenerExecutor = listenerExecutor;
this.retryBackoff = retryBackoff;
this.registrationQueue = new RegistrationQueueName(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString());
this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
@@ -162,7 +162,7 @@ class KeyRegistrationHandler {
RegistrationKey registrationKey = routingKeyConverter.toRegistrationKey(routingKey);
Event event = toEvent(delivery);
- return localListenerRegistry.getLocalMailboxListeners(registrationKey)
+ return localListenerRegistry.getLocalListeners(registrationKey)
.filter(listener -> !isLocalSynchronousListeners(eventBusId, listener))
.flatMap(listener -> executeListener(listener, event, registrationKey), EventBus.EXECUTION_RATE)
.then();
@@ -172,7 +172,7 @@ class KeyRegistrationHandler {
MDCBuilder mdcBuilder = MDCBuilder.create()
.addContext(EventBus.StructuredLoggingFields.REGISTRATION_KEY, key);
- return mailboxListenerExecutor.execute(listener, mdcBuilder, event)
+ return listenerExecutor.execute(listener, mdcBuilder, event)
.doOnError(e -> structuredLogger(event, key)
.log(logger -> logger.error("Exception happens when handling event", e)))
.onErrorResume(e -> Mono.empty())
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/MailboxListenerExecutor.java b/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java
similarity index 91%
rename from event-bus/distributed/src/main/java/org/apache/james/events/MailboxListenerExecutor.java
rename to event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java
index e74e6ec..2d2f258 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/MailboxListenerExecutor.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java
@@ -27,10 +27,10 @@ import org.apache.james.util.ReactorUtils;
import reactor.core.publisher.Mono;
-class MailboxListenerExecutor {
+class ListenerExecutor {
private final MetricFactory metricFactory;
- MailboxListenerExecutor(MetricFactory metricFactory) {
+ ListenerExecutor(MetricFactory metricFactory) {
this.metricFactory = metricFactory;
}
@@ -38,7 +38,7 @@ class MailboxListenerExecutor {
if (listener.isHandling(event)) {
return Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener),
Mono.from(listener.reactiveEvent(event))
- .subscriberContext(ReactorUtils.context("MailboxListenerExecutor", mdc(listener, mdcBuilder, event)))));
+ .subscriberContext(ReactorUtils.context("ListenerExecutor", mdc(listener, mdcBuilder, event)))));
}
return Mono.empty();
}
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/LocalListenerRegistry.java b/event-bus/distributed/src/main/java/org/apache/james/events/LocalListenerRegistry.java
index f4e5537..4f68b4f 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/LocalListenerRegistry.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/LocalListenerRegistry.java
@@ -103,7 +103,7 @@ class LocalListenerRegistry {
return remainingListeners;
}
- Flux<EventListener.ReactiveEventListener> getLocalMailboxListeners(RegistrationKey registrationKey) {
+ Flux<EventListener.ReactiveEventListener> getLocalListeners(RegistrationKey registrationKey) {
return Flux.fromIterable(listenersByKey.getOrDefault(registrationKey, ImmutableSet.of()));
}
}
\ No newline at end of file
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index 772191b..78aeae7 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -50,7 +50,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
private final RetryBackoffConfiguration retryBackoff;
private final EventBusId eventBusId;
private final EventDeadLetters eventDeadLetters;
- private final MailboxListenerExecutor mailboxListenerExecutor;
+ private final ListenerExecutor listenerExecutor;
private final Sender sender;
private final ReceiverProvider receiverProvider;
private final ReactorRabbitMQChannelPool channelPool;
@@ -69,7 +69,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
EventBusId eventBusId) {
this.sender = sender;
this.receiverProvider = receiverProvider;
- this.mailboxListenerExecutor = new MailboxListenerExecutor(metricFactory);
+ this.listenerExecutor = new ListenerExecutor(metricFactory);
this.channelPool = channelPool;
this.eventBusId = eventBusId;
this.eventSerializer = eventSerializer;
@@ -84,9 +84,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
if (!isRunning && !isStopping) {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
- keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff);
- groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
- eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor, eventDeadLetters);
+ keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff);
+ groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor);
+ eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters);
eventDispatcher.start();
keyRegistrationHandler.start();
@@ -99,9 +99,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
if (!isRunning && !isStopping) {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
- keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff);
- groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
- eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor, eventDeadLetters);
+ keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff);
+ groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor);
+ eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters);
keyRegistrationHandler.declareQueue();
diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/LocalListenerRegistryTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/LocalListenerRegistryTest.java
index 36953c6..81269af 100644
--- a/event-bus/distributed/src/test/java/org/apache/james/events/LocalListenerRegistryTest.java
+++ b/event-bus/distributed/src/test/java/org/apache/james/events/LocalListenerRegistryTest.java
@@ -45,33 +45,33 @@ class LocalListenerRegistryTest {
}
@Test
- void getLocalMailboxListenersShouldReturnEmptyWhenNone() {
- assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+ void getLocalListenersShouldReturnEmptyWhenNone() {
+ assertThat(testee.getLocalListeners(KEY_1).collectList().block())
.isEmpty();
}
@Test
- void getLocalMailboxListenersShouldReturnPreviouslyAddedListener() {
+ void getLocalListenersShouldReturnPreviouslyAddedListener() {
EventListener listener = event -> { };
testee.addListener(KEY_1, listener);
- assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+ assertThat(testee.getLocalListeners(KEY_1).collectList().block())
.containsOnly(wrapReactive(listener));
}
@Test
- void getLocalMailboxListenersShouldReturnPreviouslyAddedListeners() {
+ void getLocalListenersShouldReturnPreviouslyAddedListeners() {
EventListener listener1 = event -> { };
EventListener listener2 = event -> { };
testee.addListener(KEY_1, listener1);
testee.addListener(KEY_1, listener2);
- assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+ assertThat(testee.getLocalListeners(KEY_1).collectList().block())
.containsOnly(wrapReactive(listener1), wrapReactive(listener2));
}
@Test
- void getLocalMailboxListenersShouldNotReturnRemovedListeners() {
+ void getLocalListenersShouldNotReturnRemovedListeners() {
EventListener listener1 = event -> { };
EventListener listener2 = event -> { };
testee.addListener(KEY_1, listener1);
@@ -79,7 +79,7 @@ class LocalListenerRegistryTest {
registration.unregister();
- assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+ assertThat(testee.getLocalListeners(KEY_1).collectList().block())
.containsOnly(wrapReactive(listener1));
}
@@ -126,7 +126,7 @@ class LocalListenerRegistryTest {
private final Duration oneSecond = Duration.ofSeconds(1);
@Test
- void getLocalMailboxListenersShouldReturnPreviousAddedListener() throws Exception {
+ void getLocalListenersShouldReturnPreviousAddedListener() throws Exception {
EventListener listener = event -> { };
ConcurrentTestRunner.builder()
@@ -135,12 +135,12 @@ class LocalListenerRegistryTest {
.operationCount(10)
.runSuccessfullyWithin(oneSecond);
- assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+ assertThat(testee.getLocalListeners(KEY_1).collectList().block())
.containsOnly(wrapReactive(listener));
}
@Test
- void getLocalMailboxListenersShouldReturnAllPreviousAddedListeners() throws Exception {
+ void getLocalListenersShouldReturnAllPreviousAddedListeners() throws Exception {
EventListener listener1 = event -> { };
EventListener listener2 = event -> { };
EventListener listener3 = event -> { };
@@ -154,12 +154,12 @@ class LocalListenerRegistryTest {
.operationCount(10)
.runSuccessfullyWithin(oneSecond);
- assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+ assertThat(testee.getLocalListeners(KEY_1).collectList().block())
.containsOnly(wrapReactive(listener1), wrapReactive(listener2), wrapReactive(listener3));
}
@Test
- void getLocalMailboxListenersShouldReturnEmptyWhenRemoveAddedListener() throws Exception {
+ void getLocalListenersShouldReturnEmptyWhenRemoveAddedListener() throws Exception {
EventListener listener1 = event -> { };
LocalListenerRegistry.LocalRegistration registration = testee.addListener(KEY_1, listener1);
@@ -170,7 +170,7 @@ class LocalListenerRegistryTest {
.operationCount(10)
.runSuccessfullyWithin(oneSecond);
- assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+ assertThat(testee.getLocalListeners(KEY_1).collectList().block())
.isEmpty();
}
@@ -241,7 +241,7 @@ class LocalListenerRegistryTest {
testee.addListener(KEY_1, listener4);
LocalListenerRegistry.LocalRegistration registration5 = testee.addListener(KEY_1, listener5);
- Mono<List<EventListener.ReactiveEventListener>> listeners = testee.getLocalMailboxListeners(KEY_1)
+ Mono<List<EventListener.ReactiveEventListener>> listeners = testee.getLocalListeners(KEY_1)
.publishOn(Schedulers.elastic())
.delayElements(Duration.ofMillis(100))
.collectList();
diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
index a3360c5..c32f24d 100644
--- a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
+++ b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
@@ -102,12 +102,12 @@ public class InVMEventBus implements EventBus {
.then();
}
- private Mono<Void> groupDelivery(Event event, EventListener.ReactiveEventListener mailboxListener, Group group) {
+ private Mono<Void> groupDelivery(Event event, EventListener.ReactiveEventListener listener, Group group) {
return eventDelivery.deliver(
- mailboxListener,
+ listener,
event,
EventDelivery.DeliveryOption.of(
- EventDelivery.Retryer.BackoffRetryer.of(retryBackoff, mailboxListener),
+ EventDelivery.Retryer.BackoffRetryer.of(retryBackoff, listener),
EventDelivery.PermanentFailureHandler.StoreToDeadLetters.of(group, eventDeadLetters)));
}
diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java
index 11f6593..717f71f 100644
--- a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java
+++ b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/EventDelivery.java
@@ -69,18 +69,18 @@ public interface EventDelivery {
class BackoffRetryer implements Retryer {
- public static BackoffRetryer of(RetryBackoffConfiguration retryBackoff, EventListener mailboxListener) {
- return new BackoffRetryer(retryBackoff, mailboxListener);
+ public static BackoffRetryer of(RetryBackoffConfiguration retryBackoff, EventListener listener) {
+ return new BackoffRetryer(retryBackoff, listener);
}
private static final Logger LOGGER = LoggerFactory.getLogger(BackoffRetryer.class);
private final RetryBackoffConfiguration retryBackoff;
- private final EventListener mailboxListener;
+ private final EventListener listener;
- public BackoffRetryer(RetryBackoffConfiguration retryBackoff, EventListener mailboxListener) {
+ public BackoffRetryer(RetryBackoffConfiguration retryBackoff, EventListener listener) {
this.retryBackoff = retryBackoff;
- this.mailboxListener = mailboxListener;
+ this.listener = listener;
}
@Override
@@ -88,7 +88,7 @@ public interface EventDelivery {
return executionResult
.retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
.doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
- mailboxListener.getClass().getCanonicalName(),
+ listener.getClass().getCanonicalName(),
retryBackoff.getMaxRetries(),
event.getClass().getCanonicalName(),
throwable))
diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
index f67c022..0cc70d5 100644
--- a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
+++ b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
@@ -77,28 +77,28 @@ public class InVmEventDelivery implements EventDelivery {
.then();
}
- private Mono<Void> doDeliverToListener(EventListener.ReactiveEventListener mailboxListener, Event event) {
- if (mailboxListener.isHandling(event)) {
- return Mono.defer(() -> Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(mailboxListener),
- mailboxListener.reactiveEvent(event))))
- .subscriberContext(context("deliver", buildMDC(mailboxListener, event)));
+ private Mono<Void> doDeliverToListener(EventListener.ReactiveEventListener listener, Event event) {
+ if (listener.isHandling(event)) {
+ return Mono.defer(() -> Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener),
+ listener.reactiveEvent(event))))
+ .subscriberContext(context("deliver", buildMDC(listener, event)));
}
return Mono.empty();
}
- private MDCBuilder buildMDC(EventListener mailboxListener, Event event) {
+ private MDCBuilder buildMDC(EventListener listener, Event event) {
return MDCBuilder.create()
.addContext(EventBus.StructuredLoggingFields.EVENT_ID, event.getEventId())
.addContext(EventBus.StructuredLoggingFields.EVENT_CLASS, event.getClass())
.addContext(EventBus.StructuredLoggingFields.USER, event.getUsername())
- .addContext(EventBus.StructuredLoggingFields.LISTENER_CLASS, mailboxListener.getClass());
+ .addContext(EventBus.StructuredLoggingFields.LISTENER_CLASS, listener.getClass());
}
- private StructuredLogger structuredLogger(Event event, EventListener mailboxListener) {
+ private StructuredLogger structuredLogger(Event event, EventListener listener) {
return MDCStructuredLogger.forLogger(LOGGER)
.addField(EventBus.StructuredLoggingFields.EVENT_ID, event.getEventId())
.addField(EventBus.StructuredLoggingFields.EVENT_CLASS, event.getClass())
.addField(EventBus.StructuredLoggingFields.USER, event.getUsername())
- .addField(EventBus.StructuredLoggingFields.LISTENER_CLASS, mailboxListener.getClass());
+ .addField(EventBus.StructuredLoggingFields.LISTENER_CLASS, listener.getClass());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org