You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/02 08:41:51 UTC

[james-project] 06/07: JAMES-3191 Specify scheduler for EventBus key registration

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b3ea48605e656ee2074fa1b47020f61528b235bf
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed May 20 11:21:50 2020 +0700

    JAMES-3191 Specify scheduler for EventBus key registration
---
 .../org/apache/james/mailbox/events/EventBus.java  |  6 +-
 .../mailbox/MailboxManagerStressContract.java      | 12 +--
 .../apache/james/mailbox/MailboxManagerTest.java   | 26 +++----
 .../mailbox/events/ErrorHandlingContract.java      |  8 +-
 .../events/EventBusConcurrentTestContract.java     | 38 ++++-----
 .../apache/james/mailbox/events/KeyContract.java   | 90 +++++++++++-----------
 .../apache/james/mailbox/events/InVMEventBus.java  |  4 +-
 .../mailbox/events/KeyRegistrationHandler.java     | 27 ++++---
 .../james/mailbox/events/RabbitMQEventBus.java     |  2 +-
 .../james/mailbox/events/RabbitMQEventBusTest.java | 14 ++--
 .../apache/james/imap/processor/IdleProcessor.java |  7 +-
 .../imap/processor/base/SelectedMailboxImpl.java   |  7 +-
 .../processor/base/SelectedMailboxImplTest.java    |  8 +-
 13 files changed, 137 insertions(+), 112 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventBus.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventBus.java
index 752740e..c07cfb9 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventBus.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventBus.java
@@ -21,6 +21,8 @@ package org.apache.james.mailbox.events;
 
 import java.util.Set;
 
+import org.reactivestreams.Publisher;
+
 import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Mono;
@@ -44,11 +46,11 @@ public interface EventBus {
         }
     }
 
-    default Registration register(MailboxListener listener, RegistrationKey key) {
+    default Publisher<Registration> register(MailboxListener listener, RegistrationKey key) {
         return register(MailboxListener.wrapReactive(listener), key);
     }
 
-    Registration register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key);
+    Publisher<Registration> register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key);
 
     Registration register(MailboxListener.ReactiveMailboxListener listener, Group group) throws GroupAlreadyRegistered;
 
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressContract.java
index c32b854..192140f 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressContract.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.james.core.Username;
-import org.apache.james.mailbox.events.Event;
 import org.apache.james.mailbox.events.EventBus;
 import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
 import org.apache.james.mailbox.events.MailboxListener;
