You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/24 10:30:29 UTC
[james-project] 02/02: JAMES-3773 IDLE listener can avoid a nested blocking call
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 576f8860a3d8dfd4034e2e7788d1cb3ab2d3169a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Aug 24 09:15:13 2022 +0700
JAMES-3773 IDLE listener can avoid a nested blocking call
---
.../james/imap/api/process/SelectedMailbox.java | 2 +-
.../imap/processor/AbstractSelectionProcessor.java | 10 +++++++---
.../apache/james/imap/processor/IdleProcessor.java | 7 ++++---
.../imap/processor/base/SelectedMailboxImpl.java | 20 ++++++++++----------
.../processor/base/MailboxEventAnalyserTest.java | 18 +++++++++---------
.../imap/processor/base/SelectedMailboxImplTest.java | 8 ++++----
6 files changed, 35 insertions(+), 30 deletions(-)
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/process/SelectedMailbox.java b/protocols/imap/src/main/java/org/apache/james/imap/api/process/SelectedMailbox.java
index aad4eb0d2f..efd430034d 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/api/process/SelectedMailbox.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/api/process/SelectedMailbox.java
@@ -45,7 +45,7 @@ public interface SelectedMailbox {
*/
Mono<Void> deselect();
- void registerIdle(EventListener idle);
+ void registerIdle(EventListener.ReactiveEventListener idle);
void unregisterIdle();
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
index f00bf146f1..4e131d2db6 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractSelectionProcessor.java
@@ -83,7 +83,7 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
private final StatusResponseFactory statusResponseFactory;
private final boolean openReadOnly;
private final EventBus eventBus;
-
+
public AbstractSelectionProcessor(Class<R> acceptableClass, MailboxManager mailboxManager, StatusResponseFactory statusResponseFactory, boolean openReadOnly,
MetricFactory metricFactory, EventBus eventBus) {
super(acceptableClass, mailboxManager, statusResponseFactory, metricFactory);
@@ -195,8 +195,7 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
return Flux.<MessageUid>concat(
Flux.just(firstUnseen),
Flux.range(0, 5)
- .concatMap(i -> selectMailbox(fullMailboxPath, session, responder)
- .map(MailboxMetaData::getFirstUnseen)))
+ .concatMap(i -> retrieveFirstUnseen(session, fullMailboxPath, responder)))
.filter(unseenUid -> unseen(responder, firstUnseen, selected))
.next()
.switchIfEmpty(Mono.fromCallable(() -> {
@@ -206,6 +205,11 @@ abstract class AbstractSelectionProcessor<R extends AbstractMailboxSelectionRequ
}));
}
+ private Mono<MessageUid> retrieveFirstUnseen(ImapSession session, MailboxPath fullMailboxPath, Responder responder) {
+ return selectMailbox(fullMailboxPath, session, responder)
+ .map(MailboxMetaData::getFirstUnseen);
+ }
+
private Optional<UidRange[]> uidSet(AbstractMailboxSelectionRequest request, MailboxMetaData metaData) {
return Optional.ofNullable(request.getUidSet())
.or(() -> {
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
index 39005f0020..29e92276b2 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
@@ -44,6 +44,7 @@ import org.apache.james.mailbox.events.MailboxEvents.Expunged;
import org.apache.james.mailbox.events.MailboxEvents.FlagsUpdated;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.util.MDCBuilder;
+import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -146,7 +147,7 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
return CAPS;
}
- private class IdleMailboxListener implements EventListener {
+ private class IdleMailboxListener implements EventListener.ReactiveEventListener {
private final Responder responder;
private final ImapSession session;
@@ -162,8 +163,8 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
}
@Override
- public void event(Event event) {
- unsolicitedResponses(session, responder, false).block();
+ public Publisher<Void> reactiveEvent(Event event) {
+ return unsolicitedResponses(session, responder, false);
}
@Override
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
index 11dc17f264..49d4854d81 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
@@ -62,6 +62,7 @@ import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.model.SearchQuery;
import org.apache.james.mailbox.model.UpdatedFlags;
+import org.reactivestreams.Publisher;
import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
@@ -75,9 +76,7 @@ import reactor.core.scheduler.Schedulers;
/**
* Default implementation of {@link SelectedMailbox}
*/
-public class SelectedMailboxImpl implements SelectedMailbox, EventListener {
-
-
+public class SelectedMailboxImpl implements SelectedMailbox, EventListener.ReactiveEventListener {
private static final Void VOID = null;
private static final Flag UNINTERESTING_FLAGS = Flag.RECENT;
@@ -135,7 +134,7 @@ public class SelectedMailboxImpl implements SelectedMailbox, EventListener {
private final Set<MessageUid> flagUpdateUids = new TreeSet<>();
private final Set<MessageUid> expungedUids = new TreeSet<>();
private final StampedLock applicableFlagsLock = new StampedLock();
- private final AtomicReference<EventListener> idleEventListener = new AtomicReference<>();
+ private final AtomicReference<ReactiveEventListener> idleEventListener = new AtomicReference<>();
private final AtomicBoolean recentUidRemoved = new AtomicBoolean(false);
private final AtomicBoolean isDeletedByOtherSession = new AtomicBoolean(false);
private final AtomicBoolean sizeChanged = new AtomicBoolean(false);
@@ -172,7 +171,7 @@ public class SelectedMailboxImpl implements SelectedMailbox, EventListener {
}
@Override
- public void registerIdle(EventListener idle) {
+ public void registerIdle(ReactiveEventListener idle) {
idleEventListener.set(idle);
}
@@ -391,12 +390,13 @@ public class SelectedMailboxImpl implements SelectedMailbox, EventListener {
applicableFlagsLock.unlockWrite(stamp);
}
-
@Override
- public void event(Event event) {
- synchronizedEvent(event);
- Optional.ofNullable(idleEventListener.get())
- .ifPresent(Throwing.<EventListener>consumer(listener -> listener.event(event)).sneakyThrow());
+ public Publisher<Void> reactiveEvent(Event event) {
+ return Mono.fromRunnable(() -> synchronizedEvent(event))
+ .subscribeOn(Schedulers.boundedElastic())
+ .then(Optional.ofNullable(idleEventListener.get())
+ .map(listener -> Mono.from(listener.reactiveEvent(event)))
+ .orElse(Mono.empty()));
}
private synchronized void synchronizedEvent(Event event) {
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
index 5ee666cc68..b5a750b31f 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
@@ -169,7 +169,7 @@ class MailboxEventAnalyserTest {
}
@Test
- void testShouldBeNoSizeChangeOnOtherEvent() {
+ void testShouldBeNoSizeChangeOnOtherEvent() throws Exception {
MailboxEvent event = new MailboxAdded(MAILBOX_SESSION.getSessionId(),
MAILBOX_SESSION.getUser(), MAILBOX_PATH, MAILBOX_ID, Event.EventId.random());
@@ -179,14 +179,14 @@ class MailboxEventAnalyserTest {
}
@Test
- void testShouldBeNoSizeChangeOnAdded() {
+ void testShouldBeNoSizeChangeOnAdded() throws Exception {
testee.event(ADDED);
assertThat(testee.isSizeChanged()).isTrue();
}
@Test
- void testShouldNoSizeChangeAfterReset() {
+ void testShouldNoSizeChangeAfterReset() throws Exception {
testee.event(ADDED);
testee.resetEvents();
@@ -194,7 +194,7 @@ class MailboxEventAnalyserTest {
}
@Test
- void testShouldNotSetUidWhenNoSystemFlagChange() {
+ void testShouldNotSetUidWhenNoSystemFlagChange() throws Exception {
FlagsUpdated update = EventFactory.flagsUpdated()
.randomEventId()
.mailboxSession(MAILBOX_SESSION)
@@ -208,7 +208,7 @@ class MailboxEventAnalyserTest {
}
@Test
- void testShouldSetUidWhenSystemFlagChange() {
+ void testShouldSetUidWhenSystemFlagChange() throws Exception {
FlagsUpdated update = EventFactory.flagsUpdated()
.randomEventId()
.mailboxSession(OTHER_MAILBOX_SESSION)
@@ -223,7 +223,7 @@ class MailboxEventAnalyserTest {
}
@Test
- void testShouldClearFlagUidsUponReset() {
+ void testShouldClearFlagUidsUponReset() throws Exception {
SelectedMailboxImpl analyser = this.testee;
FlagsUpdated update = EventFactory.flagsUpdated()
@@ -241,7 +241,7 @@ class MailboxEventAnalyserTest {
}
@Test
- void testShouldSetUidWhenSystemFlagChangeDifferentSessionInSilentMode() {
+ void testShouldSetUidWhenSystemFlagChangeDifferentSessionInSilentMode() throws Exception {
FlagsUpdated update = EventFactory.flagsUpdated()
.randomEventId()
.mailboxSession(OTHER_MAILBOX_SESSION)
@@ -258,7 +258,7 @@ class MailboxEventAnalyserTest {
}
@Test
- void testShouldNotSetUidWhenSystemFlagChangeSameSessionInSilentMode() {
+ void testShouldNotSetUidWhenSystemFlagChangeSameSessionInSilentMode() throws Exception {
FlagsUpdated update = EventFactory.flagsUpdated()
.randomEventId()
.mailboxSession(MAILBOX_SESSION)
@@ -275,7 +275,7 @@ class MailboxEventAnalyserTest {
}
@Test
- void testShouldNotSetUidWhenOnlyRecentFlagUpdated() {
+ void testShouldNotSetUidWhenOnlyRecentFlagUpdated() throws Exception {
FlagsUpdated update = EventFactory.flagsUpdated()
.randomEventId()
.mailboxSession(MAILBOX_SESSION)
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
index e0346048a2..89ff7e077d 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
@@ -138,7 +138,7 @@ class SelectedMailboxImplTest {
AtomicInteger successCount = new AtomicInteger(0);
doAnswer(generateEmitEventAnswer(successCount))
.when(eventBus)
- .register(any(EventListener.class), eq(mailboxIdRegistrationKey));
+ .register(any(EventListener.ReactiveEventListener.class), eq(mailboxIdRegistrationKey));
SelectedMailboxImpl selectedMailbox = new SelectedMailboxImpl(
mailboxManager,
eventBus,
@@ -154,7 +154,7 @@ class SelectedMailboxImplTest {
AtomicInteger successCount = new AtomicInteger(0);
doAnswer(generateEmitCustomFlagEventAnswer(successCount))
.when(eventBus)
- .register(any(EventListener.class), eq(mailboxIdRegistrationKey));
+ .register(any(EventListener.ReactiveEventListener.class), eq(mailboxIdRegistrationKey));
new SelectedMailboxImpl(mailboxManager, eventBus, imapSession, messageManager).finishInit().block();
@@ -166,7 +166,7 @@ class SelectedMailboxImplTest {
AtomicInteger successCount = new AtomicInteger(0);
doAnswer(generateEmitCustomFlagEventAnswer(successCount))
.when(eventBus)
- .register(any(EventListener.class), eq(mailboxIdRegistrationKey));
+ .register(any(EventListener.ReactiveEventListener.class), eq(mailboxIdRegistrationKey));
SelectedMailboxImpl selectedMailbox = new SelectedMailboxImpl(mailboxManager, eventBus, imapSession, messageManager);
selectedMailbox.finishInit().block();
@@ -179,7 +179,7 @@ class SelectedMailboxImplTest {
AtomicInteger successCount = new AtomicInteger(0);
doAnswer(generateEmitEventAnswer(successCount))
.when(eventBus)
- .register(any(EventListener.class), eq(mailboxIdRegistrationKey));
+ .register(any(EventListener.ReactiveEventListener.class), eq(mailboxIdRegistrationKey));
new SelectedMailboxImpl(
mailboxManager,
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org