You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/12/16 17:10:08 UTC

[james-project] 02/03: JAMES-3003 Mailbox event delivery should run listeners concurrently

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 069f81f8a37d10694fdd77b52a4b3bbea6cbc1bc
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Mon Dec 9 16:16:49 2019 +0100

    JAMES-3003 Mailbox event delivery should run listeners concurrently
---
 .../apache/james/mailbox/events/GroupContract.java | 57 ++++++++++++++++++++++
 .../apache/james/mailbox/events/KeyContract.java   | 44 +++++++++++++++++
 .../apache/james/mailbox/events/InVMEventBus.java  | 20 ++++----
 .../mailbox/events/delivery/EventDelivery.java     | 42 +---------------
 .../mailbox/events/delivery/InVmEventDelivery.java | 11 ++---
 .../events/delivery/InVmEventDeliveryTest.java     | 21 ++------
 .../james/mailbox/events/EventDispatcher.java      |  6 +--
 7 files changed, 122 insertions(+), 79 deletions(-)

diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index 2f1aca9..ebfd4ea 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -26,6 +26,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_UNSUPPOR
 import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
 import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_B;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_C;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND;
 import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
@@ -42,6 +43,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
@@ -54,6 +57,7 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
+import reactor.core.scheduler.Schedulers;
 
 public interface GroupContract {
 
@@ -97,6 +101,59 @@ public interface GroupContract {
         }
 
         @Test
+        default void groupNotificationShouldDeliverASingleEventToAllListenersAtTheSameTime() {
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            try {
+                ConcurrentLinkedQueue<String> threads = new ConcurrentLinkedQueue<>();
+                eventBus().register(new MailboxListener.GroupMailboxListener() {
+                    @Override
+                    public Group getDefaultGroup() {
+                        return new GenericGroup("groupA");
+                    }
+
+                    @Override
+                    public void event(Event event) throws Exception {
+                        threads.add(Thread.currentThread().getName());
+                        countDownLatch.await();
+                    }
+                }, GROUP_A);
+                eventBus().register(new MailboxListener.GroupMailboxListener() {
+                    @Override
+                    public Group getDefaultGroup() {
+                        return new GenericGroup("groupB");
+                    }
+
+                    @Override
+                    public void event(Event event) throws Exception {
+                        threads.add(Thread.currentThread().getName());
+                        countDownLatch.await();
+                    }
+                }, GROUP_B);
+                eventBus().register(new MailboxListener.GroupMailboxListener() {
+                    @Override
+                    public Group getDefaultGroup() {
+                        return new GenericGroup("groupC");
+                    }
+
+                    @Override
+                    public void event(Event event) throws Exception {
+                        threads.add(Thread.currentThread().getName());
+                        countDownLatch.await();
+                    }
+                }, GROUP_C);
+
+                eventBus().dispatch(EVENT, NO_KEYS).subscribeOn(Schedulers.elastic()).subscribe();
+
+
+                WAIT_CONDITION.atMost(org.awaitility.Duration.TEN_SECONDS)
+                    .untilAsserted(() -> assertThat(threads).hasSize(3));
+                assertThat(threads).doesNotHaveDuplicates();
+            } finally {
+                countDownLatch.countDown();
+            }
+        }
+
+        @Test
         default void listenersShouldBeAbleToDispatch() {
             AtomicBoolean successfulRetry = new AtomicBoolean(false);
             MailboxListener listener = event -> {
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
index f365ef3..2c8e581 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -56,6 +57,7 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
+import reactor.core.scheduler.Schedulers;
 
 public interface KeyContract extends EventBusContract {
 
@@ -86,6 +88,36 @@ public interface KeyContract extends EventBusContract {
         }
 
         @Test
+        default void notificationShouldDeliverASingleEventToAllListenersAtTheSameTime() {
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            try {
+                ConcurrentLinkedQueue<String> threads = new ConcurrentLinkedQueue<>();
+                eventBus().register(event -> {
+                    threads.add(Thread.currentThread().getName());
+                    countDownLatch.await();
+                }, KEY_1);
+                eventBus().register(event -> {
+                    threads.add(Thread.currentThread().getName());
+                    countDownLatch.await();
+                }, KEY_1);
+                eventBus().register(event -> {
+                    threads.add(Thread.currentThread().getName());
+                    countDownLatch.await();
+                }, KEY_1);
+
+                eventBus().dispatch(EVENT, KEY_1).subscribeOn(Schedulers.elastic()).subscribe();
+
+
+                WAIT_CONDITION.atMost(org.awaitility.Duration.TEN_SECONDS)
+                    .untilAsserted(() -> assertThat(threads).hasSize(3));
+                assertThat(threads).doesNotHaveDuplicates();
+            } finally {
+                countDownLatch.countDown();
+            }
+        }
+
+
+        @Test
         default void registeredListenersShouldNotReceiveNoopEvents() throws Exception {
             MailboxListener listener = newListener();
 
@@ -293,6 +325,18 @@ public interface KeyContract extends EventBusContract {
                 .event(any());
         }
 
+
+        @Test
+        default void dispatchShouldNotifyAsynchronousListener() throws Exception {
+            MailboxListener listener = newListener();
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+            eventBus().register(listener, KEY_1);
+
+            eventBus().dispatch(EVENT, KEY_1).block();
+
+            verify(listener, after(FIVE_HUNDRED_MS.toMillis())).event(EVENT);
+        }
+
         @Test
         default void dispatchShouldNotBlockAsynchronousListener() throws Exception {
             MailboxListener listener = newListener();
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
index f8bc8c9..bcd2376 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
@@ -34,9 +34,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class InVMEventBus implements EventBus {
 
@@ -79,8 +79,6 @@ public class InVMEventBus implements EventBus {
     public Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
         if (!event.isNoop()) {
             return Flux.merge(groupDeliveries(event), keyDeliveries(event, keys))
-                .reduceWith(EventDelivery.ExecutionStages::empty, EventDelivery.ExecutionStages::combine)
-                .flatMap(EventDelivery.ExecutionStages::synchronousListenerFuture)
                 .then()
                 .onErrorResume(throwable -> Mono.empty());
         }
@@ -90,9 +88,7 @@ public class InVMEventBus implements EventBus {
     @Override
     public Mono<Void> reDeliver(Group group, Event event) {
         if (!event.isNoop()) {
-            return Mono.fromCallable(() -> groupDelivery(event, retrieveListenerFromGroup(group), group))
-                .flatMap(EventDelivery.ExecutionStages::synchronousListenerFuture)
-                .then();
+            return groupDelivery(event, retrieveListenerFromGroup(group), group);
         }
         return Mono.empty();
     }
@@ -102,17 +98,19 @@ public class InVMEventBus implements EventBus {
             .orElseThrow(() -> new GroupRegistrationNotFound(group));
     }
 
-    private Flux<EventDelivery.ExecutionStages> keyDeliveries(Event event, Set<RegistrationKey> keys) {
+    private Mono<Void> keyDeliveries(Event event, Set<RegistrationKey> keys) {
         return Flux.fromIterable(registeredListenersByKeys(keys))
-            .map(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()));
+            .flatMap(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()).subscribeOn(Schedulers.elastic()))
+            .then();
     }
 
-    private Flux<EventDelivery.ExecutionStages> groupDeliveries(Event event) {
+    private Mono<Void> groupDeliveries(Event event) {
         return Flux.fromIterable(groups.entrySet())
-            .map(entry -> groupDelivery(event, entry.getValue(), entry.getKey()));
+            .flatMap(entry -> groupDelivery(event, entry.getValue(), entry.getKey()).subscribeOn(Schedulers.elastic()))
+            .then();
     }
 
-    private EventDelivery.ExecutionStages groupDelivery(Event event, MailboxListener mailboxListener, Group group) {
+    private Mono<Void> groupDelivery(Event event, MailboxListener mailboxListener, Group group) {
         return eventDelivery.deliver(
             mailboxListener,
             event,
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index 0f56a60..26c972d 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -32,7 +32,6 @@ import org.apache.james.mailbox.events.RetryBackoffConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -129,44 +128,5 @@ public interface EventDelivery {
         Mono<Void> handle(Event event);
     }
 
-    class ExecutionStages {
-
-        public static ExecutionStages empty() {
-            return new ExecutionStages(Mono.empty(), Mono.empty());
-        }
-
-        static ExecutionStages synchronous(Mono<Void> synchronousListenerFuture) {
-            return new ExecutionStages(synchronousListenerFuture, Mono.empty());
-        }
-
-        static ExecutionStages asynchronous(Mono<Void> asynchronousListenerFuture) {
-            return new ExecutionStages(Mono.empty(),asynchronousListenerFuture);
-        }
-
-        private final Mono<Void> synchronousListenerFuture;
-        private final Mono<Void> asynchronousListenerFuture;
-
-        private ExecutionStages(Mono<Void> synchronousListenerFuture, Mono<Void> asynchronousListenerFuture) {
-            this.synchronousListenerFuture = synchronousListenerFuture;
-            this.asynchronousListenerFuture = asynchronousListenerFuture;
-        }
-
-        public Mono<Void> synchronousListenerFuture() {
-            return synchronousListenerFuture;
-        }
-
-        public Mono<Void> allListenerFuture() {
-            return synchronousListenerFuture
-                .concatWith(asynchronousListenerFuture)
-                .then();
-        }
-
-        public ExecutionStages combine(ExecutionStages another) {
-            return new ExecutionStages(
-                Flux.concat(this.synchronousListenerFuture, another.synchronousListenerFuture).then(),
-                Flux.concat(this.asynchronousListenerFuture, another.asynchronousListenerFuture).then());
-        }
-    }
-
-    ExecutionStages deliver(MailboxListener listener, Event event, DeliveryOption option);
+    Mono<Void> deliver(MailboxListener listener, Event event, DeliveryOption option);
 }
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
index 718d5f1..a36119f 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
@@ -51,18 +51,17 @@ public class InVmEventDelivery implements EventDelivery {
     }
 
     @Override
-    public ExecutionStages deliver(MailboxListener listener, Event event, DeliveryOption option) {
+    public Mono<Void> deliver(MailboxListener listener, Event event, DeliveryOption option) {
         Mono<Void> executionResult = deliverByOption(listener, event, option);
 
-        return toExecutionStages(listener.getExecutionMode(), executionResult);
+        return waitForResultIfNeeded(listener.getExecutionMode(), executionResult);
     }
 
-    private ExecutionStages toExecutionStages(MailboxListener.ExecutionMode executionMode, Mono<Void> executionResult) {
+    private Mono<Void> waitForResultIfNeeded(MailboxListener.ExecutionMode executionMode, Mono<Void> executionResult) {
         if (executionMode.equals(MailboxListener.ExecutionMode.SYNCHRONOUS)) {
-            return ExecutionStages.synchronous(executionResult);
+            return executionResult;
         }
-
-        return ExecutionStages.asynchronous(executionResult);
+        return executionResult.or(Mono.empty()).onErrorResume(throwable -> Mono.empty());
     }
 
     private Mono<Void> deliverByOption(MailboxListener listener, Event event, DeliveryOption deliveryOption) {
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
index 547e192..dfea972 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
@@ -63,7 +63,6 @@ class InVmEventDeliveryTest {
         void deliverShouldDeliverEvent() {
             when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
             inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                .allListenerFuture()
                 .block();
 
             assertThat(listener.numberOfEventCalls())
@@ -74,7 +73,6 @@ class InVmEventDeliveryTest {
         void deliverShouldReturnSuccessSynchronousMono() {
             when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
             assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                    .synchronousListenerFuture()
                     .block())
                 .doesNotThrowAnyException();
         }
@@ -86,7 +84,6 @@ class InVmEventDeliveryTest {
                 .when(listener).event(EVENT);
 
             assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                .allListenerFuture()
                 .block())
             .isInstanceOf(RuntimeException.class);
 
@@ -101,7 +98,6 @@ class InVmEventDeliveryTest {
                 .when(listener).event(EVENT);
 
             assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                .synchronousListenerFuture()
                 .block())
             .isInstanceOf(RuntimeException.class);
         }
@@ -114,7 +110,6 @@ class InVmEventDeliveryTest {
         void deliverShouldDeliverEvent() {
             when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
             inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                .allListenerFuture()
                 .block();
 
             assertThat(listener.numberOfEventCalls())
@@ -125,24 +120,19 @@ class InVmEventDeliveryTest {
         void deliverShouldReturnSuccessSynchronousMono() {
             when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
             assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                    .synchronousListenerFuture()
                     .block())
                 .doesNotThrowAnyException();
         }
 
         @Test
-        void deliverShouldNotDeliverWhenListenerGetException() {
+        void deliverShouldNotFailWhenListenerGetException() {
             when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
             doThrow(new RuntimeException())
                 .when(listener).event(EVENT);
 
-            assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                .allListenerFuture()
+            assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
                 .block())
-            .isInstanceOf(RuntimeException.class);
-
-            assertThat(listener.numberOfEventCalls())
-                .isEqualTo(0);
+            .doesNotThrowAnyException();
         }
 
         @Test
@@ -152,7 +142,6 @@ class InVmEventDeliveryTest {
                 .when(listener).event(EVENT);
 
             assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                .synchronousListenerFuture()
                 .block())
             .doesNotThrowAnyException();
         }
@@ -174,7 +163,6 @@ class InVmEventDeliveryTest {
                 DeliveryOption.of(
                     BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
                     PermanentFailureHandler.NO_HANDLER))
-                .allListenerFuture()
                 .block();
 
             assertThat(listener.numberOfEventCalls())
@@ -193,7 +181,6 @@ class InVmEventDeliveryTest {
                 DeliveryOption.of(
                     Retryer.NO_RETRYER,
                     PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
-                .allListenerFuture()
                 .block();
 
             assertThat(deadLetter.groupsWithFailedEvents().toStream())
@@ -214,7 +201,6 @@ class InVmEventDeliveryTest {
                 DeliveryOption.of(
                     BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
                     PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
-                .allListenerFuture()
                 .block();
 
             SoftAssertions.assertSoftly(softy -> {
@@ -242,7 +228,6 @@ class InVmEventDeliveryTest {
                 DeliveryOption.of(
                     BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
                     PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
-                .allListenerFuture()
                 .block();
 
             SoftAssertions.assertSoftly(softy -> {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 47844d7..6c1b586 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -90,13 +90,13 @@ class EventDispatcher {
         return Flux.fromIterable(keys)
             .flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key)
                 .map(listener -> Tuples.of(key, listener)))
-            .filter(pair -> pair.getT2().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
-            .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()))
+            .filter(pair -> pair.getT2().getExecutionMode() == MailboxListener.ExecutionMode.SYNCHRONOUS)
+            .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()).subscribeOn(Schedulers.elastic()))
             .then();
     }
 
     private Mono<Void> executeListener(Event event, MailboxListener mailboxListener, RegistrationKey registrationKey) {
-        return Mono.from((sink) -> {
+        return Mono.from(sink -> {
             try {
                 mailboxListenerExecutor.execute(mailboxListener,
                     MDCBuilder.create()


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org