You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/24 10:30:27 UTC

[james-project] branch master updated (5067cf6af2 -> 576f8860a3)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 5067cf6af2 JAMES-3809 Get rid of cassandra.properties chunk.size.message.read in favor of batchsize.properties (#1148)
     new 32b0db34bc JAMES-3773 Prevent deadlock under load in AbstractSelectionProcessor
     new 576f8860a3 JAMES-3773 IDLE listener can avoid a nested blocking call

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../james/imap/api/process/SelectedMailbox.java    |  2 +-
 .../imap/processor/AbstractSelectionProcessor.java | 65 +++++++++++++---------
 .../apache/james/imap/processor/IdleProcessor.java |  7 ++-
 .../imap/processor/base/SelectedMailboxImpl.java   | 20 +++----
 .../processor/base/MailboxEventAnalyserTest.java   | 18 +++---
 .../processor/base/SelectedMailboxImplTest.java    |  8 +--
 6 files changed, 67 insertions(+), 53 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/02: JAMES-3773 IDLE listener can avoid a nested blocking call

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 576f8860a3d8dfd4034e2e7788d1cb3ab2d3169a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Aug 24 09:15:13 2022 +0700

    JAMES-3773 IDLE listener can avoid a nested blocking call
---
 .../james/imap/api/process/SelectedMailbox.java      |  2 +-
 .../imap/processor/AbstractSelectionProcessor.java   | 10 +++++++---
 .../apache/james/imap/processor/IdleProcessor.java   |  7 ++++---
 .../imap/processor/base/SelectedMailboxImpl.java     | 20 ++++++++++----------
 .../processor/base/MailboxEventAnalyserTest.java     | 18 +++++++++---------
 .../imap/processor/base/SelectedMailboxImplTest.java |  8 ++++----
 6 files changed, 35 insertions(+), 30 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/process/SelectedMailbox.java b/protocols/imap/src/main/java/org/apache/james/imap/api/process/SelectedMailbox.java
index aad4eb0d2f..efd430034d 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/api/process/SelectedMailbox.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/api/process/SelectedMailbox.java
@@ -45,7 +45,7 @@ public interface SelectedMailbox {
      */
     Mono<Void> deselect();
 
-    void registerIdle(EventListener idle);
+    void registerIdle(EventListener.ReactiveEventListener idle);
 
     void unregisterIdle();
 
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
index f00bf146f1..4e131d2db6 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
@@ -83,7 +83,7 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
     private final StatusResponseFactory statusResponseFactory;
     private final boolean openReadOnly;
     private final EventBus eventBus;
-    
+
     public AbstractSelectionProcessor(Class<R> acceptableClass, MailboxManager mailboxManager, StatusResponseFactory statusResponseFactory, boolean openReadOnly,
                                       MetricFactory metricFactory, EventBus eventBus) {
         super(acceptableClass, mailboxManager, statusResponseFactory, metricFactory);
@@ -195,8 +195,7 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
         return Flux.<MessageUid>concat(
             Flux.just(firstUnseen),
             Flux.range(0, 5)
-                .concatMap(i -> selectMailbox(fullMailboxPath, session, responder)
-                    .map(MailboxMetaData::getFirstUnseen)))
+                .concatMap(i -> retrieveFirstUnseen(session, fullMailboxPath, responder)))
             .filter(unseenUid -> unseen(responder, firstUnseen, selected))
             .next()
             .switchIfEmpty(Mono.fromCallable(() -> {
@@ -206,6 +205,11 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
             }));
     }
 
+    private Mono<MessageUid> retrieveFirstUnseen(ImapSession session, MailboxPath fullMailboxPath, Responder responder) {
+        return selectMailbox(fullMailboxPath, session, responder)
+            .map(MailboxMetaData::getFirstUnseen);
+    }
+
     private Optional<UidRange[]> uidSet(AbstractMailboxSelectionRequest request, MailboxMetaData metaData) {
         return Optional.ofNullable(request.getUidSet())
             .or(() -> {
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
index 39005f0020..29e92276b2 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
@@ -44,6 +44,7 @@ import org.apache.james.mailbox.events.MailboxEvents.Expunged;
 import org.apache.james.mailbox.events.MailboxEvents.FlagsUpdated;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.util.MDCBuilder;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -146,7 +147,7 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
         return CAPS;
     }
 
-    private class IdleMailboxListener implements EventListener {
+    private class IdleMailboxListener implements EventListener.ReactiveEventListener {
 
         private final Responder responder;
         private final ImapSession session;
@@ -162,8 +163,8 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
         }
 
         @Override
-        public void event(Event event) {
-            unsolicitedResponses(session, responder, false).block();
+        public Publisher<Void> reactiveEvent(Event event) {
+            return unsolicitedResponses(session, responder, false);
         }
 
         @Override
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
index 11dc17f264..49d4854d81 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
@@ -62,6 +62,7 @@ import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.model.SearchQuery;
 import org.apache.james.mailbox.model.UpdatedFlags;
+import org.reactivestreams.Publisher;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
@@ -75,9 +76,7 @@ import reactor.core.scheduler.Schedulers;
 /**
  * Default implementation of {@link SelectedMailbox}
  */
-public class SelectedMailboxImpl implements SelectedMailbox, EventListener {
-
-
+public class SelectedMailboxImpl implements SelectedMailbox, EventListener.ReactiveEventListener {
     private static final Void VOID = null;
     private static final Flag UNINTERESTING_FLAGS = Flag.RECENT;
 
@@ -135,7 +134,7 @@ public class SelectedMailboxImpl implements SelectedMailbox, EventListener {
     private final Set<MessageUid> flagUpdateUids = new TreeSet<>();
     private final Set<MessageUid> expungedUids = new TreeSet<>();
     private final StampedLock applicableFlagsLock = new StampedLock();
-    private final AtomicReference<EventListener> idleEventListener = new AtomicReference<>();
+    private final AtomicReference<ReactiveEventListener> idleEventListener = new AtomicReference<>();
     private final AtomicBoolean recentUidRemoved = new AtomicBoolean(false);
     private final AtomicBoolean isDeletedByOtherSession = new AtomicBoolean(false);
     private final AtomicBoolean sizeChanged = new AtomicBoolean(false);
@@ -172,7 +171,7 @@ public class SelectedMailboxImpl implements SelectedMailbox, EventListener {
     }
 
     @Override
-    public void registerIdle(EventListener idle) {
+    public void registerIdle(ReactiveEventListener idle) {
         idleEventListener.set(idle);
     }
 
@@ -391,12 +390,13 @@ public class SelectedMailboxImpl implements SelectedMailbox, EventListener {
         applicableFlagsLock.unlockWrite(stamp);
     }
 
-    
     @Override
-    public void event(Event event) {
-        synchronizedEvent(event);
-        Optional.ofNullable(idleEventListener.get())
-            .ifPresent(Throwing.<EventListener>consumer(listener -> listener.event(event)).sneakyThrow());
+    public Publisher<Void> reactiveEvent(Event event) {
+        return Mono.fromRunnable(() -> synchronizedEvent(event))
+            .subscribeOn(Schedulers.boundedElastic())
+            .then(Optional.ofNullable(idleEventListener.get())
+                .map(listener -> Mono.from(listener.reactiveEvent(event)))
+                .orElse(Mono.empty()));
     }
 
     private synchronized void synchronizedEvent(Event event) {
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
index 5ee666cc68..b5a750b31f 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
@@ -169,7 +169,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldBeNoSizeChangeOnOtherEvent() {
+    void testShouldBeNoSizeChangeOnOtherEvent() throws Exception {
             MailboxEvent event = new MailboxAdded(MAILBOX_SESSION.getSessionId(),
             MAILBOX_SESSION.getUser(), MAILBOX_PATH, MAILBOX_ID, Event.EventId.random());
       
@@ -179,14 +179,14 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldBeNoSizeChangeOnAdded() {
+    void testShouldBeNoSizeChangeOnAdded() throws Exception {
         testee.event(ADDED);
 
         assertThat(testee.isSizeChanged()).isTrue();
     }
 
     @Test
-    void testShouldNoSizeChangeAfterReset() {
+    void testShouldNoSizeChangeAfterReset() throws Exception {
         testee.event(ADDED);
         testee.resetEvents();
 
@@ -194,7 +194,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldNotSetUidWhenNoSystemFlagChange() {
+    void testShouldNotSetUidWhenNoSystemFlagChange() throws Exception {
         FlagsUpdated update = EventFactory.flagsUpdated()
             .randomEventId()
             .mailboxSession(MAILBOX_SESSION)
@@ -208,7 +208,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldSetUidWhenSystemFlagChange() {
+    void testShouldSetUidWhenSystemFlagChange() throws Exception {
         FlagsUpdated update = EventFactory.flagsUpdated()
             .randomEventId()
             .mailboxSession(OTHER_MAILBOX_SESSION)
@@ -223,7 +223,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldClearFlagUidsUponReset() {
+    void testShouldClearFlagUidsUponReset() throws Exception {
         SelectedMailboxImpl analyser = this.testee;
 
         FlagsUpdated update = EventFactory.flagsUpdated()
@@ -241,7 +241,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldSetUidWhenSystemFlagChangeDifferentSessionInSilentMode() {
+    void testShouldSetUidWhenSystemFlagChangeDifferentSessionInSilentMode() throws Exception {
         FlagsUpdated update = EventFactory.flagsUpdated()
             .randomEventId()
             .mailboxSession(OTHER_MAILBOX_SESSION)
@@ -258,7 +258,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldNotSetUidWhenSystemFlagChangeSameSessionInSilentMode() {
+    void testShouldNotSetUidWhenSystemFlagChangeSameSessionInSilentMode() throws Exception {
         FlagsUpdated update = EventFactory.flagsUpdated()
             .randomEventId()
             .mailboxSession(MAILBOX_SESSION)
@@ -275,7 +275,7 @@ class MailboxEventAnalyserTest {
     }
 
     @Test
-    void testShouldNotSetUidWhenOnlyRecentFlagUpdated() {
+    void testShouldNotSetUidWhenOnlyRecentFlagUpdated() throws Exception {
         FlagsUpdated update = EventFactory.flagsUpdated()
             .randomEventId()
             .mailboxSession(MAILBOX_SESSION)
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
index e0346048a2..89ff7e077d 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
@@ -138,7 +138,7 @@ class SelectedMailboxImplTest {
         AtomicInteger successCount = new AtomicInteger(0);
         doAnswer(generateEmitEventAnswer(successCount))
             .when(eventBus)
-            .register(any(EventListener.class), eq(mailboxIdRegistrationKey));
+            .register(any(EventListener.ReactiveEventListener.class), eq(mailboxIdRegistrationKey));
         SelectedMailboxImpl selectedMailbox = new SelectedMailboxImpl(
             mailboxManager,
             eventBus,
@@ -154,7 +154,7 @@ class SelectedMailboxImplTest {
         AtomicInteger successCount = new AtomicInteger(0);
         doAnswer(generateEmitCustomFlagEventAnswer(successCount))
             .when(eventBus)
-            .register(any(EventListener.class), eq(mailboxIdRegistrationKey));
+            .register(any(EventListener.ReactiveEventListener.class), eq(mailboxIdRegistrationKey));
 
         new SelectedMailboxImpl(mailboxManager, eventBus, imapSession, messageManager).finishInit().block();
 
@@ -166,7 +166,7 @@ class SelectedMailboxImplTest {
         AtomicInteger successCount = new AtomicInteger(0);
         doAnswer(generateEmitCustomFlagEventAnswer(successCount))
             .when(eventBus)
-            .register(any(EventListener.class), eq(mailboxIdRegistrationKey));
+            .register(any(EventListener.ReactiveEventListener.class), eq(mailboxIdRegistrationKey));
 
         SelectedMailboxImpl selectedMailbox = new SelectedMailboxImpl(mailboxManager, eventBus, imapSession, messageManager);
         selectedMailbox.finishInit().block();
@@ -179,7 +179,7 @@ class SelectedMailboxImplTest {
         AtomicInteger successCount = new AtomicInteger(0);
         doAnswer(generateEmitEventAnswer(successCount))
             .when(eventBus)
-            .register(any(EventListener.class), eq(mailboxIdRegistrationKey));
+            .register(any(EventListener.ReactiveEventListener.class), eq(mailboxIdRegistrationKey));
 
         new SelectedMailboxImpl(
             mailboxManager,


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/02: JAMES-3773 Prevent deadlock under load in AbstractSelectionProcessor

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 32b0db34bc247b9be49f73eedf52108e16374fd2
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Aug 23 15:09:11 2022 +0700

    JAMES-3773 Prevent deadlock under load in AbstractSelectionProcessor
    
    Avoid the nested call to .block()
---
 .../imap/processor/AbstractSelectionProcessor.java | 59 +++++++++++++---------
 1 file changed, 34 insertions(+), 25 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
index 372cbda658..f00bf146f1 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
@@ -73,6 +73,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 
 import io.vavr.Tuple;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequest> extends AbstractMailboxProcessor<R> implements PermitEnableCapabilityProcessor {
@@ -129,31 +130,17 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
 
         return selectMailbox(fullMailboxPath, session, responder)
             .doOnNext(metaData -> {
-
-                final SelectedMailbox selected = session.getSelected();
-                MessageUid firstUnseen = metaData.getFirstUnseen();
-
-                flags(responder, selected);
-                exists(responder, metaData);
-                recent(responder, selected);
-                uidValidity(responder, metaData);
-
-
-                // try to write the UNSEEN message to the client and retry if we fail because of concurrent sessions.
-                //
-                // See IMAP-345
-                int retryCount = 0;
-                while (unseen(responder, firstUnseen, selected) == false) {
-                    // if we not was able to get find the unseen within 5 retries we should just not send it
-                    if (retryCount == 5) {
-                        LOGGER.info("Unable to uid for unseen message {} in mailbox {}", firstUnseen, selected.getMailboxId().serialize());
-                        break;
-                    }
-                    firstUnseen = selectMailbox(fullMailboxPath, session, responder).block().getFirstUnseen();
-                    retryCount++;
-
-                }
-
+                    SelectedMailbox selected = session.getSelected();
+
+                    flags(responder, selected);
+                    exists(responder, metaData);
+                    recent(responder, selected);
+                    uidValidity(responder, metaData);
+                })
+            .flatMap(metadata -> firstUnseen(session, fullMailboxPath, responder, metadata.getFirstUnseen(), session.getSelected())
+            .thenReturn(metadata))
+            .doOnNext(metaData -> {
+                SelectedMailbox selected = session.getSelected();
                 permanentFlags(responder, metaData.getPermanentFlags(), selected);
                 highestModSeq(responder, metaData);
                 uidNext(responder, metaData);
@@ -197,6 +184,28 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
             .then();
     }
 
+    private Mono<MessageUid> firstUnseen(ImapSession session, MailboxPath fullMailboxPath, Responder responder, MessageUid firstUnseen, SelectedMailbox selected) {
+        // try to write the UNSEEN message to the client and retry if we fail because of concurrent sessions.
+        // See IMAP-345
+
+        if (firstUnseen == null) {
+            return Mono.empty();
+        }
+
+        return Flux.<MessageUid>concat(
+            Flux.just(firstUnseen),
+            Flux.range(0, 5)
+                .concatMap(i -> selectMailbox(fullMailboxPath, session, responder)
+                    .map(MailboxMetaData::getFirstUnseen)))
+            .filter(unseenUid -> unseen(responder, firstUnseen, selected))
+            .next()
+            .switchIfEmpty(Mono.fromCallable(() -> {
+                // if we were not able to find the unseen message within 5 retries, we should just not send it
+                LOGGER.info("Unable to uid for unseen message {} in mailbox {}", firstUnseen, selected.getMailboxId().serialize());
+                return firstUnseen;
+            }));
+    }
+
     private Optional<UidRange[]> uidSet(AbstractMailboxSelectionRequest request, MailboxMetaData metaData) {
         return Optional.ofNullable(request.getUidSet())
             .or(() -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org