@@ -46,6 +45,8 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 public interface MailboxManagerStressContract<T extends MailboxManager> {
 
     int APPEND_OPERATIONS = 200;
@@ -66,13 +67,12 @@ public interface MailboxManagerStressContract<T extends MailboxManager> {
         getManager().startProcessingRequest(session);
         MailboxPath path = MailboxPath.forUser(username, "INBOX");
         MailboxId mailboxId = getManager().createMailbox(path, session).get();
-        retrieveEventBus().register(new MailboxListener() {
-            @Override
-            public void event(Event event) {
+        Mono.from(retrieveEventBus()
+            .register(event -> {
                 MessageUid u = ((MailboxListener.Added) event).getUids().iterator().next();
                 uList.add(u);
-            }
-        }, new MailboxIdRegistrationKey(mailboxId));
+            }, new MailboxIdRegistrationKey(mailboxId)))
+            .block();
         getManager().endProcessingRequest(session);
         getManager().logout(session);
 
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
index 15ab35d..c262c91 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
@@ -719,7 +719,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
         @Test
         void deleteMailboxShouldFireMailboxDeletionEvent() throws Exception {
             assumeTrue(mailboxManager.hasCapability(MailboxCapabilities.Quota));
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
 
             mailboxManager.deleteMailbox(inbox, session);
 
@@ -737,7 +737,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
         @Test
         void deleteMailboxByIdShouldFireMailboxDeletionEvent() throws Exception {
             assumeTrue(mailboxManager.hasCapability(MailboxCapabilities.Quota));
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
 
             mailboxManager.deleteMailbox(inboxId, session);
 
@@ -792,7 +792,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
 
         @Test
         void addingMessageShouldFireAddedEvent() throws Exception {
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
             inboxManager.appendMessage(MessageManager.AppendCommand.builder()
                     .build(message), session);
 
@@ -810,7 +810,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
             inboxManager.appendMessage(MessageManager.AppendCommand.builder().build(message), session);
             inboxManager.setFlags(new Flags(Flags.Flag.DELETED), MessageManager.FlagsUpdateMode.ADD, MessageRange.all(), session);
 
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
             inboxManager.expunge(MessageRange.all(), session);
 
             assertThat(listener.getEvents())
@@ -827,7 +827,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
             ComposedMessageId messageId = inboxManager.appendMessage(MessageManager.AppendCommand.builder().build(message), session).getId();
             inboxManager.setFlags(new Flags(Flags.Flag.DELETED), MessageManager.FlagsUpdateMode.ADD, MessageRange.all(), session);
 
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
             inboxManager.delete(ImmutableList.of(messageId.getUid()), session);
 
             assertThat(listener.getEvents())
@@ -843,7 +843,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
         void setFlagsShouldFireFlagsUpdatedEvent() throws Exception {
             inboxManager.appendMessage(MessageManager.AppendCommand.builder().build(message), session);
 
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
             inboxManager.setFlags(new Flags(Flags.Flag.FLAGGED), MessageManager.FlagsUpdateMode.ADD, MessageRange.all(), session);
 
             assertThat(listener.getEvents())
@@ -861,7 +861,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
             Optional<MailboxId> targetMailboxId = mailboxManager.createMailbox(newPath, session);
             inboxManager.appendMessage(AppendCommand.builder().build(message), session);
 
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()))).block();
             mailboxManager.moveMessages(MessageRange.all(), inbox, newPath, session);
 
             assertThat(listener.getEvents())
@@ -879,7 +879,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
             mailboxManager.createMailbox(newPath, session);
             inboxManager.appendMessage(AppendCommand.builder().build(message), session);
 
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
             mailboxManager.moveMessages(MessageRange.all(), inbox, newPath, session);
 
             assertThat(listener.getEvents())
@@ -896,7 +896,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
             Optional<MailboxId> targetMailboxId = mailboxManager.createMailbox(newPath, session);
             inboxManager.appendMessage(AppendCommand.builder().build(message), session);
 
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()))).block();
             mailboxManager.copyMessages(MessageRange.all(), inbox, newPath, session);
 
             assertThat(listener.getEvents())
@@ -915,7 +915,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
             Optional<MailboxId> targetMailboxId = mailboxManager.createMailbox(newPath, session);
             ComposedMessageId messageId = inboxManager.appendMessage(AppendCommand.builder().build(message), session).getId();
 
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()))).block();
             mailboxManager.copyMessages(MessageRange.all(), inbox, newPath, session);
 
             assertThat(listener.getEvents())
@@ -933,7 +933,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
             mailboxManager.createMailbox(newPath, session);
             inboxManager.appendMessage(AppendCommand.builder().build(message), session);
 
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
             mailboxManager.copyMessages(MessageRange.all(), inbox, newPath, session);
 
             assertThat(listener.getEvents())
@@ -948,7 +948,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
             Optional<MailboxId> targetMailboxId = mailboxManager.createMailbox(newPath, session);
             ComposedMessageId messageId = inboxManager.appendMessage(AppendCommand.builder().build(message), session).getId();
 
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(targetMailboxId.get()))).block();
             mailboxManager.moveMessages(MessageRange.all(), inbox, newPath, session);
 
             assertThat(listener.getEvents())
@@ -966,7 +966,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
             mailboxManager.createMailbox(newPath, session);
             ComposedMessageId messageId = inboxManager.appendMessage(AppendCommand.builder().build(message), session).getId();
 
