You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/12/16 17:10:08 UTC
[james-project] 02/03: JAMES-3003 Mailbox event delivery should run listeners concurrently
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 069f81f8a37d10694fdd77b52a4b3bbea6cbc1bc
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Mon Dec 9 16:16:49 2019 +0100
JAMES-3003 Mailbox event delivery should run listeners concurrently
---
.../apache/james/mailbox/events/GroupContract.java | 57 ++++++++++++++++++++++
.../apache/james/mailbox/events/KeyContract.java | 44 +++++++++++++++++
.../apache/james/mailbox/events/InVMEventBus.java | 20 ++++----
.../mailbox/events/delivery/EventDelivery.java | 42 +---------------
.../mailbox/events/delivery/InVmEventDelivery.java | 11 ++---
.../events/delivery/InVmEventDeliveryTest.java | 21 ++------
.../james/mailbox/events/EventDispatcher.java | 6 +--
7 files changed, 122 insertions(+), 79 deletions(-)
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index 2f1aca9..ebfd4ea 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -26,6 +26,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_UNSUPPOR
import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS;
import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_B;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_C;
import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND;
import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
@@ -42,6 +43,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
@@ -54,6 +57,7 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
+import reactor.core.scheduler.Schedulers;
public interface GroupContract {
@@ -97,6 +101,59 @@ public interface GroupContract {
}
@Test
+ default void groupNotificationShouldDeliverASingleEventToAllListenersAtTheSameTime() {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ try {
+ ConcurrentLinkedQueue<String> threads = new ConcurrentLinkedQueue<>();
+ eventBus().register(new MailboxListener.GroupMailboxListener() {
+ @Override
+ public Group getDefaultGroup() {
+ return new GenericGroup("groupA");
+ }
+
+ @Override
+ public void event(Event event) throws Exception {
+ threads.add(Thread.currentThread().getName());
+ countDownLatch.await();
+ }
+ }, GROUP_A);
+ eventBus().register(new MailboxListener.GroupMailboxListener() {
+ @Override
+ public Group getDefaultGroup() {
+ return new GenericGroup("groupB");
+ }
+
+ @Override
+ public void event(Event event) throws Exception {
+ threads.add(Thread.currentThread().getName());
+ countDownLatch.await();
+ }
+ }, GROUP_B);
+ eventBus().register(new MailboxListener.GroupMailboxListener() {
+ @Override
+ public Group getDefaultGroup() {
+ return new GenericGroup("groupC");
+ }
+
+ @Override
+ public void event(Event event) throws Exception {
+ threads.add(Thread.currentThread().getName());
+ countDownLatch.await();
+ }
+ }, GROUP_C);
+
+ eventBus().dispatch(EVENT, NO_KEYS).subscribeOn(Schedulers.elastic()).subscribe();
+
+
+ WAIT_CONDITION.atMost(org.awaitility.Duration.TEN_SECONDS)
+ .untilAsserted(() -> assertThat(threads).hasSize(3));
+ assertThat(threads).doesNotHaveDuplicates();
+ } finally {
+ countDownLatch.countDown();
+ }
+ }
+
+ @Test
default void listenersShouldBeAbleToDispatch() {
AtomicBoolean successfulRetry = new AtomicBoolean(false);
MailboxListener listener = event -> {
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
index f365ef3..2c8e581 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -56,6 +57,7 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
+import reactor.core.scheduler.Schedulers;
public interface KeyContract extends EventBusContract {
@@ -86,6 +88,36 @@ public interface KeyContract extends EventBusContract {
}
@Test
+ default void notificationShouldDeliverASingleEventToAllListenersAtTheSameTime() {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ try {
+ ConcurrentLinkedQueue<String> threads = new ConcurrentLinkedQueue<>();
+ eventBus().register(event -> {
+ threads.add(Thread.currentThread().getName());
+ countDownLatch.await();
+ }, KEY_1);
+ eventBus().register(event -> {
+ threads.add(Thread.currentThread().getName());
+ countDownLatch.await();
+ }, KEY_1);
+ eventBus().register(event -> {
+ threads.add(Thread.currentThread().getName());
+ countDownLatch.await();
+ }, KEY_1);
+
+ eventBus().dispatch(EVENT, KEY_1).subscribeOn(Schedulers.elastic()).subscribe();
+
+
+ WAIT_CONDITION.atMost(org.awaitility.Duration.TEN_SECONDS)
+ .untilAsserted(() -> assertThat(threads).hasSize(3));
+ assertThat(threads).doesNotHaveDuplicates();
+ } finally {
+ countDownLatch.countDown();
+ }
+ }
+
+
+ @Test
default void registeredListenersShouldNotReceiveNoopEvents() throws Exception {
MailboxListener listener = newListener();
@@ -293,6 +325,18 @@ public interface KeyContract extends EventBusContract {
.event(any());
}
+
+ @Test
+ default void dispatchShouldNotifyAsynchronousListener() throws Exception {
+ MailboxListener listener = newListener();
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ eventBus().register(listener, KEY_1);
+
+ eventBus().dispatch(EVENT, KEY_1).block();
+
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis())).event(EVENT);
+ }
+
@Test
default void dispatchShouldNotBlockAsynchronousListener() throws Exception {
MailboxListener listener = newListener();
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
index f8bc8c9..bcd2376 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
@@ -34,9 +34,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class InVMEventBus implements EventBus {
@@ -79,8 +79,6 @@ public class InVMEventBus implements EventBus {
public Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
if (!event.isNoop()) {
return Flux.merge(groupDeliveries(event), keyDeliveries(event, keys))
- .reduceWith(EventDelivery.ExecutionStages::empty, EventDelivery.ExecutionStages::combine)
- .flatMap(EventDelivery.ExecutionStages::synchronousListenerFuture)
.then()
.onErrorResume(throwable -> Mono.empty());
}
@@ -90,9 +88,7 @@ public class InVMEventBus implements EventBus {
@Override
public Mono<Void> reDeliver(Group group, Event event) {
if (!event.isNoop()) {
- return Mono.fromCallable(() -> groupDelivery(event, retrieveListenerFromGroup(group), group))
- .flatMap(EventDelivery.ExecutionStages::synchronousListenerFuture)
- .then();
+ return groupDelivery(event, retrieveListenerFromGroup(group), group);
}
return Mono.empty();
}
@@ -102,17 +98,19 @@ public class InVMEventBus implements EventBus {
.orElseThrow(() -> new GroupRegistrationNotFound(group));
}
- private Flux<EventDelivery.ExecutionStages> keyDeliveries(Event event, Set<RegistrationKey> keys) {
+ private Mono<Void> keyDeliveries(Event event, Set<RegistrationKey> keys) {
return Flux.fromIterable(registeredListenersByKeys(keys))
- .map(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()));
+ .flatMap(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()).subscribeOn(Schedulers.elastic()))
+ .then();
}
- private Flux<EventDelivery.ExecutionStages> groupDeliveries(Event event) {
+ private Mono<Void> groupDeliveries(Event event) {
return Flux.fromIterable(groups.entrySet())
- .map(entry -> groupDelivery(event, entry.getValue(), entry.getKey()));
+ .flatMap(entry -> groupDelivery(event, entry.getValue(), entry.getKey()).subscribeOn(Schedulers.elastic()))
+ .then();
}
- private EventDelivery.ExecutionStages groupDelivery(Event event, MailboxListener mailboxListener, Group group) {
+ private Mono<Void> groupDelivery(Event event, MailboxListener mailboxListener, Group group) {
return eventDelivery.deliver(
mailboxListener,
event,
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index 0f56a60..26c972d 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -32,7 +32,6 @@ import org.apache.james.mailbox.events.RetryBackoffConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -129,44 +128,5 @@ public interface EventDelivery {
Mono<Void> handle(Event event);
}
- class ExecutionStages {
-
- public static ExecutionStages empty() {
- return new ExecutionStages(Mono.empty(), Mono.empty());
- }
-
- static ExecutionStages synchronous(Mono<Void> synchronousListenerFuture) {
- return new ExecutionStages(synchronousListenerFuture, Mono.empty());
- }
-
- static ExecutionStages asynchronous(Mono<Void> asynchronousListenerFuture) {
- return new ExecutionStages(Mono.empty(),asynchronousListenerFuture);
- }
-
- private final Mono<Void> synchronousListenerFuture;
- private final Mono<Void> asynchronousListenerFuture;
-
- private ExecutionStages(Mono<Void> synchronousListenerFuture, Mono<Void> asynchronousListenerFuture) {
- this.synchronousListenerFuture = synchronousListenerFuture;
- this.asynchronousListenerFuture = asynchronousListenerFuture;
- }
-
- public Mono<Void> synchronousListenerFuture() {
- return synchronousListenerFuture;
- }
-
- public Mono<Void> allListenerFuture() {
- return synchronousListenerFuture
- .concatWith(asynchronousListenerFuture)
- .then();
- }
-
- public ExecutionStages combine(ExecutionStages another) {
- return new ExecutionStages(
- Flux.concat(this.synchronousListenerFuture, another.synchronousListenerFuture).then(),
- Flux.concat(this.asynchronousListenerFuture, another.asynchronousListenerFuture).then());
- }
- }
-
- ExecutionStages deliver(MailboxListener listener, Event event, DeliveryOption option);
+ Mono<Void> deliver(MailboxListener listener, Event event, DeliveryOption option);
}
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
index 718d5f1..a36119f 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
@@ -51,18 +51,17 @@ public class InVmEventDelivery implements EventDelivery {
}
@Override
- public ExecutionStages deliver(MailboxListener listener, Event event, DeliveryOption option) {
+ public Mono<Void> deliver(MailboxListener listener, Event event, DeliveryOption option) {
Mono<Void> executionResult = deliverByOption(listener, event, option);
- return toExecutionStages(listener.getExecutionMode(), executionResult);
+ return waitForResultIfNeeded(listener.getExecutionMode(), executionResult);
}
- private ExecutionStages toExecutionStages(MailboxListener.ExecutionMode executionMode, Mono<Void> executionResult) {
+ private Mono<Void> waitForResultIfNeeded(MailboxListener.ExecutionMode executionMode, Mono<Void> executionResult) {
if (executionMode.equals(MailboxListener.ExecutionMode.SYNCHRONOUS)) {
- return ExecutionStages.synchronous(executionResult);
+ return executionResult;
}
-
- return ExecutionStages.asynchronous(executionResult);
+ return executionResult.or(Mono.empty()).onErrorResume(throwable -> Mono.empty());
}
private Mono<Void> deliverByOption(MailboxListener listener, Event event, DeliveryOption deliveryOption) {
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
index 547e192..dfea972 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
@@ -63,7 +63,6 @@ class InVmEventDeliveryTest {
void deliverShouldDeliverEvent() {
when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
- .allListenerFuture()
.block();
assertThat(listener.numberOfEventCalls())
@@ -74,7 +73,6 @@ class InVmEventDeliveryTest {
void deliverShouldReturnSuccessSynchronousMono() {
when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
- .synchronousListenerFuture()
.block())
.doesNotThrowAnyException();
}
@@ -86,7 +84,6 @@ class InVmEventDeliveryTest {
.when(listener).event(EVENT);
assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
- .allListenerFuture()
.block())
.isInstanceOf(RuntimeException.class);
@@ -101,7 +98,6 @@ class InVmEventDeliveryTest {
.when(listener).event(EVENT);
assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
- .synchronousListenerFuture()
.block())
.isInstanceOf(RuntimeException.class);
}
@@ -114,7 +110,6 @@ class InVmEventDeliveryTest {
void deliverShouldDeliverEvent() {
when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
- .allListenerFuture()
.block();
assertThat(listener.numberOfEventCalls())
@@ -125,24 +120,19 @@ class InVmEventDeliveryTest {
void deliverShouldReturnSuccessSynchronousMono() {
when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
- .synchronousListenerFuture()
.block())
.doesNotThrowAnyException();
}
@Test
- void deliverShouldNotDeliverWhenListenerGetException() {
+ void deliverShouldNotFailWhenListenerGetException() {
when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
doThrow(new RuntimeException())
.when(listener).event(EVENT);
- assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
- .allListenerFuture()
+ assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
.block())
- .isInstanceOf(RuntimeException.class);
-
- assertThat(listener.numberOfEventCalls())
- .isEqualTo(0);
+ .doesNotThrowAnyException();
}
@Test
@@ -152,7 +142,6 @@ class InVmEventDeliveryTest {
.when(listener).event(EVENT);
assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
- .synchronousListenerFuture()
.block())
.doesNotThrowAnyException();
}
@@ -174,7 +163,6 @@ class InVmEventDeliveryTest {
DeliveryOption.of(
BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
PermanentFailureHandler.NO_HANDLER))
- .allListenerFuture()
.block();
assertThat(listener.numberOfEventCalls())
@@ -193,7 +181,6 @@ class InVmEventDeliveryTest {
DeliveryOption.of(
Retryer.NO_RETRYER,
PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
- .allListenerFuture()
.block();
assertThat(deadLetter.groupsWithFailedEvents().toStream())
@@ -214,7 +201,6 @@ class InVmEventDeliveryTest {
DeliveryOption.of(
BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
- .allListenerFuture()
.block();
SoftAssertions.assertSoftly(softy -> {
@@ -242,7 +228,6 @@ class InVmEventDeliveryTest {
DeliveryOption.of(
BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
- .allListenerFuture()
.block();
SoftAssertions.assertSoftly(softy -> {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 47844d7..6c1b586 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -90,13 +90,13 @@ class EventDispatcher {
return Flux.fromIterable(keys)
.flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key)
.map(listener -> Tuples.of(key, listener)))
- .filter(pair -> pair.getT2().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
- .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()))
+ .filter(pair -> pair.getT2().getExecutionMode() == MailboxListener.ExecutionMode.SYNCHRONOUS)
+ .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()).subscribeOn(Schedulers.elastic()))
.then();
}
private Mono<Void> executeListener(Event event, MailboxListener mailboxListener, RegistrationKey registrationKey) {
- return Mono.from((sink) -> {
+ return Mono.from(sink -> {
try {
mailboxListenerExecutor.execute(mailboxListener,
MDCBuilder.create()
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org