You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/02 08:41:51 UTC
[james-project] 06/07: JAMES-3191 Specify scheduler for EventBus key registration
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b3ea48605e656ee2074fa1b47020f61528b235bf
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed May 20 11:21:50 2020 +0700
JAMES-3191 Specify scheduler for EventBus key registration
---
.../org/apache/james/mailbox/events/EventBus.java | 6 +-
.../mailbox/MailboxManagerStressContract.java | 12 +--
.../apache/james/mailbox/MailboxManagerTest.java | 26 +++----
.../mailbox/events/ErrorHandlingContract.java | 8 +-
.../events/EventBusConcurrentTestContract.java | 38 ++++-----
.../apache/james/mailbox/events/KeyContract.java | 90 +++++++++++-----------
.../apache/james/mailbox/events/InVMEventBus.java | 4 +-
.../mailbox/events/KeyRegistrationHandler.java | 27 ++++---
.../james/mailbox/events/RabbitMQEventBus.java | 2 +-
.../james/mailbox/events/RabbitMQEventBusTest.java | 14 ++--
.../apache/james/imap/processor/IdleProcessor.java | 7 +-
.../imap/processor/base/SelectedMailboxImpl.java | 7 +-
.../processor/base/SelectedMailboxImplTest.java | 8 +-
13 files changed, 137 insertions(+), 112 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventBus.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventBus.java
index 752740e..c07cfb9 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventBus.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventBus.java
@@ -21,6 +21,8 @@ package org.apache.james.mailbox.events;
import java.util.Set;
+import org.reactivestreams.Publisher;
+
import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Mono;
@@ -44,11 +46,11 @@ public interface EventBus {
}
}
- default Registration register(MailboxListener listener, RegistrationKey key) {
+ default Publisher<Registration> register(MailboxListener listener, RegistrationKey key) {
return register(MailboxListener.wrapReactive(listener), key);
}
- Registration register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key);
+ Publisher<Registration> register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key);
Registration register(MailboxListener.ReactiveMailboxListener listener, Group group) throws GroupAlreadyRegistered;
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressContract.java
index c32b854..192140f 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressContract.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.james.core.Username;
-import org.apache.james.mailbox.events.Event;
import org.apache.james.mailbox.events.EventBus;
import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
import org.apache.james.mailbox.events.MailboxListener;
@@ -46,6 +45,8 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Mono;
+
public interface MailboxManagerStressContract<T extends MailboxManager> {
int APPEND_OPERATIONS = 200;
@@ -66,13 +67,12 @@ public interface MailboxManagerStressContract<T extends MailboxManager> {
getManager().startProcessingRequest(session);
MailboxPath path = MailboxPath.forUser(username, "INBOX");
MailboxId mailboxId = getManager().createMailbox(path, session).get();
- retrieveEventBus().register(new MailboxListener() {
- @Override
- public void event(Event event) {
+ Mono.from(retrieveEventBus()
+ .register(event -> {
MessageUid u = ((MailboxListener.Added) event).getUids().iterator().next();
uList.add(u);
- }
- }, new MailboxIdRegistrationKey(mailboxId));
+ }, new MailboxIdRegistrationKey(mailboxId)))
+ .block();
getManager().endProcessingRequest(session);
getManager().logout(session);
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
index 15ab35d..c262c91 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
@@ -719,7 +719,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
@Test
void deleteMailboxShouldFireMailboxDeletionEvent() throws Exception {
assumeTrue(mailboxManager.hasCapability(MailboxCapabilities.Quota));
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
mailboxManager.deleteMailbox(inbox, session);
@@ -737,7 +737,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
@Test
void deleteMailboxByIdShouldFireMailboxDeletionEvent() throws Exception {
assumeTrue(mailboxManager.hasCapability(MailboxCapabilities.Quota));
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
mailboxManager.deleteMailbox(inboxId, session);
@@ -792,7 +792,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
@Test
void addingMessageShouldFireAddedEvent() throws Exception {
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
inboxManager.appendMessage(MessageManager.AppendCommand.builder()
.build(message), session);
@@ -810,7 +810,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
inboxManager.appendMessage(MessageManager.AppendCommand.builder().build(message), session);
inboxManager.setFlags(new Flags(Flags.Flag.DELETED), MessageManager.FlagsUpdateMode.ADD, MessageRange.all(), session);
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
inboxManager.expunge(MessageRange.all(), session);
assertThat(listener.getEvents())
@@ -827,7 +827,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
ComposedMessageId messageId = inboxManager.appendMessage(MessageManager.AppendCommand.builder().build(message), session).getId();
inboxManager.setFlags(new Flags(Flags.Flag.DELETED), MessageManager.FlagsUpdateMode.ADD, MessageRange.all(), session);
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
inboxManager.delete(ImmutableList.of(messageId.getUid()), session);
assertThat(listener.getEvents())
@@ -843,7 +843,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
void setFlagsShouldFireFlagsUpdatedEvent() throws Exception {
inboxManager.appendMessage(MessageManager.AppendCommand.builder().build(message), session);
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
inboxManager.setFlags(new Flags(Flags.Flag.FLAGGED), MessageManager.FlagsUpdateMode.ADD, MessageRange.all(), session);
assertThat(listener.getEvents())
@@ -861,7 +861,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
Optional<MailboxId> targetMailboxId = mailboxManager.createMailbox(newPath, session);
inboxManager.appendMessage(AppendCommand.builder().build(message), session);
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()))).block();
mailboxManager.moveMessages(MessageRange.all(), inbox, newPath, session);
assertThat(listener.getEvents())
@@ -879,7 +879,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
mailboxManager.createMailbox(newPath, session);
inboxManager.appendMessage(AppendCommand.builder().build(message), session);
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
mailboxManager.moveMessages(MessageRange.all(), inbox, newPath, session);
assertThat(listener.getEvents())
@@ -896,7 +896,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
Optional<MailboxId> targetMailboxId = mailboxManager.createMailbox(newPath, session);
inboxManager.appendMessage(AppendCommand.builder().build(message), session);
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()))).block();
mailboxManager.copyMessages(MessageRange.all(), inbox, newPath, session);
assertThat(listener.getEvents())
@@ -915,7 +915,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
Optional<MailboxId> targetMailboxId = mailboxManager.createMailbox(newPath, session);
ComposedMessageId messageId = inboxManager.appendMessage(AppendCommand.builder().build(message), session).getId();
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()))).block();
mailboxManager.copyMessages(MessageRange.all(), inbox, newPath, session);
assertThat(listener.getEvents())
@@ -933,7 +933,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
mailboxManager.createMailbox(newPath, session);
inboxManager.appendMessage(AppendCommand.builder().build(message), session);
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
mailboxManager.copyMessages(MessageRange.all(), inbox, newPath, session);
assertThat(listener.getEvents())
@@ -948,7 +948,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
Optional<MailboxId> targetMailboxId = mailboxManager.createMailbox(newPath, session);
ComposedMessageId messageId = inboxManager.appendMessage(AppendCommand.builder().build(message), session).getId();
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()))).block();
mailboxManager.moveMessages(MessageRange.all(), inbox, newPath, session);
assertThat(listener.getEvents())
@@ -966,7 +966,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
mailboxManager.createMailbox(newPath, session);
ComposedMessageId messageId = inboxManager.appendMessage(AppendCommand.builder().build(message), session).getId();
- retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+ Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
mailboxManager.moveMessages(MessageRange.all(), inbox, newPath, session);
assertThat(listener.getEvents())
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
index 7979ed2..b598ac1 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -44,6 +44,8 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Mono;
+
interface ErrorHandlingContract extends EventBusContract {
class ThrowingListener implements MailboxListener {
@@ -82,7 +84,7 @@ interface ErrorHandlingContract extends EventBusContract {
doThrow(new RuntimeException())
.when(eventCollector).event(EVENT);
- eventBus().register(eventCollector, KEY_1);
+ Mono.from(eventBus().register(eventCollector, KEY_1)).block();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
assertThat(eventCollector.getEvents())
@@ -232,7 +234,7 @@ interface ErrorHandlingContract extends EventBusContract {
.doCallRealMethod()
.when(eventCollector).event(EVENT);
- eventBus().register(eventCollector, KEY_1);
+ Mono.from(eventBus().register(eventCollector, KEY_1)).block();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
TimeUnit.SECONDS.sleep(1);
@@ -352,7 +354,7 @@ interface ErrorHandlingContract extends EventBusContract {
EventCollector eventCollector2 = eventCollector();
eventBus().register(eventCollector, GROUP_A);
- eventBus().register(eventCollector2, KEY_1);
+ Mono.from(eventBus().register(eventCollector2, KEY_1)).block();
eventBus().reDeliver(GROUP_A, EVENT).block();
getSpeedProfile().longWaitCondition()
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
index 99b64c1..1aa3c2e 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
@@ -39,6 +39,8 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Mono;
+
public interface EventBusConcurrentTestContract {
Duration FIVE_SECONDS = Duration.ofSeconds(5);
@@ -89,9 +91,9 @@ public interface EventBusConcurrentTestContract {
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
- eventBus().register(countingListener1, KEY_1);
- eventBus().register(countingListener2, KEY_2);
- eventBus().register(countingListener3, KEY_3);
+ Mono.from(eventBus().register(countingListener1, KEY_1)).block();
+ Mono.from(eventBus().register(countingListener2, KEY_2)).block();
+ Mono.from(eventBus().register(countingListener3, KEY_3)).block();
int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3
int totalEventBus = 1;
@@ -120,9 +122,9 @@ public interface EventBusConcurrentTestContract {
int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
int totalEventDeliveredGlobally = totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS;
- eventBus().register(countingListener1, KEY_1);
- eventBus().register(countingListener2, KEY_2);
- eventBus().register(countingListener3, KEY_3);
+ Mono.from(eventBus().register(countingListener1, KEY_1)).block();
+ Mono.from(eventBus().register(countingListener2, KEY_2)).block();
+ Mono.from(eventBus().register(countingListener3, KEY_3)).block();
int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3
int totalEventDeliveredByKeys = totalKeyListenerRegistrations * TOTAL_DISPATCH_OPERATIONS;
@@ -175,13 +177,13 @@ public interface EventBusConcurrentTestContract {
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
- eventBus().register(countingListener1, KEY_1);
- eventBus().register(countingListener2, KEY_2);
- eventBus().register(countingListener3, KEY_3);
+ Mono.from(eventBus().register(countingListener1, KEY_1)).block();
+ Mono.from(eventBus().register(countingListener2, KEY_2)).block();
+ Mono.from(eventBus().register(countingListener3, KEY_3)).block();
- eventBus2().register(countingListener1, KEY_1);
- eventBus2().register(countingListener2, KEY_2);
- eventBus2().register(countingListener3, KEY_3);
+ Mono.from(eventBus2().register(countingListener1, KEY_1)).block();
+ Mono.from(eventBus2().register(countingListener2, KEY_2)).block();
+ Mono.from(eventBus2().register(countingListener3, KEY_3)).block();
int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3
int totalEventBus = 2; // eventBus1 + eventBus2
@@ -209,14 +211,14 @@ public interface EventBusConcurrentTestContract {
int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
int totalEventDeliveredGlobally = totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS;
- eventBus().register(countingListener1, KEY_1);
- eventBus().register(countingListener2, KEY_2);
+ Mono.from(eventBus().register(countingListener1, KEY_1)).block();
+ Mono.from(eventBus().register(countingListener2, KEY_2)).block();
- eventBus2().register(countingListener1, KEY_1);
- eventBus2().register(countingListener2, KEY_2);
+ Mono.from(eventBus2().register(countingListener1, KEY_1)).block();
+ Mono.from(eventBus2().register(countingListener2, KEY_2)).block();
- eventBus3().register(countingListener3, KEY_1);
- eventBus3().register(countingListener3, KEY_2);
+ Mono.from(eventBus3().register(countingListener3, KEY_1)).block();
+ Mono.from(eventBus3().register(countingListener3, KEY_2)).block();
int totalKeyListenerRegistrations = 2; // KEY1 + KEY2
int totalEventBus = 3; // eventBus1 + eventBus2 + eventBus3
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
index 34dd756..908f156 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
@@ -56,6 +56,8 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
+
+import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public interface KeyContract extends EventBusContract {
@@ -68,7 +70,7 @@ public interface KeyContract extends EventBusContract {
AtomicInteger finishedExecutions = new AtomicInteger(0);
AtomicBoolean rateExceeded = new AtomicBoolean(false);
- eventBus().register(event -> {
+ Mono.from(eventBus().register(event -> {
if (nbCalls.get() - finishedExecutions.get() > EventBus.EXECUTION_RATE) {
rateExceeded.set(true);
}
@@ -76,7 +78,7 @@ public interface KeyContract extends EventBusContract {
Thread.sleep(Duration.ofMillis(200).toMillis());
finishedExecutions.incrementAndGet();
- }, KEY_1);
+ }, KEY_1)).block();
IntStream.range(0, eventCount)
.forEach(i -> eventBus().dispatch(EVENT, KEY_1).block());
@@ -91,18 +93,18 @@ public interface KeyContract extends EventBusContract {
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
ConcurrentLinkedQueue<String> threads = new ConcurrentLinkedQueue<>();
- eventBus().register(event -> {
+ Mono.from(eventBus().register(event -> {
threads.add(Thread.currentThread().getName());
countDownLatch.await();
- }, KEY_1);
- eventBus().register(event -> {
+ }, KEY_1)).block();
+ Mono.from(eventBus().register(event -> {
threads.add(Thread.currentThread().getName());
countDownLatch.await();
- }, KEY_1);
- eventBus().register(event -> {
+ }, KEY_1)).block();
+ Mono.from(eventBus().register(event -> {
threads.add(Thread.currentThread().getName());
countDownLatch.await();
- }, KEY_1);
+ }, KEY_1)).block();
eventBus().dispatch(EVENT, KEY_1).subscribeOn(Schedulers.elastic()).subscribe();
@@ -120,7 +122,7 @@ public interface KeyContract extends EventBusContract {
default void registeredListenersShouldNotReceiveNoopEvents() throws Exception {
MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
Username bob = Username.of("bob");
MailboxListener.Added noopEvent = new MailboxListener.Added(MailboxSession.SessionId.of(18), bob, MailboxPath.forUser(bob, "mailbox"), TestId.of(58), ImmutableSortedMap.of(), Event.EventId.random());
@@ -134,7 +136,7 @@ public interface KeyContract extends EventBusContract {
default void registeredListenersShouldReceiveOnlyHandledEvents() throws Exception {
MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
eventBus().dispatch(EVENT_UNSUPPORTED_BY_LISTENER, KEY_1).block();
@@ -147,7 +149,7 @@ public interface KeyContract extends EventBusContract {
MailboxListener listener = newListener();
doThrow(new RuntimeException()).when(listener).event(any());
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
.doesNotThrowAnyException();
@@ -156,7 +158,7 @@ public interface KeyContract extends EventBusContract {
@Test
default void dispatchShouldNotNotifyRegisteredListenerWhenEmptyKeySet() throws Exception {
MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
eventBus().dispatch(EVENT, NO_KEYS).block();
@@ -167,7 +169,7 @@ public interface KeyContract extends EventBusContract {
@Test
default void dispatchShouldNotNotifyListenerRegisteredOnOtherKeys() throws Exception {
MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_2)).block();
@@ -178,7 +180,7 @@ public interface KeyContract extends EventBusContract {
@Test
default void dispatchShouldNotifyRegisteredListeners() throws Exception {
MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
@@ -188,7 +190,7 @@ public interface KeyContract extends EventBusContract {
@Test
default void dispatchShouldNotifyLocalRegisteredListenerWithoutDelay() throws Exception {
MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
@@ -199,8 +201,8 @@ public interface KeyContract extends EventBusContract {
default void dispatchShouldNotifyOnlyRegisteredListener() throws Exception {
MailboxListener listener = newListener();
MailboxListener listener2 = newListener();
- eventBus().register(listener, KEY_1);
- eventBus().register(listener2, KEY_2);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener2, KEY_2)).block();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
@@ -213,8 +215,8 @@ public interface KeyContract extends EventBusContract {
default void dispatchShouldNotifyAllListenersRegisteredOnAKey() throws Exception {
MailboxListener listener = newListener();
MailboxListener listener2 = newListener();
- eventBus().register(listener, KEY_1);
- eventBus().register(listener2, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener2, KEY_1)).block();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
@@ -225,8 +227,8 @@ public interface KeyContract extends EventBusContract {
@Test
default void registerShouldAllowDuplicatedRegistration() throws Exception {
MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
@@ -236,8 +238,8 @@ public interface KeyContract extends EventBusContract {
@Test
default void unregisterShouldRemoveDoubleRegisteredListener() throws Exception {
MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
- eventBus().register(listener, KEY_1).unregister();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block().unregister();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
@@ -251,7 +253,7 @@ public interface KeyContract extends EventBusContract {
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
@@ -260,8 +262,8 @@ public interface KeyContract extends EventBusContract {
@Test
default void callingAllUnregisterMethodShouldUnregisterTheListener() throws Exception {
MailboxListener listener = newListener();
- Registration registration = eventBus().register(listener, KEY_1);
- eventBus().register(listener, KEY_1).unregister();
+ Registration registration = Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block().unregister();
registration.unregister();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
@@ -273,8 +275,8 @@ public interface KeyContract extends EventBusContract {
@Test
default void unregisterShouldHaveNotNotifyWhenCalledOnDifferentKeys() throws Exception {
MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
- eventBus().register(listener, KEY_2).unregister();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_2)).block().unregister();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
@@ -285,7 +287,7 @@ public interface KeyContract extends EventBusContract {
default void unregisterShouldBeIdempotentForKeyRegistrations() {
MailboxListener listener = newListener();
- Registration registration = eventBus().register(listener, KEY_1);
+ Registration registration = Mono.from(eventBus().register(listener, KEY_1)).block();
registration.unregister();
assertThatCode(registration::unregister)
@@ -295,7 +297,7 @@ public interface KeyContract extends EventBusContract {
@Test
default void dispatchShouldAcceptSeveralKeys() throws Exception {
MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block();
@@ -305,8 +307,8 @@ public interface KeyContract extends EventBusContract {
@Test
default void dispatchShouldCallListenerOnceWhenSeveralKeysMatching() throws Exception {
MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
- eventBus().register(listener, KEY_2);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_2)).block();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block();
@@ -316,7 +318,7 @@ public interface KeyContract extends EventBusContract {
@Test
default void dispatchShouldNotNotifyUnregisteredListener() throws Exception {
MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1).unregister();
+ Mono.from(eventBus().register(listener, KEY_1)).block().unregister();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
@@ -329,7 +331,7 @@ public interface KeyContract extends EventBusContract {
default void dispatchShouldNotifyAsynchronousListener() throws Exception {
MailboxListener listener = newListener();
when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
eventBus().dispatch(EVENT, KEY_1).block();
@@ -356,7 +358,7 @@ public interface KeyContract extends EventBusContract {
@Test
default void failingRegisteredListenersShouldNotAbortRegisteredDelivery() {
EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EVENT));
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
eventBus().dispatch(EVENT, KEY_1).block();
eventBus().dispatch(EVENT_2, KEY_1).block();
@@ -373,8 +375,8 @@ public interface KeyContract extends EventBusContract {
when(failingListener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
doThrow(new RuntimeException()).when(failingListener).event(any());
- eventBus().register(failingListener, KEY_1);
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(failingListener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
@@ -388,7 +390,7 @@ public interface KeyContract extends EventBusContract {
default void crossEventBusRegistrationShouldBeAllowed() throws Exception {
MailboxListener mailboxListener = newListener();
- eventBus().register(mailboxListener, KEY_1);
+ Mono.from(eventBus().register(mailboxListener, KEY_1)).block();
eventBus2().dispatch(EVENT, KEY_1).block();
@@ -399,7 +401,7 @@ public interface KeyContract extends EventBusContract {
default void unregisteredDistantListenersShouldNotBeNotified() throws Exception {
MailboxListener mailboxListener = newListener();
- eventBus().register(mailboxListener, KEY_1).unregister();
+ Mono.from(eventBus().register(mailboxListener, KEY_1)).block().unregister();
eventBus2().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
@@ -412,8 +414,8 @@ public interface KeyContract extends EventBusContract {
MailboxListener mailboxListener1 = newListener();
MailboxListener mailboxListener2 = newListener();
- eventBus().register(mailboxListener1, KEY_1);
- eventBus2().register(mailboxListener2, KEY_1);
+ Mono.from(eventBus().register(mailboxListener1, KEY_1)).block();
+ Mono.from(eventBus2().register(mailboxListener2, KEY_1)).block();
eventBus2().dispatch(EVENT, KEY_1).block();
@@ -427,7 +429,7 @@ public interface KeyContract extends EventBusContract {
eventBus2().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- eventBus().register(listener, KEY_1);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
@@ -438,8 +440,8 @@ public interface KeyContract extends EventBusContract {
MailboxListener mailboxListener1 = newListener();
MailboxListener mailboxListener2 = newListener();
- eventBus().register(mailboxListener1, KEY_1);
- eventBus2().register(mailboxListener2, KEY_1);
+ Mono.from(eventBus().register(mailboxListener1, KEY_1)).block();
+ Mono.from(eventBus2().register(mailboxListener2, KEY_1)).block();
eventBus2().dispatch(EVENT, KEY_1).block();
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
index 36d9d49..ab7cb2f 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
@@ -55,9 +55,9 @@ public class InVMEventBus implements EventBus {
}
@Override
- public Registration register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) {
+ public Mono<Registration> register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) {
registrations.put(key, listener);
- return () -> registrations.remove(key, listener);
+ return Mono.just(() -> registrations.remove(key, listener));
}
@Override
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index c44be2d..1d5fdd9 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -125,20 +125,25 @@ class KeyRegistrationHandler {
.block();
}
- Registration register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) {
+ Mono<Registration> register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) {
LocalListenerRegistry.LocalRegistration registration = localListenerRegistry.addListener(key, listener);
+
+ return registerIfNeeded(key, registration)
+ .thenReturn(new KeyRegistration(() -> {
+ if (registration.unregister().lastListenerRemoved()) {
+ registrationBinder.unbind(key)
+ .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
+ .block();
+ }
+ }));
+ }
+
+ private Mono<Void> registerIfNeeded(RegistrationKey key, LocalListenerRegistry.LocalRegistration registration) {
if (registration.isFirstListener()) {
- registrationBinder.bind(key)
- .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
- .block();
+ return registrationBinder.bind(key)
+ .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()));
}
- return new KeyRegistration(() -> {
- if (registration.unregister().lastListenerRemoved()) {
- registrationBinder.unbind(key)
- .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
- .block();
- }
- });
+ return Mono.empty();
}
private Mono<Void> handleDelivery(Delivery delivery) {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 196ac8c..a73f1ed 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -121,7 +121,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
}
@Override
- public Registration register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) {
+ public Mono<Registration> register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) {
Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE);
return keyRegistrationHandler.register(listener, key);
}
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 3dbbb80..bfbae43 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -372,7 +372,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
void dispatchShouldWorkAfterNetworkIssuesForOldRegistrationAndKey() {
rabbitMQEventBusWithNetWorkIssue.start();
MailboxListener listener = newListener();
- rabbitMQEventBusWithNetWorkIssue.register(listener, KEY_1);
+ Mono.from(rabbitMQEventBusWithNetWorkIssue.register(listener, KEY_1)).block();
rabbitMQNetWorkIssueExtension.getRabbitMQ().pause();
@@ -454,7 +454,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
void dispatchShouldWorkAfterRestartForOldKeyRegistration() throws Exception {
eventBus.start();
MailboxListener listener = newListener();
- eventBus.register(listener, KEY_1);
+ Mono.from(eventBus.register(listener, KEY_1)).block();
rabbitMQExtension.getRabbitMQ().restart();
@@ -466,7 +466,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
void dispatchedMessagesShouldSurviveARabbitMQRestart() throws Exception {
eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler();
MailboxListener listener = newAsyncListener();
- eventBusWithKeyHandlerNotStarted.register(listener, KEY_1);
+ Mono.from(eventBusWithKeyHandlerNotStarted.register(listener, KEY_1)).block();
Mono<Void> dispatch = eventBusWithKeyHandlerNotStarted.dispatch(EVENT, KEY_1);
dispatch.block();
@@ -484,7 +484,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
rabbitMQExtension.getRabbitMQ().restart();
- eventBus.register(listener, KEY_1);
+ Mono.from(eventBus.register(listener, KEY_1)).block();
eventBus.dispatch(EVENT, KEY_1).block();
assertThatListenerReceiveOneEvent(listener);
@@ -530,7 +530,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBus.start();
MailboxListener listener = newListener();
when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
- eventBus.register(listener, KEY_1);
+ Mono.from(eventBus.register(listener, KEY_1)).block();
rabbitMQExtension.getRabbitMQ().pause();
@@ -558,7 +558,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
rabbitMQExtension.getRabbitMQ().unpause();
- eventBus.register(listener, KEY_1);
+ Mono.from(eventBus.register(listener, KEY_1)).block();
eventBus.dispatch(EVENT, KEY_1).block();
assertThatListenerReceiveOneEvent(listener);
}
@@ -757,7 +757,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
void dispatchShouldPersistEventWhenDispatchingWithKeysGetError() {
EventCollector eventCollector = eventCollector();
eventBus().register(eventCollector, GROUP_A);
- eventBus().register(eventCollector, KEY_1);
+ Mono.from(eventBus().register(eventCollector, KEY_1)).block();
rabbitMQExtension.getRabbitMQ().pause();
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
index 58b95b4..7681c33 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
@@ -53,6 +53,9 @@ import org.apache.james.util.concurrent.NamedThreadFactory;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> implements CapabilityImplementingProcessor {
private static final List<Capability> CAPS = ImmutableList.of(SUPPORTS_IDLE);
public static final int DEFAULT_SCHEDULED_POOL_CORE_SIZE = 5;
@@ -88,7 +91,9 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
SelectedMailbox sm = session.getSelected();
Registration registration;
if (sm != null) {
- registration = eventBus.register(new IdleMailboxListener(session, responder), new MailboxIdRegistrationKey(sm.getMailboxId()));
+ registration = Mono.from(eventBus.register(new IdleMailboxListener(session, responder), new MailboxIdRegistrationKey(sm.getMailboxId())))
+ .subscribeOn(Schedulers.elastic())
+ .block();
} else {
registration = null;
}
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
index 1aef2bf..5457960 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
@@ -52,6 +52,9 @@ import org.apache.james.mailbox.model.UpdatedFlags;
import com.github.steveash.guavate.Guavate;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
/**
* Default implementation of {@link SelectedMailbox}
*/
@@ -89,7 +92,9 @@ public class SelectedMailboxImpl implements SelectedMailbox, MailboxListener {
mailboxId = messageManager.getId();
- registration = eventBus.register(this, new MailboxIdRegistrationKey(mailboxId));
+ registration = Mono.from(eventBus.register(this, new MailboxIdRegistrationKey(mailboxId)))
+ .subscribeOn(Schedulers.elastic())
+ .block();
applicableFlags = messageManager.getApplicableFlags(mailboxSession);
try (Stream<MessageUid> stream = messageManager.search(new SearchQuery(SearchQuery.all()), mailboxSession)) {
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
index f93ffc3..1e0cd54 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Date;
-import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -47,6 +46,7 @@ import org.apache.james.mailbox.ModSeq;
import org.apache.james.mailbox.events.EventBus;
import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
import org.apache.james.mailbox.events.MailboxListener;
+import org.apache.james.mailbox.events.Registration;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.model.MessageMetaData;
@@ -62,6 +62,8 @@ import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
public class SelectedMailboxImplTest {
@@ -152,7 +154,7 @@ public class SelectedMailboxImplTest {
};
}
- private Answer<Iterator<MessageUid>> generateEmitEventAnswer(AtomicInteger success) {
+ private Answer<Mono<Registration>> generateEmitEventAnswer(AtomicInteger success) {
return invocation -> {
Object[] args = invocation.getArguments();
MailboxListener mailboxListener = (MailboxListener) args[0];
@@ -164,7 +166,7 @@ public class SelectedMailboxImplTest {
LOGGER.error("Error while processing event on a concurrent thread", e);
}
});
- return null;
+ return Mono.just(() -> {});
};
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org