-            retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId));
+            Mono.from(retrieveEventBus(mailboxManager).register(listener, new MailboxIdRegistrationKey(inboxId))).block();
             mailboxManager.moveMessages(MessageRange.all(), inbox, newPath, session);
 
             assertThat(listener.getEvents())
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
index 7979ed2..b598ac1 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -44,6 +44,8 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 interface ErrorHandlingContract extends EventBusContract {
 
     class ThrowingListener implements MailboxListener {
@@ -82,7 +84,7 @@ interface ErrorHandlingContract extends EventBusContract {
         doThrow(new RuntimeException())
             .when(eventCollector).event(EVENT);
 
-        eventBus().register(eventCollector, KEY_1);
+        Mono.from(eventBus().register(eventCollector, KEY_1)).block();
         eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
         assertThat(eventCollector.getEvents())
@@ -232,7 +234,7 @@ interface ErrorHandlingContract extends EventBusContract {
             .doCallRealMethod()
             .when(eventCollector).event(EVENT);
 
-        eventBus().register(eventCollector, KEY_1);
+        Mono.from(eventBus().register(eventCollector, KEY_1)).block();
         eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
         TimeUnit.SECONDS.sleep(1);
@@ -352,7 +354,7 @@ interface ErrorHandlingContract extends EventBusContract {
         EventCollector eventCollector2 = eventCollector();
 
         eventBus().register(eventCollector, GROUP_A);
-        eventBus().register(eventCollector2, KEY_1);
+        Mono.from(eventBus().register(eventCollector2, KEY_1)).block();
         eventBus().reDeliver(GROUP_A, EVENT).block();
 
         getSpeedProfile().longWaitCondition()
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
index 99b64c1..1aa3c2e 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
@@ -39,6 +39,8 @@ import org.junit.jupiter.api.Test;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 public interface EventBusConcurrentTestContract {
 
     Duration FIVE_SECONDS = Duration.ofSeconds(5);
@@ -89,9 +91,9 @@ public interface EventBusConcurrentTestContract {
             EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
             EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
             EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
-            eventBus().register(countingListener1, KEY_1);
-            eventBus().register(countingListener2, KEY_2);
-            eventBus().register(countingListener3, KEY_3);
+            Mono.from(eventBus().register(countingListener1, KEY_1)).block();
+            Mono.from(eventBus().register(countingListener2, KEY_2)).block();
+            Mono.from(eventBus().register(countingListener3, KEY_3)).block();
 
             int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3
             int totalEventBus = 1;
@@ -120,9 +122,9 @@ public interface EventBusConcurrentTestContract {
             int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
             int totalEventDeliveredGlobally = totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS;
 
-            eventBus().register(countingListener1, KEY_1);
-            eventBus().register(countingListener2, KEY_2);
-            eventBus().register(countingListener3, KEY_3);
+            Mono.from(eventBus().register(countingListener1, KEY_1)).block();
+            Mono.from(eventBus().register(countingListener2, KEY_2)).block();
+            Mono.from(eventBus().register(countingListener3, KEY_3)).block();
             int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3
             int totalEventDeliveredByKeys = totalKeyListenerRegistrations * TOTAL_DISPATCH_OPERATIONS;
 
@@ -175,13 +177,13 @@ public interface EventBusConcurrentTestContract {
             EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
             EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
 
-            eventBus().register(countingListener1, KEY_1);
-            eventBus().register(countingListener2, KEY_2);
-            eventBus().register(countingListener3, KEY_3);
+            Mono.from(eventBus().register(countingListener1, KEY_1)).block();
+            Mono.from(eventBus().register(countingListener2, KEY_2)).block();
+            Mono.from(eventBus().register(countingListener3, KEY_3)).block();
 
-            eventBus2().register(countingListener1, KEY_1);
-            eventBus2().register(countingListener2, KEY_2);
-            eventBus2().register(countingListener3, KEY_3);
+            Mono.from(eventBus2().register(countingListener1, KEY_1)).block();
+            Mono.from(eventBus2().register(countingListener2, KEY_2)).block();
+            Mono.from(eventBus2().register(countingListener3, KEY_3)).block();
 
             int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3
             int totalEventBus = 2; // eventBus1 + eventBus2
@@ -209,14 +211,14 @@ public interface EventBusConcurrentTestContract {
             int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
             int totalEventDeliveredGlobally = totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS;
 
-            eventBus().register(countingListener1, KEY_1);
-            eventBus().register(countingListener2, KEY_2);
+            Mono.from(eventBus().register(countingListener1, KEY_1)).block();
+            Mono.from(eventBus().register(countingListener2, KEY_2)).block();
 
-            eventBus2().register(countingListener1, KEY_1);
-            eventBus2().register(countingListener2, KEY_2);
+            Mono.from(eventBus2().register(countingListener1, KEY_1)).block();
+            Mono.from(eventBus2().register(countingListener2, KEY_2)).block();
 
-            eventBus3().register(countingListener3, KEY_1);
-            eventBus3().register(countingListener3, KEY_2);
+            Mono.from(eventBus3().register(countingListener3, KEY_1)).block();
+            Mono.from(eventBus3().register(countingListener3, KEY_2)).block();
 
             int totalKeyListenerRegistrations = 2; // KEY1 + KEY2
             int totalEventBus = 3; // eventBus1 + eventBus2 + eventBus3
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
index 34dd756..908f156 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
@@ -56,6 +56,8 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
+
+import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public interface KeyContract extends EventBusContract {
@@ -68,7 +70,7 @@ public interface KeyContract extends EventBusContract {
             AtomicInteger finishedExecutions = new AtomicInteger(0);
             AtomicBoolean rateExceeded = new AtomicBoolean(false);
 
-            eventBus().register(event -> {
+            Mono.from(eventBus().register(event -> {
                 if (nbCalls.get() - finishedExecutions.get() > EventBus.EXECUTION_RATE) {
                     rateExceeded.set(true);
                 }
@@ -76,7 +78,7 @@ public interface KeyContract extends EventBusContract {
                 Thread.sleep(Duration.ofMillis(200).toMillis());
                 finishedExecutions.incrementAndGet();
 
-            }, KEY_1);
+            }, KEY_1)).block();
 
             IntStream.range(0, eventCount)
                 .forEach(i -> eventBus().dispatch(EVENT, KEY_1).block());
@@ -91,18 +93,18 @@ public interface KeyContract extends EventBusContract {
             CountDownLatch countDownLatch = new CountDownLatch(1);
             try {
                 ConcurrentLinkedQueue<String> threads = new ConcurrentLinkedQueue<>();
-                eventBus().register(event -> {
+                Mono.from(eventBus().register(event -> {
                     threads.add(Thread.currentThread().getName());
                     countDownLatch.await();
-                }, KEY_1);
-                eventBus().register(event -> {
+                }, KEY_1)).block();
+                Mono.from(eventBus().register(event -> {
                     threads.add(Thread.currentThread().getName());
                     countDownLatch.await();
-                }, KEY_1);
-                eventBus().register(event -> {
+                }, KEY_1)).block();
+                Mono.from(eventBus().register(event -> {
                     threads.add(Thread.currentThread().getName());
                     countDownLatch.await();
-                }, KEY_1);
+                }, KEY_1)).block();
 
                 eventBus().dispatch(EVENT, KEY_1).subscribeOn(Schedulers.elastic()).subscribe();
 
@@ -120,7 +122,7 @@ public interface KeyContract extends EventBusContract {
         default void registeredListenersShouldNotReceiveNoopEvents() throws Exception {
             MailboxListener listener = newListener();
 
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             Username bob = Username.of("bob");
             MailboxListener.Added noopEvent = new MailboxListener.Added(MailboxSession.SessionId.of(18), bob, MailboxPath.forUser(bob, "mailbox"), TestId.of(58), ImmutableSortedMap.of(), Event.EventId.random());
@@ -134,7 +136,7 @@ public interface KeyContract extends EventBusContract {
         default void registeredListenersShouldReceiveOnlyHandledEvents() throws Exception {
             MailboxListener listener = newListener();
 
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             eventBus().dispatch(EVENT_UNSUPPORTED_BY_LISTENER, KEY_1).block();
 
@@ -147,7 +149,7 @@ public interface KeyContract extends EventBusContract {
             MailboxListener listener = newListener();
             doThrow(new RuntimeException()).when(listener).event(any());
 
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
                 .doesNotThrowAnyException();
@@ -156,7 +158,7 @@ public interface KeyContract extends EventBusContract {
         @Test
         default void dispatchShouldNotNotifyRegisteredListenerWhenEmptyKeySet() throws Exception {
             MailboxListener listener = newListener();
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             eventBus().dispatch(EVENT, NO_KEYS).block();
 
@@ -167,7 +169,7 @@ public interface KeyContract extends EventBusContract {
         @Test
         default void dispatchShouldNotNotifyListenerRegisteredOnOtherKeys() throws Exception {
             MailboxListener listener = newListener();
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_2)).block();
 
@@ -178,7 +180,7 @@ public interface KeyContract extends EventBusContract {
         @Test
         default void dispatchShouldNotifyRegisteredListeners() throws Exception {
             MailboxListener listener = newListener();
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
@@ -188,7 +190,7 @@ public interface KeyContract extends EventBusContract {
         @Test
         default void dispatchShouldNotifyLocalRegisteredListenerWithoutDelay() throws Exception {
             MailboxListener listener = newListener();
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
@@ -199,8 +201,8 @@ public interface KeyContract extends EventBusContract {
         default void dispatchShouldNotifyOnlyRegisteredListener() throws Exception {
             MailboxListener listener = newListener();
             MailboxListener listener2 = newListener();
-            eventBus().register(listener, KEY_1);
-            eventBus().register(listener2, KEY_2);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
+            Mono.from(eventBus().register(listener2, KEY_2)).block();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
@@ -213,8 +215,8 @@ public interface KeyContract extends EventBusContract {
         default void dispatchShouldNotifyAllListenersRegisteredOnAKey() throws Exception {
             MailboxListener listener = newListener();
             MailboxListener listener2 = newListener();
-            eventBus().register(listener, KEY_1);
-            eventBus().register(listener2, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
+            Mono.from(eventBus().register(listener2, KEY_1)).block();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
@@ -225,8 +227,8 @@ public interface KeyContract extends EventBusContract {
         @Test
         default void registerShouldAllowDuplicatedRegistration() throws Exception {
             MailboxListener listener = newListener();
-            eventBus().register(listener, KEY_1);
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
@@ -236,8 +238,8 @@ public interface KeyContract extends EventBusContract {
         @Test
         default void unregisterShouldRemoveDoubleRegisteredListener() throws Exception {
             MailboxListener listener = newListener();
-            eventBus().register(listener, KEY_1);
-            eventBus().register(listener, KEY_1).unregister();
+            Mono.from(eventBus().register(listener, KEY_1)).block();
+            Mono.from(eventBus().register(listener, KEY_1)).block().unregister();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
@@ -251,7 +253,7 @@ public interface KeyContract extends EventBusContract {
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
                 .event(any());
@@ -260,8 +262,8 @@ public interface KeyContract extends EventBusContract {
         @Test
         default void callingAllUnregisterMethodShouldUnregisterTheListener() throws Exception {
             MailboxListener listener = newListener();
-            Registration registration = eventBus().register(listener, KEY_1);
-            eventBus().register(listener, KEY_1).unregister();
+            Registration registration = Mono.from(eventBus().register(listener, KEY_1)).block();
+            Mono.from(eventBus().register(listener, KEY_1)).block().unregister();
             registration.unregister();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
@@ -273,8 +275,8 @@ public interface KeyContract extends EventBusContract {
         @Test
         default void unregisterShouldHaveNotNotifyWhenCalledOnDifferentKeys() throws Exception {
             MailboxListener listener = newListener();
-            eventBus().register(listener, KEY_1);
-            eventBus().register(listener, KEY_2).unregister();
+            Mono.from(eventBus().register(listener, KEY_1)).block();
+            Mono.from(eventBus().register(listener, KEY_2)).block().unregister();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
@@ -285,7 +287,7 @@ public interface KeyContract extends EventBusContract {
         default void unregisterShouldBeIdempotentForKeyRegistrations() {
             MailboxListener listener = newListener();
 
-            Registration registration = eventBus().register(listener, KEY_1);
+            Registration registration = Mono.from(eventBus().register(listener, KEY_1)).block();
             registration.unregister();
 
             assertThatCode(registration::unregister)
@@ -295,7 +297,7 @@ public interface KeyContract extends EventBusContract {
         @Test
         default void dispatchShouldAcceptSeveralKeys() throws Exception {
             MailboxListener listener = newListener();
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block();
 
@@ -305,8 +307,8 @@ public interface KeyContract extends EventBusContract {
         @Test
         default void dispatchShouldCallListenerOnceWhenSeveralKeysMatching() throws Exception {
             MailboxListener listener = newListener();
-            eventBus().register(listener, KEY_1);
-            eventBus().register(listener, KEY_2);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
+            Mono.from(eventBus().register(listener, KEY_2)).block();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block();
 
@@ -316,7 +318,7 @@ public interface KeyContract extends EventBusContract {
         @Test
         default void dispatchShouldNotNotifyUnregisteredListener() throws Exception {
             MailboxListener listener = newListener();
-            eventBus().register(listener, KEY_1).unregister();
+            Mono.from(eventBus().register(listener, KEY_1)).block().unregister();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
@@ -329,7 +331,7 @@ public interface KeyContract extends EventBusContract {
         default void dispatchShouldNotifyAsynchronousListener() throws Exception {
             MailboxListener listener = newListener();
             when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             eventBus().dispatch(EVENT, KEY_1).block();
 
@@ -356,7 +358,7 @@ public interface KeyContract extends EventBusContract {
         @Test
         default void failingRegisteredListenersShouldNotAbortRegisteredDelivery() {
             EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EVENT));
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             eventBus().dispatch(EVENT, KEY_1).block();
             eventBus().dispatch(EVENT_2, KEY_1).block();
@@ -373,8 +375,8 @@ public interface KeyContract extends EventBusContract {
             when(failingListener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
             doThrow(new RuntimeException()).when(failingListener).event(any());
 
-            eventBus().register(failingListener, KEY_1);
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(failingListener, KEY_1)).block();
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
@@ -388,7 +390,7 @@ public interface KeyContract extends EventBusContract {
         default void crossEventBusRegistrationShouldBeAllowed() throws Exception {
             MailboxListener mailboxListener = newListener();
 
-            eventBus().register(mailboxListener, KEY_1);
+            Mono.from(eventBus().register(mailboxListener, KEY_1)).block();
 
             eventBus2().dispatch(EVENT, KEY_1).block();
 
@@ -399,7 +401,7 @@ public interface KeyContract extends EventBusContract {
         default void unregisteredDistantListenersShouldNotBeNotified() throws Exception {
             MailboxListener mailboxListener = newListener();
 
-            eventBus().register(mailboxListener, KEY_1).unregister();
+            Mono.from(eventBus().register(mailboxListener, KEY_1)).block().unregister();
 
             eventBus2().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
@@ -412,8 +414,8 @@ public interface KeyContract extends EventBusContract {
             MailboxListener mailboxListener1 = newListener();
             MailboxListener mailboxListener2 = newListener();
 
-            eventBus().register(mailboxListener1, KEY_1);
-            eventBus2().register(mailboxListener2, KEY_1);
+            Mono.from(eventBus().register(mailboxListener1, KEY_1)).block();
+            Mono.from(eventBus2().register(mailboxListener2, KEY_1)).block();
 
             eventBus2().dispatch(EVENT, KEY_1).block();
 
@@ -427,7 +429,7 @@ public interface KeyContract extends EventBusContract {
 
             eventBus2().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
 
-            eventBus().register(listener, KEY_1);
+            Mono.from(eventBus().register(listener, KEY_1)).block();
 
             verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
                 .event(any());
@@ -438,8 +440,8 @@ public interface KeyContract extends EventBusContract {
             MailboxListener mailboxListener1 = newListener();
             MailboxListener mailboxListener2 = newListener();
 
-            eventBus().register(mailboxListener1, KEY_1);
-            eventBus2().register(mailboxListener2, KEY_1);
+            Mono.from(eventBus().register(mailboxListener1, KEY_1)).block();
+            Mono.from(eventBus2().register(mailboxListener2, KEY_1)).block();
 
             eventBus2().dispatch(EVENT, KEY_1).block();
 
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
index 36d9d49..ab7cb2f 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
@@ -55,9 +55,9 @@ public class InVMEventBus implements EventBus {
     }
 
     @Override
-    public Registration register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) {
+    public Mono<Registration> register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) {
         registrations.put(key, listener);
-        return () -> registrations.remove(key, listener);
+        return Mono.just(() -> registrations.remove(key, listener));
     }
 
     @Override
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index c44be2d..1d5fdd9 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -125,20 +125,25 @@ class KeyRegistrationHandler {
             .block();
     }
 
-    Registration register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) {
+    Mono<Registration> register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) {
         LocalListenerRegistry.LocalRegistration registration = localListenerRegistry.addListener(key, listener);
+
+        return registerIfNeeded(key, registration)
+            .thenReturn(new KeyRegistration(() -> {
+                if (registration.unregister().lastListenerRemoved()) {
+                    registrationBinder.unbind(key)
+                        .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
+                        .block();
+                }
+            }));
+    }
+
+    private Mono<Void> registerIfNeeded(RegistrationKey key, LocalListenerRegistry.LocalRegistration registration) {
         if (registration.isFirstListener()) {
-            registrationBinder.bind(key)
-                .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
-                .block();
+            return registrationBinder.bind(key)
+                .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()));
         }
-        return new KeyRegistration(() -> {
-            if (registration.unregister().lastListenerRemoved()) {
-                registrationBinder.unbind(key)
-                    .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
-                    .block();
-            }
-        });
+        return Mono.empty();
     }
 
     private Mono<Void> handleDelivery(Delivery delivery) {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 196ac8c..a73f1ed 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -121,7 +121,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
     }
 
     @Override
-    public Registration register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) {
+    public Mono<Registration> register(MailboxListener.ReactiveMailboxListener listener, RegistrationKey key) {
         Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE);
         return keyRegistrationHandler.register(listener, key);
     }
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 3dbbb80..bfbae43 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -372,7 +372,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
                 void dispatchShouldWorkAfterNetworkIssuesForOldRegistrationAndKey() {
                     rabbitMQEventBusWithNetWorkIssue.start();
                     MailboxListener listener = newListener();
-                    rabbitMQEventBusWithNetWorkIssue.register(listener, KEY_1);
+                    Mono.from(rabbitMQEventBusWithNetWorkIssue.register(listener, KEY_1)).block();
 
                     rabbitMQNetWorkIssueExtension.getRabbitMQ().pause();
 
@@ -454,7 +454,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             void dispatchShouldWorkAfterRestartForOldKeyRegistration() throws Exception {
                 eventBus.start();
                 MailboxListener listener = newListener();
-                eventBus.register(listener, KEY_1);
+                Mono.from(eventBus.register(listener, KEY_1)).block();
 
                 rabbitMQExtension.getRabbitMQ().restart();
 
@@ -466,7 +466,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             void dispatchedMessagesShouldSurviveARabbitMQRestart() throws Exception {
                 eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler();
                 MailboxListener listener = newAsyncListener();
-                eventBusWithKeyHandlerNotStarted.register(listener, KEY_1);
+                Mono.from(eventBusWithKeyHandlerNotStarted.register(listener, KEY_1)).block();
                 Mono<Void> dispatch = eventBusWithKeyHandlerNotStarted.dispatch(EVENT, KEY_1);
                 dispatch.block();
 
@@ -484,7 +484,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
 
                 rabbitMQExtension.getRabbitMQ().restart();
 
-                eventBus.register(listener, KEY_1);
+                Mono.from(eventBus.register(listener, KEY_1)).block();
 
                 eventBus.dispatch(EVENT, KEY_1).block();
                 assertThatListenerReceiveOneEvent(listener);
@@ -530,7 +530,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
                 eventBus.start();
                 MailboxListener listener = newListener();
                 when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-                eventBus.register(listener, KEY_1);
+                Mono.from(eventBus.register(listener, KEY_1)).block();
 
                 rabbitMQExtension.getRabbitMQ().pause();
 
@@ -558,7 +558,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
 
                 rabbitMQExtension.getRabbitMQ().unpause();
 
-                eventBus.register(listener, KEY_1);
+                Mono.from(eventBus.register(listener, KEY_1)).block();
                 eventBus.dispatch(EVENT, KEY_1).block();
                 assertThatListenerReceiveOneEvent(listener);
             }
@@ -757,7 +757,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         void dispatchShouldPersistEventWhenDispatchingWithKeysGetError() {
             EventCollector eventCollector = eventCollector();
             eventBus().register(eventCollector, GROUP_A);
-            eventBus().register(eventCollector, KEY_1);
+            Mono.from(eventBus().register(eventCollector, KEY_1)).block();
 
             rabbitMQExtension.getRabbitMQ().pause();
 
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
index 58b95b4..7681c33 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
@@ -53,6 +53,9 @@ import org.apache.james.util.concurrent.NamedThreadFactory;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> implements CapabilityImplementingProcessor {
     private static final List<Capability> CAPS = ImmutableList.of(SUPPORTS_IDLE);
     public static final int DEFAULT_SCHEDULED_POOL_CORE_SIZE = 5;
@@ -88,7 +91,9 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
         SelectedMailbox sm = session.getSelected();
         Registration registration;
         if (sm != null) {
-            registration = eventBus.register(new IdleMailboxListener(session, responder), new MailboxIdRegistrationKey(sm.getMailboxId()));
+            registration = Mono.from(eventBus.register(new IdleMailboxListener(session, responder), new MailboxIdRegistrationKey(sm.getMailboxId())))
+                .subscribeOn(Schedulers.elastic())
+                .block();
         } else {
             registration = null;
         }
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
index 1aef2bf..5457960 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
@@ -52,6 +52,9 @@ import org.apache.james.mailbox.model.UpdatedFlags;
 
 import com.github.steveash.guavate.Guavate;
 
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 /**
  * Default implementation of {@link SelectedMailbox}
  */
@@ -89,7 +92,9 @@ public class SelectedMailboxImpl implements SelectedMailbox, MailboxListener {
 
         mailboxId = messageManager.getId();
 
-        registration = eventBus.register(this, new MailboxIdRegistrationKey(mailboxId));
+        registration = Mono.from(eventBus.register(this, new MailboxIdRegistrationKey(mailboxId)))
+            .subscribeOn(Schedulers.elastic())
+            .block();
 
         applicableFlags = messageManager.getApplicableFlags(mailboxSession);
         try (Stream<MessageUid> stream = messageManager.search(new SearchQuery(SearchQuery.all()), mailboxSession)) {
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
index f93ffc3..1e0cd54 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.Date;
-import java.util.Iterator;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -47,6 +46,7 @@ import org.apache.james.mailbox.ModSeq;
 import org.apache.james.mailbox.events.EventBus;
 import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
 import org.apache.james.mailbox.events.MailboxListener;
+import org.apache.james.mailbox.events.Registration;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.model.MessageMetaData;
@@ -62,6 +62,8 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Mono;
+
 
 public class SelectedMailboxImplTest {
 
@@ -152,7 +154,7 @@ public class SelectedMailboxImplTest {
         };
     }
 
-    private Answer<Iterator<MessageUid>> generateEmitEventAnswer(AtomicInteger success) {
+    private Answer<Mono<Registration>> generateEmitEventAnswer(AtomicInteger success) {
         return invocation -> {
             Object[] args = invocation.getArguments();
             MailboxListener mailboxListener = (MailboxListener) args[0];
@@ -164,7 +166,7 @@ public class SelectedMailboxImplTest {
                     LOGGER.error("Error while processing event on a concurrent thread", e);
                 }
             });
-            return null;
+            return Mono.just(() -> {});
         };
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org