You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/12/14 10:34:24 UTC

[08/18] james-project git commit: MAILBOX-364 Reactor implementation for EventDelivery

MAILBOX-364 Reactor implementation for EventDelivery

The API changed to:

 - Handle delivery to **a group** of listeners, which is the only real-world scenario
 - Let the caller choose which stage of the computation she wants to await - this is no longer an implementation-hardcoded behaviour.

Tests are rewritten accordingly.

We can then keep only a single implementation of EventDelivery.

Also, we no longer need the never-used "asynchronous" version of event delivery.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/67e1fb90
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/67e1fb90
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/67e1fb90

Branch: refs/heads/master
Commit: 67e1fb9077173d5632c48cdd88bc420f56e182e3
Parents: 61e8246
Author: Benoit Tellier <bt...@linagora.com>
Authored: Wed Dec 12 17:13:31 2018 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Dec 14 17:09:20 2018 +0700

----------------------------------------------------------------------
 mailbox/event/event-memory/pom.xml              |   9 +-
 .../delivery/AsynchronousEventDelivery.java     |  51 ------
 .../mailbox/events/delivery/EventDelivery.java  |  24 ++-
 .../events/delivery/EventDeliveryImpl.java      |  90 +++++++++++
 .../events/delivery/MixedEventDelivery.java     |  52 ------
 .../delivery/SynchronousEventDelivery.java      |  55 -------
 .../mailbox/events/delivery/VoidMarker.java     |  25 +++
 .../delivery/AsynchronousEventDeliveryTest.java |  83 ----------
 .../events/delivery/EventDeliveryImplTest.java  | 158 +++++++++++++++++++
 .../events/delivery/MixedEventDeliveryTest.java |  86 ----------
 .../delivery/SynchronousEventDeliveryTest.java  |  60 -------
 .../resources/META-INF/spring/event-system.xml  |  12 +-
 .../event/DefaultDelegatingMailboxListener.java |  41 ++---
 .../modules/mailbox/DefaultEventModule.java     |   9 +-
 14 files changed, 324 insertions(+), 431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/pom.xml b/mailbox/event/event-memory/pom.xml
index 4047591..7460ace 100644
--- a/mailbox/event/event-memory/pom.xml
+++ b/mailbox/event/event-memory/pom.xml
@@ -43,6 +43,10 @@
             <type>test-jar</type>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.james</groupId>
             <artifactId>metrics-api</artifactId>
         </dependency>
