You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/24 10:30:29 UTC

[james-project] 02/02: JAMES-3773 IDLE listener can avoid a nested blocking call

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 576f8860a3d8dfd4034e2e7788d1cb3ab2d3169a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Aug 24 09:15:13 2022 +0700

    JAMES-3773 IDLE listener can avoid a nested blocking call
---
 .../james/imap/api/process/SelectedMailbox.java      |  2 +-
 .../imap/processor/AbstractSelectionProcessor.java   | 10 +++++++---
 .../apache/james/imap/processor/IdleProcessor.java   |  7 ++++---
 .../imap/processor/base/SelectedMailboxImpl.java     | 20 ++++++++++----------
 .../processor/base/MailboxEventAnalyserTest.java     | 18 +++++++++---------
 .../imap/processor/base/SelectedMailboxImplTest.java |  8 ++++----
 6 files changed, 35 insertions(+), 30 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/process/SelectedMailbox.java b/protocols/imap/src/main/java/org/apache/james/imap/api/process/SelectedMailbox.java
index aad4eb0d2f..efd430034d 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/api/process/SelectedMailbox.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/api/process/SelectedMailbox.java
@@ -45,7 +45,7 @@ public interface SelectedMailbox {
      */
     Mono<Void> deselect();
 
-    void registerIdle(EventListener idle);
+    void registerIdle(EventListener.ReactiveEventListener idle);
 
     void unregisterIdle();
 
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
index f00bf146f1..4e131d2db6 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
@@ -83,7 +83,7 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
     private final StatusResponseFactory statusResponseFactory;
     private final boolean openReadOnly;
     private final EventBus eventBus;
-    
+
     public AbstractSelectionProcessor(Class<R> acceptableClass, MailboxManager mailboxManager, StatusResponseFactory statusResponseFactory, boolean openReadOnly,
                                       MetricFactory metricFactory, EventBus eventBus) {
         super(acceptableClass, mailboxManager, statusResponseFactory, metricFactory);
@@ -195,8 +195,7 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
         return Flux.<MessageUid>concat(
             Flux.just(firstUnseen),
             Flux.range(0, 5)
-                .concatMap(i -> selectMailbox(fullMailboxPath, session, responder)
-                    .map(MailboxMetaData::getFirstUnseen)))
+                .concatMap(i -> retrieveFirstUnseen(session, fullMailboxPath, responder)))
             .filter(unseenUid -> unseen(responder, firstUnseen, selected))
             .next()
             .switchIfEmpty(Mono.fromCallable(() -> {
@@ -206,6 +205,11 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
             }));
     }
 
