You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/24 10:30:28 UTC

[james-project] 01/02: JAMES-3773 Prevent deadlock under load in AbstractSelectionProcessor

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 32b0db34bc247b9be49f73eedf52108e16374fd2
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Aug 23 15:09:11 2022 +0700

    JAMES-3773 Prevent deadlock under load in AbstractSelectionProcessor
    
    Avoid that nested call to .block
---
 .../imap/processor/AbstractSelectionProcessor.java | 59 +++++++++++++---------
 1 file changed, 34 insertions(+), 25 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
index 372cbda658..f00bf146f1 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
@@ -73,6 +73,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 
 import io.vavr.Tuple;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequest> extends AbstractMailboxProcessor<R> implements PermitEnableCapabilityProcessor {
@@ -129,31 +130,17 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
 
         return selectMailbox(fullMailboxPath, session, responder)
             .doOnNext(metaData -> {
-
-                final SelectedMailbox selected = session.getSelected();
-                MessageUid firstUnseen = metaData.getFirstUnseen();
-
-                flags(responder, selected);
-                exists(responder, metaData);
-                recent(responder, selected);
-                uidValidity(responder, metaData);
-
-
-                // try to write the UNSEEN message to the client and retry if we fail because of concurrent sessions.
-                //
-                // See IMAP-345
-                int retryCount = 0;
-                while (unseen(responder, firstUnseen, selected) == false) {
-                    // if we not was able to get find the unseen within 5 retries we should just not send it
-                    if (retryCount == 5) {
-                        LOGGER.info("Unable to uid for unseen message {} in mailbox {}", firstUnseen, selected.getMailboxId().serialize());
-                        break;
-                    }
-                    firstUnseen = selectMailbox(fullMailboxPath, session, responder).block().getFirstUnseen();
-                    retryCount++;
-
-                }
-
+                    SelectedMailbox selected = session.getSelected();
+
+                    flags(responder, selected);
+                    exists(responder, metaData);
+                    recent(responder, selected);
+                    uidValidity(responder, metaData);
+                })
+            .flatMap(metadata -> firstUnseen(session, fullMailboxPath, responder, metadata.getFirstUnseen(), session.getSelected())
+            .thenReturn(metadata))
+            .doOnNext(metaData -> {
+                SelectedMailbox selected = session.getSelected();
                 permanentFlags(responder, metaData.getPermanentFlags(), selected);
                 highestModSeq(responder, metaData);
                 uidNext(responder, metaData);
@@ -197,6 +184,28 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
             .then();
     }
 
+    private Mono<MessageUid> firstUnseen(ImapSession session, MailboxPath fullMailboxPath, Responder responder, MessageUid firstUnseen, SelectedMailbox selected) {
+        // try to write the UNSEEN message to the client and retry if we fail because of concurrent sessions.
+        // See IMAP-345
+
+        if (firstUnseen == null) {
+            return Mono.empty();
+        }
+
+        return Flux.<MessageUid>concat(
+            Flux.just(firstUnseen),
+            Flux.range(0, 5)
+                .concatMap(i -> selectMailbox(fullMailboxPath, session, responder)
+                    .map(MailboxMetaData::getFirstUnseen)))
+            .filter(unseenUid -> unseen(responder, firstUnseen, selected))
+            .next()
+            .switchIfEmpty(Mono.fromCallable(() -> {
+                // if we not was able to get find the unseen within 5 retries we should just not send it
+                LOGGER.info("Unable to uid for unseen message {} in mailbox {}", firstUnseen, selected.getMailboxId().serialize());
+                return firstUnseen;
+            }));
+    }
+
     private Optional<UidRange[]> uidSet(AbstractMailboxSelectionRequest request, MailboxMetaData metaData) {
         return Optional.ofNullable(request.getUidSet())
             .or(() -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org