@@ -66,11 +70,6 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.junit.vintage</groupId>
-            <artifactId>junit-vintage-engine</artifactId>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDelivery.java
deleted file mode 100644
index 028436c..0000000
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDelivery.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import javax.annotation.PreDestroy;
-
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.util.concurrent.NamedThreadFactory;
-
-public class AsynchronousEventDelivery implements EventDelivery {
-    private final ExecutorService threadPoolExecutor;
-    private final SynchronousEventDelivery synchronousEventDelivery;
-
-    public AsynchronousEventDelivery(int threadPoolSize, SynchronousEventDelivery synchronousEventDelivery) {
-        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
-        this.threadPoolExecutor = Executors.newFixedThreadPool(threadPoolSize, threadFactory);
-        this.synchronousEventDelivery = synchronousEventDelivery;
-    }
-
-    @Override
-    public void deliver(MailboxListener mailboxListener, Event event) {
-        threadPoolExecutor.submit(() -> synchronousEventDelivery.deliver(mailboxListener, event));
-    }
-
-    @PreDestroy
-    public void stop() {
-        threadPoolExecutor.shutdownNow();
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index 56481e7..ae38890 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -19,11 +19,33 @@
 
 package org.apache.james.mailbox.events.delivery;
 
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
 
 public interface EventDelivery {
+    class ExecutionStages {
+        private final CompletableFuture<Void> synchronousListenerFuture;
+        private final CompletableFuture<Void> asynchronousListenerFuture;
+
+        ExecutionStages(CompletableFuture<Void> synchronousListenerFuture, CompletableFuture<Void> asynchronousListenerFuture) {
+            this.synchronousListenerFuture = synchronousListenerFuture;
+            this.asynchronousListenerFuture = asynchronousListenerFuture;
+        }
+
+        public CompletableFuture<Void> synchronousListenerFuture() {
+            return synchronousListenerFuture;
+        }
+
+        public CompletableFuture<Void> allListenerFuture() {
+            return CompletableFuture.allOf(
+                synchronousListenerFuture,
+                asynchronousListenerFuture);
+        }
+    }
 
-    void deliver(MailboxListener mailboxListener, Event event);
 
+    ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event);
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDeliveryImpl.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDeliveryImpl.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDeliveryImpl.java
new file mode 100644
index 0000000..37f26ee
--- /dev/null
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDeliveryImpl.java
@@ -0,0 +1,90 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events.delivery;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.metrics.api.TimeMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class EventDeliveryImpl implements EventDelivery {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventDeliveryImpl.class);
+
+    private final MetricFactory metricFactory;
+
+    public EventDeliveryImpl(MetricFactory metricFactory) {
+        this.metricFactory = metricFactory;
+    }
+
+    @Override
+    public ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event) {
+        CompletableFuture<Void> synchronousListeners = doDeliver(
+            filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.SYNCHRONOUS),
+            event);
+        CompletableFuture<Void> asyncListener = doDeliver(
+            filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.ASYNCHRONOUS),
+            event);
+
+        return new ExecutionStages(synchronousListeners, asyncListener);
+    }
+
+    private ImmutableList<MailboxListener> filterByExecutionMode(Collection<MailboxListener> mailboxListeners, MailboxListener.ExecutionMode executionMode) {
+        return mailboxListeners.stream()
+            .filter(listener -> listener.getExecutionMode() == executionMode)
+            .collect(Guavate.toImmutableList());
+    }
+
+    private CompletableFuture<Void> doDeliver(Collection<MailboxListener> mailboxListeners, Event event) {
+        return Flux.fromIterable(mailboxListeners)
+            .flatMap(mailboxListener -> Mono.fromCallable(() ->  {
+                doDeliverToListener(mailboxListener, event);
+                return VoidMarker.IMPL;
+            }))
+            .then()
+            .subscribeOn(Schedulers.elastic())
+            .toFuture();
+    }
+
+    private void doDeliverToListener(MailboxListener mailboxListener, Event event) {
+        TimeMetric timer = metricFactory.timer("mailbox-listener-" + mailboxListener.getClass().getSimpleName());
+        try {
+            mailboxListener.event(event);
+        } catch (Throwable throwable) {
+            LOGGER.error("Error while processing listener {} for {}",
+                mailboxListener.getClass().getCanonicalName(), event.getClass().getCanonicalName(),
+                throwable);
+        } finally {
+            timer.stopAndPublish();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/MixedEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/MixedEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/MixedEventDelivery.java
deleted file mode 100644
index f8df0e5..0000000
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/MixedEventDelivery.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import javax.annotation.PreDestroy;
-
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.MailboxListener;
-
-public class MixedEventDelivery implements EventDelivery {
-
-    private final AsynchronousEventDelivery asynchronousEventDelivery;
-    private final SynchronousEventDelivery synchronousEventDelivery;
-
-    public MixedEventDelivery(AsynchronousEventDelivery asynchronousEventDelivery,
-                              SynchronousEventDelivery synchronousEventDelivery) {
-        this.asynchronousEventDelivery = asynchronousEventDelivery;
-        this.synchronousEventDelivery = synchronousEventDelivery;
-    }
-
-    @Override
-    public void deliver(MailboxListener mailboxListener, Event event) {
-        if (mailboxListener.getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS)) {
-            synchronousEventDelivery.deliver(mailboxListener, event);
-        } else {
-            asynchronousEventDelivery.deliver(mailboxListener, event);
-        }
-    }
-
-    @PreDestroy
-    public void stop() {
-        asynchronousEventDelivery.stop();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/SynchronousEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/SynchronousEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/SynchronousEventDelivery.java
deleted file mode 100644
index 77e735d..0000000
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/SynchronousEventDelivery.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import javax.inject.Inject;
-
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SynchronousEventDelivery implements EventDelivery {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(SynchronousEventDelivery.class);
-
-    private final MetricFactory metricFactory;
-
-    @Inject
-    public SynchronousEventDelivery(MetricFactory metricFactory) {
-        this.metricFactory = metricFactory;
-    }
-
-    @Override
-    public void deliver(MailboxListener mailboxListener, Event event) {
-        TimeMetric timer = metricFactory.timer("mailbox-listener-" + mailboxListener.getClass().getSimpleName());
-        try {
-            mailboxListener.event(event);
-        } catch (Throwable throwable) {
-            LOGGER.error("Error while processing listener {} for {}",
-                    mailboxListener.getClass().getCanonicalName(), event.getClass().getCanonicalName(),
-                    throwable);
-        } finally {
-            timer.stopAndPublish();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/VoidMarker.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/VoidMarker.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/VoidMarker.java
new file mode 100644
index 0000000..5a504a5
--- /dev/null
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/VoidMarker.java
@@ -0,0 +1,25 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events.delivery;
+
+public interface VoidMarker {
+    VoidMarker IMPL = () -> null;
+    Void toVoid();
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDeliveryTest.java
deleted file mode 100644
index 7a806e2..0000000
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDeliveryTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.mailbox.mock.MockMailboxSession;
-import org.apache.james.metrics.api.NoopMetricFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AsynchronousEventDeliveryTest {
-
-    private static final int ONE_MINUTE = (int) TimeUnit.MINUTES.toMillis(1);
-    private MailboxListener mailboxListener;
-    private AsynchronousEventDelivery asynchronousEventDelivery;
-
-    @Before
-    public void setUp() {
-        mailboxListener = mock(MailboxListener.class);
-        asynchronousEventDelivery = new AsynchronousEventDelivery(2,
-            new SynchronousEventDelivery(new NoopMetricFactory()));
-    }
-
-    @After
-    public void tearDown() {
-        asynchronousEventDelivery.stop();
-    }
-
-    @Test
-    public void deliverShouldWork() {
-        MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(null, null, null, null) {};
-        asynchronousEventDelivery.deliver(mailboxListener, event);
-        verify(mailboxListener, timeout(ONE_MINUTE)).event(event);
-    }
-
-    @Test
-    public void deliverShouldNotPropagateException() {
-        MockMailboxSession session = new MockMailboxSession("test");
-        MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(session.getSessionId(),
-            session.getUser().getCoreUser(), null, null) {};
-        doThrow(new RuntimeException()).when(mailboxListener).event(event);
-        asynchronousEventDelivery.deliver(mailboxListener, event);
-        verify(mailboxListener, timeout(ONE_MINUTE)).event(event);
-    }
-
-    @Test
-    public void deliverShouldWorkWhenThePoolIsFull() {
-        MockMailboxSession session = new MockMailboxSession("test");
-        MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(session.getSessionId(),
-            session.getUser().getCoreUser(), null, null) {};
-        int operationCount = 10;
-        for (int i = 0; i < operationCount; i++) {
-            asynchronousEventDelivery.deliver(mailboxListener, event);
-        }
-        verify(mailboxListener, timeout(ONE_MINUTE).times(operationCount)).event(event);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/EventDeliveryImplTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/EventDeliveryImplTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/EventDeliveryImplTest.java
new file mode 100644
index 0000000..ccf8799
--- /dev/null
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/EventDeliveryImplTest.java
@@ -0,0 +1,158 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events.delivery;
+
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.metrics.api.NoopMetricFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableList;
+
+class EventDeliveryImplTest {
+    private static final int DELIVERY_DELAY = (int) TimeUnit.MILLISECONDS.toMillis(100);
+
+    private EventDeliveryImpl eventDeliveryImpl;
+    private MailboxListener listener;
+    private MailboxListener listener2;
+    private MailboxListener.MailboxEvent event;
+
+    @BeforeEach
+    void setUp() {
+        event = mock(MailboxListener.MailboxEvent.class);
+        listener = mock(MailboxListener.class);
+        listener2 = mock(MailboxListener.class);
+        eventDeliveryImpl = new EventDeliveryImpl(new NoopMetricFactory());
+    }
+
+    @Test
+    void deliverShouldHaveCalledSynchronousListenersWhenAllListenerExecutedJoined() {
+        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+
+        eventDeliveryImpl.deliver(ImmutableList.of(listener), event).allListenerFuture().join();
+
+        verify(listener).event(event);
+    }
+
+    @Test
+    void deliverShouldHaveCalledAsynchronousListenersWhenAllListenerExecutedJoined() {
+        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+
+        eventDeliveryImpl.deliver(ImmutableList.of(listener), event).allListenerFuture().join();
+
+        verify(listener).event(event);
+    }
+
+    @Test
+    void deliverShouldHaveCalledSynchronousListenersWhenSynchronousListenerExecutedJoined() {
+        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+
+        eventDeliveryImpl.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().join();
+
+        verify(listener).event(event);
+    }
+
+    @Test
+    void deliverShouldNotBlockObAsynchronousListenersWhenSynchronousListenerExecutedJoined() {
+        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            latch.await();
+            return null;
+        }).when(listener).event(event);
+
+        assertTimeout(Duration.ofSeconds(2),
+            () -> {
+                eventDeliveryImpl.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().join();
+                latch.countDown();
+            });
+    }
+
+    @Test
+    void deliverShouldNotBlockOnSynchronousListenersWhenNoJoin() {
+        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            latch.await();
+            return null;
+        }).when(listener).event(event);
+
+        assertTimeout(Duration.ofSeconds(2),
+            () -> {
+                eventDeliveryImpl.deliver(ImmutableList.of(listener), event);
+                latch.countDown();
+            });
+    }
+
+    @Test
+    void deliverShouldNotBlockOnAsynchronousListenersWhenNoJoin() {
+        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            latch.await();
+            return null;
+        }).when(listener).event(event);
+
+        assertTimeout(Duration.ofSeconds(2),
+            () -> {
+                eventDeliveryImpl.deliver(ImmutableList.of(listener), event);
+                latch.countDown();
+            });
+    }
+
+    @Test
+    void deliverShouldEventuallyDeliverAsynchronousListenersWhenSynchronousListenerExecutedJoined() {
+        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+
+        eventDeliveryImpl.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().join();
+
+        verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
+    }
+
+    @Test
+    void deliverShouldEventuallyDeliverSynchronousListenersWhenNoJoin() {
+        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+
+        eventDeliveryImpl.deliver(ImmutableList.of(listener), event);
+
+        verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
+    }
+
+    @Test
+    void deliverShouldCallSynchronousListenersWhenAsynchronousListenersAreAlsoRegistered() {
+        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+        when(listener2.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+
+        eventDeliveryImpl.deliver(ImmutableList.of(listener, listener2), event).synchronousListenerFuture().join();
+
+        verify(listener2).event(event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/MixedEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/MixedEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/MixedEventDeliveryTest.java
deleted file mode 100644
index b697019..0000000
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/MixedEventDeliveryTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.metrics.api.NoopMetricFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class MixedEventDeliveryTest {
-
-    private static final int DELIVERY_DELAY = (int) TimeUnit.MILLISECONDS.toMillis(100);
-    private static final long ONE_MINUTE = 60000;
-    private MixedEventDelivery mixedEventDelivery;
-    private MailboxListener listener;
-
-    @Before
-    public void setUp() {
-        listener = mock(MailboxListener.class);
-        SynchronousEventDelivery synchronousEventDelivery = new SynchronousEventDelivery(new NoopMetricFactory());
-        AsynchronousEventDelivery asynchronousEventDelivery = new AsynchronousEventDelivery(2, synchronousEventDelivery);
-        mixedEventDelivery = new MixedEventDelivery(asynchronousEventDelivery, synchronousEventDelivery);
-    }
-
-    @After
-    public void tearDown() {
-        mixedEventDelivery.stop();
-    }
-
-    @Test
-    public void deliverShouldWorkOnSynchronousListeners() {
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
-        MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(null, null, null, null) {};
-        mixedEventDelivery.deliver(listener, event);
-        verify(listener).event(event);
-    }
-
-    @Test
-    public void deliverShouldEventuallyDeliverOnAsynchronousListeners() {
-        MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(null, null, null, null) {};
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-        mixedEventDelivery.deliver(listener, event);
-        verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
-    }
-
-    @Test(timeout = ONE_MINUTE)
-    public void deliverShouldNotBlockOnAsynchronousListeners() {
-        MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(null, null, null, null) {};
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-        final CountDownLatch latch = new CountDownLatch(1);
-        doAnswer(invocation -> {
-            latch.await();
-            return null;
-        }).when(listener).event(event);
-        mixedEventDelivery.deliver(listener, event);
-        latch.countDown();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/SynchronousEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/SynchronousEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/SynchronousEventDeliveryTest.java
deleted file mode 100644
index 95c4dfc..0000000
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/SynchronousEventDeliveryTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.mailbox.mock.MockMailboxSession;
-import org.apache.james.metrics.api.NoopMetricFactory;
-import org.junit.Before;
-import org.junit.Test;
-
-public class SynchronousEventDeliveryTest {
-
-    private MailboxListener mailboxListener;
-    private SynchronousEventDelivery synchronousEventDelivery;
-
-    @Before
-    public void setUp() {
-        mailboxListener = mock(MailboxListener.class);
-        synchronousEventDelivery = new SynchronousEventDelivery(new NoopMetricFactory());
-    }
-
-    @Test
-    public void deliverShouldWork() {
-        MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(null, null, null, null) {};
-        synchronousEventDelivery.deliver(mailboxListener, event);
-        verify(mailboxListener).event(event);
-    }
-
-    @Test
-    public void deliverShouldNotPropagateException() {
-        MockMailboxSession session = new MockMailboxSession("test");
-        MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(session.getSessionId(),
-            session.getUser().getCoreUser(),null, null) {};
-        doThrow(new RuntimeException()).when(mailboxListener).event(event);
-        synchronousEventDelivery.deliver(mailboxListener, event);
-        verify(mailboxListener).event(event);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
----------------------------------------------------------------------
diff --git a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
index 4ed8ce1..68ac412 100644
--- a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
+++ b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
@@ -28,20 +28,10 @@
         <constructor-arg index="1" ref="event-registry"/>
     </bean>
 
-    <bean id="synchronous-event-delivery" class="org.apache.james.mailbox.events.delivery.SynchronousEventDelivery" lazy-init="true">
+    <bean id="event-delivery" class="org.apache.james.mailbox.events.delivery.EventDeliveryImpl" lazy-init="true">
         <constructor-arg index="0" ref="metricFactory"/>
     </bean>
 
     <bean id="event-registry" class="org.apache.james.mailbox.store.event.MailboxListenerRegistry"/>
 
-    <bean id="asynchronous-event-delivery" class="org.apache.james.mailbox.events.delivery.AsynchronousEventDelivery" lazy-init="true">
-        <constructor-arg index="0" ref="${event.delivery.thread.count}"/>
-        <constructor-arg index="1" ref="synchronous-event-delivery"/>
-    </bean>
-
-    <bean id="event-delivery" class="org.apache.james.mailbox.events.delivery.MixedEventDelivery" lazy-init="true">
-        <constructor-arg index="0" ref="asynchronous-event-delivery"/>
-        <constructor-arg index="1" ref="synchronous-event-delivery"/>
-    </bean>
-
 </beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java
index 59b91cc..3a7c150 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java
@@ -26,11 +26,14 @@ import javax.inject.Inject;
 import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
 import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.events.delivery.EventDelivery;
+import org.apache.james.mailbox.events.delivery.EventDeliveryImpl;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.metrics.api.NoopMetricFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 
 /**
  * Receive a {@link org.apache.james.mailbox.MailboxListener.MailboxEvent} and delegate it to an other
@@ -50,7 +53,7 @@ public class DefaultDelegatingMailboxListener implements DelegatingMailboxListen
 
     @VisibleForTesting
     public DefaultDelegatingMailboxListener() {
-        this(new SynchronousEventDelivery(new NoopMetricFactory()),
+        this(new EventDeliveryImpl(new NoopMetricFactory()),
             new MailboxListenerRegistry());
     }
 
@@ -88,30 +91,28 @@ public class DefaultDelegatingMailboxListener implements DelegatingMailboxListen
 
     @Override
     public void event(Event event) {
-        deliverEventToGlobalListeners(event);
-        if (event instanceof MailboxEvent) {
-            mailboxEvent((MailboxEvent) event);
-        }
-    }
-
-    private void mailboxEvent(MailboxEvent mailboxEvent) {
-        Collection<MailboxListener> listenerSnapshot = registry.getLocalMailboxListeners(mailboxEvent.getMailboxId());
-        if (mailboxEvent instanceof MailboxDeletion && listenerSnapshot.size() > 0) {
-            registry.deleteRegistryFor(mailboxEvent.getMailboxId());
+        ImmutableList<MailboxListener> listeners = ImmutableList.<MailboxListener>builder()
+            .addAll(registry.getGlobalListeners())
+            .addAll(registeredMailboxListeners(event))
+            .build();
+
+        eventDelivery.deliver(listeners, event)
+            .synchronousListenerFuture()
+            .join();
+
+        if (event instanceof MailboxDeletion) {
+            MailboxDeletion deletion = (MailboxDeletion) event;
+            registry.deleteRegistryFor(deletion.getMailboxId());
         }
-        deliverEventToMailboxListeners(mailboxEvent, listenerSnapshot);
     }
 
-    protected void deliverEventToMailboxListeners(MailboxEvent event, Collection<MailboxListener> listenerSnapshot) {
-        for (MailboxListener listener : listenerSnapshot) {
-            eventDelivery.deliver(listener, event);
-        }
-    }
+    private Collection<MailboxListener> registeredMailboxListeners(Event event) {
+        if (event instanceof MailboxEvent) {
+            MailboxEvent mailboxEvent = (MailboxEvent) event;
 
-    protected void deliverEventToGlobalListeners(Event event) {
-        for (MailboxListener mailboxListener : registry.getGlobalListeners()) {
-            eventDelivery.deliver(mailboxListener, event);
+            return registry.getLocalMailboxListeners(mailboxEvent.getMailboxId());
         }
+        return ImmutableList.of();
     }
 
     public MailboxListenerRegistry getRegistry() {

http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index a139079..00be62d 100644
--- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -27,10 +27,8 @@ import javax.inject.Inject;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.james.lifecycle.api.Configurable;
 import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.mailbox.events.delivery.AsynchronousEventDelivery;
 import org.apache.james.mailbox.events.delivery.EventDelivery;
-import org.apache.james.mailbox.events.delivery.MixedEventDelivery;
-import org.apache.james.mailbox.events.delivery.SynchronousEventDelivery;
+import org.apache.james.mailbox.events.delivery.EventDeliveryImpl;
 import org.apache.james.mailbox.store.event.DefaultDelegatingMailboxListener;
 import org.apache.james.mailbox.store.event.DelegatingMailboxListener;
 import org.apache.james.mailbox.store.event.MailboxAnnotationListener;
@@ -73,10 +71,7 @@ public class DefaultEventModule extends AbstractModule {
     EventDelivery provideEventDelivery(ConfigurationProvider configurationProvider, MetricFactory metricFactory) {
         int poolSize = retrievePoolSize(configurationProvider);
 
-        SynchronousEventDelivery synchronousEventDelivery = new SynchronousEventDelivery(metricFactory);
-        return new MixedEventDelivery(
-            new AsynchronousEventDelivery(poolSize, synchronousEventDelivery),
-            synchronousEventDelivery);
+        return new EventDeliveryImpl(metricFactory);
     }
 
     private int retrievePoolSize(ConfigurationProvider configurationProvider) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org