+    private Mono<MessageUid> retrieveFirstUnseen(ImapSession session, MailboxPath fullMailboxPath, Responder responder) {
+        return selectMailbox(fullMailboxPath, session, responder)
+            .map(MailboxMetaData::getFirstUnseen);
+    }
+
     private Optional<UidRange[]> uidSet(AbstractMailboxSelectionRequest request, MailboxMetaData metaData) {
         return Optional.ofNullable(request.getUidSet())
             .or(() -> {
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
index 39005f0020..29e92276b2 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
@@ -44,6 +44,7 @@ import org.apache.james.mailbox.events.MailboxEvents.Expunged;
 import org.apache.james.mailbox.events.MailboxEvents.FlagsUpdated;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.util.MDCBuilder;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -146,7 +147,7 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
         return CAPS;
     }
 
-    private class IdleMailboxListener implements EventListener {
+    private class IdleMailboxListener implements EventListener.ReactiveEventListener {
 
         private final Responder responder;
         private final ImapSession session;
@@ -162,8 +163,8 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
         }
 
         @Override
-        public void event(Event event) {
-            unsolicitedResponses(session, responder, false).block();
+        public Publisher<Void> reactiveEvent(Event event) {
+            return unsolicitedResponses(session, responder, false);
         }
 
         @Override
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
index 11dc17f264..49d4854d81 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
@@ -62,6 +62,7 @@ import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.model.SearchQuery;
 import org.apache.james.mailbox.model.UpdatedFlags;
+import org.reactivestreams.Publisher;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
@@ -75,9 +76,7 @@ import reactor.core.scheduler.Schedulers;
 /**
  * Default implementation of {@link SelectedMailbox}
  */
-public class SelectedMailboxImpl implements SelectedMailbox, EventListener {
-
-
+public class SelectedMailboxImpl implements SelectedMailbox, EventListener.ReactiveEventListener {
     private static final Void VOID = null;
     private static final Flag UNINTERESTING_FLAGS = Flag.RECENT;
 
@@ -135,7 +134,7 @@ public class SelectedMailboxImpl implements SelectedMailbox, EventListener {
     private final Set<MessageUid> flagUpdateUids = new TreeSet<>();
     private final Set<MessageUid> expungedUids = new TreeSet<>();
     private final StampedLock applicableFlagsLock = new StampedLock();
-    private final AtomicReference<EventListener> idleEventListener = new AtomicReference<>();
+    private final AtomicReference<ReactiveEventListener> idleEventListener = new AtomicReference<>();
     private final AtomicBoolean recentUidRemoved = new AtomicBoolean(false);
     private final AtomicBoolean isDeletedByOtherSession = new AtomicBoolean(false);
     private final AtomicBoolean sizeChanged = new AtomicBoolean(false);
@@ -172,7 +171,7 @@ public class SelectedMailboxImpl implements SelectedMailbox, EventListener {
     }
 
     @Override
-    public void registerIdle(EventListener idle) {
+    public void registerIdle(ReactiveEventListener idle) {
         idleEventListener.set(idle);
     }
 
@@ -391,12 +390,13 @@ public class SelectedMailboxImpl implements SelectedMailbox, EventListener {
         applicableFlagsLock.unlockWrite(stamp);
     }
 
-    
     @Override
-    public void event(Event event) {
-        synchronizedEvent(event);
-        Optional.ofNullable(idleEventListener.get())
-            .ifPresent(Throwing.<EventListener>consumer(listener -> listener.event(event)).sneakyThrow());
+    public Publisher<Void> reactiveEvent(Event event) {
+        return Mono.fromRunnable(() -> synchronizedEvent(event))
+            .subscribeOn(Schedulers.boundedElastic())
+            .then(Optional.ofNullable(idleEventListener.get())
+                .map(listener -> Mono.from(listener.reactiveEvent(event)))
+                .orElse(Mono.empty()));
     }
 
     private synchronized void synchronizedEvent(Event event) {
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
index 5ee666cc68..b5a750b31f 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
@@ -169,7 +169,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldBeNoSizeChangeOnOtherEvent() {
+    void testShouldBeNoSizeChangeOnOtherEvent() throws Exception {
             MailboxEvent event = new MailboxAdded(MAILBOX_SESSION.getSessionId(),
             MAILBOX_SESSION.getUser(), MAILBOX_PATH, MAILBOX_ID, Event.EventId.random());
       
@@ -179,14 +179,14 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldBeNoSizeChangeOnAdded() {
+    void testShouldBeNoSizeChangeOnAdded() throws Exception {
         testee.event(ADDED);
 
         assertThat(testee.isSizeChanged()).isTrue();
     }
 
     @Test
-    void testShouldNoSizeChangeAfterReset() {
+    void testShouldNoSizeChangeAfterReset() throws Exception {
         testee.event(ADDED);
         testee.resetEvents();
 
@@ -194,7 +194,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldNotSetUidWhenNoSystemFlagChange() {
+    void testShouldNotSetUidWhenNoSystemFlagChange() throws Exception {
         FlagsUpdated update = EventFactory.flagsUpdated()
             .randomEventId()
             .mailboxSession(MAILBOX_SESSION)
@@ -208,7 +208,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldSetUidWhenSystemFlagChange() {
+    void testShouldSetUidWhenSystemFlagChange() throws Exception {
         FlagsUpdated update = EventFactory.flagsUpdated()
             .randomEventId()
             .mailboxSession(OTHER_MAILBOX_SESSION)
@@ -223,7 +223,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldClearFlagUidsUponReset() {
+    void testShouldClearFlagUidsUponReset() throws Exception {
         SelectedMailboxImpl analyser = this.testee;
 
         FlagsUpdated update = EventFactory.flagsUpdated()
@@ -241,7 +241,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldSetUidWhenSystemFlagChangeDifferentSessionInSilentMode() {
+    void testShouldSetUidWhenSystemFlagChangeDifferentSessionInSilentMode() throws Exception {
         FlagsUpdated update = EventFactory.flagsUpdated()
             .randomEventId()
             .mailboxSession(OTHER_MAILBOX_SESSION)
@@ -258,7 +258,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldNotSetUidWhenSystemFlagChangeSameSessionInSilentMode() {
+    void testShouldNotSetUidWhenSystemFlagChangeSameSessionInSilentMode() throws Exception {
         FlagsUpdated update = EventFactory.flagsUpdated()
             .randomEventId()
             .mailboxSession(MAILBOX_SESSION)
@@ -275,7 +275,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldNotSetUidWhenOnlyRecentFlagUpdated() {
+    void testShouldNotSetUidWhenOnlyRecentFlagUpdated() throws Exception {
         FlagsUpdated update = EventFactory.flagsUpdated()
             .randomEventId()
             .mailboxSession(MAILBOX_SESSION)
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
index e0346048a2..89ff7e077d 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
@@ -138,7 +138,7 @@ class SelectedMailboxImplTest {
         AtomicInteger successCount = new AtomicInteger(0);
         doAnswer(generateEmitEventAnswer(successCount))
             .when(eventBus)
-            .register(any(EventListener.class), eq(mailboxIdRegistrationKey));
+            .register(any(EventListener.ReactiveEventListener.class), eq(mailboxIdRegistrationKey));
         SelectedMailboxImpl selectedMailbox = new SelectedMailboxImpl(
             mailboxManager,
             eventBus,
@@ -154,7 +154,7 @@ class SelectedMailboxImplTest {
         AtomicInteger successCount = new AtomicInteger(0);
         doAnswer(generateEmitCustomFlagEventAnswer(successCount))
             .when(eventBus)
-            .register(any(EventListener.class), eq(mailboxIdRegistrationKey));
+            .register(any(EventListener.ReactiveEventListener.class), eq(mailboxIdRegistrationKey));
 
         new SelectedMailboxImpl(mailboxManager, eventBus, imapSession, messageManager).finishInit().block();
 
@@ -166,7 +166,7 @@ class SelectedMailboxImplTest {
         AtomicInteger successCount = new AtomicInteger(0);
         doAnswer(generateEmitCustomFlagEventAnswer(successCount))
             .when(eventBus)
-            .register(any(EventListener.class), eq(mailboxIdRegistrationKey));
+            .register(any(EventListener.ReactiveEventListener.class), eq(mailboxIdRegistrationKey));
 
         SelectedMailboxImpl selectedMailbox = new SelectedMailboxImpl(mailboxManager, eventBus, imapSession, messageManager);
         selectedMailbox.finishInit().block();
@@ -179,7 +179,7 @@ class SelectedMailboxImplTest {
         AtomicInteger successCount = new AtomicInteger(0);
         doAnswer(generateEmitEventAnswer(successCount))
             .when(eventBus)
-            .register(any(EventListener.class), eq(mailboxIdRegistrationKey));
+            .register(any(EventListener.ReactiveEventListener.class), eq(mailboxIdRegistrationKey));
 
         new SelectedMailboxImpl(
             mailboxManager,


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org