You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/12/14 10:34:24 UTC
[08/18] james-project git commit: MAILBOX-364 Reactor implementation
for EventDelivery
MAILBOX-364 Reactor implementation for EventDelivery
The API changed to:
- Handle delivery to **a group** of listeners, which is the only real-world scenario
- Let the caller choose which stage of the computation she wants to await - this is no longer a behaviour hardcoded in the implementation.
Tests are rewritten accordingly.
We can then keep only a single implementation of EventDelivery.
Also, we no longer need the never-used "asynchronous" version of event delivery.
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/67e1fb90
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/67e1fb90
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/67e1fb90
Branch: refs/heads/master
Commit: 67e1fb9077173d5632c48cdd88bc420f56e182e3
Parents: 61e8246
Author: Benoit Tellier <bt...@linagora.com>
Authored: Wed Dec 12 17:13:31 2018 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Dec 14 17:09:20 2018 +0700
----------------------------------------------------------------------
mailbox/event/event-memory/pom.xml | 9 +-
.../delivery/AsynchronousEventDelivery.java | 51 ------
.../mailbox/events/delivery/EventDelivery.java | 24 ++-
.../events/delivery/EventDeliveryImpl.java | 90 +++++++++++
.../events/delivery/MixedEventDelivery.java | 52 ------
.../delivery/SynchronousEventDelivery.java | 55 -------
.../mailbox/events/delivery/VoidMarker.java | 25 +++
.../delivery/AsynchronousEventDeliveryTest.java | 83 ----------
.../events/delivery/EventDeliveryImplTest.java | 158 +++++++++++++++++++
.../events/delivery/MixedEventDeliveryTest.java | 86 ----------
.../delivery/SynchronousEventDeliveryTest.java | 60 -------
.../resources/META-INF/spring/event-system.xml | 12 +-
.../event/DefaultDelegatingMailboxListener.java | 41 ++---
.../modules/mailbox/DefaultEventModule.java | 9 +-
14 files changed, 324 insertions(+), 431 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/pom.xml b/mailbox/event/event-memory/pom.xml
index 4047591..7460ace 100644
--- a/mailbox/event/event-memory/pom.xml
+++ b/mailbox/event/event-memory/pom.xml
@@ -43,6 +43,10 @@
<type>test-jar</type>
</dependency>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.james</groupId>
<artifactId>metrics-api</artifactId>
</dependency>
@@ -66,11 +70,6 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.junit.vintage</groupId>
- <artifactId>junit-vintage-engine</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDelivery.java
deleted file mode 100644
index 028436c..0000000
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDelivery.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import javax.annotation.PreDestroy;
-
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.util.concurrent.NamedThreadFactory;
-
-public class AsynchronousEventDelivery implements EventDelivery {
- private final ExecutorService threadPoolExecutor;
- private final SynchronousEventDelivery synchronousEventDelivery;
-
- public AsynchronousEventDelivery(int threadPoolSize, SynchronousEventDelivery synchronousEventDelivery) {
- ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
- this.threadPoolExecutor = Executors.newFixedThreadPool(threadPoolSize, threadFactory);
- this.synchronousEventDelivery = synchronousEventDelivery;
- }
-
- @Override
- public void deliver(MailboxListener mailboxListener, Event event) {
- threadPoolExecutor.submit(() -> synchronousEventDelivery.deliver(mailboxListener, event));
- }
-
- @PreDestroy
- public void stop() {
- threadPoolExecutor.shutdownNow();
- }
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index 56481e7..ae38890 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -19,11 +19,33 @@
package org.apache.james.mailbox.events.delivery;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
public interface EventDelivery {
+ class ExecutionStages {
+ private final CompletableFuture<Void> synchronousListenerFuture;
+ private final CompletableFuture<Void> asynchronousListenerFuture;
+
+ ExecutionStages(CompletableFuture<Void> synchronousListenerFuture, CompletableFuture<Void> asynchronousListenerFuture) {
+ this.synchronousListenerFuture = synchronousListenerFuture;
+ this.asynchronousListenerFuture = asynchronousListenerFuture;
+ }
+
+ public CompletableFuture<Void> synchronousListenerFuture() {
+ return synchronousListenerFuture;
+ }
+
+ public CompletableFuture<Void> allListenerFuture() {
+ return CompletableFuture.allOf(
+ synchronousListenerFuture,
+ asynchronousListenerFuture);
+ }
+ }
- void deliver(MailboxListener mailboxListener, Event event);
+ ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDeliveryImpl.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDeliveryImpl.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDeliveryImpl.java
new file mode 100644
index 0000000..37f26ee
--- /dev/null
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDeliveryImpl.java
@@ -0,0 +1,90 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events.delivery;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.metrics.api.TimeMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class EventDeliveryImpl implements EventDelivery {
+ private static final Logger LOGGER = LoggerFactory.getLogger(EventDeliveryImpl.class);
+
+ private final MetricFactory metricFactory;
+
+ public EventDeliveryImpl(MetricFactory metricFactory) {
+ this.metricFactory = metricFactory;
+ }
+
+ @Override
+ public ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event) {
+ CompletableFuture<Void> synchronousListeners = doDeliver(
+ filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.SYNCHRONOUS),
+ event);
+ CompletableFuture<Void> asyncListener = doDeliver(
+ filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.ASYNCHRONOUS),
+ event);
+
+ return new ExecutionStages(synchronousListeners, asyncListener);
+ }
+
+ private ImmutableList<MailboxListener> filterByExecutionMode(Collection<MailboxListener> mailboxListeners, MailboxListener.ExecutionMode executionMode) {
+ return mailboxListeners.stream()
+ .filter(listener -> listener.getExecutionMode() == executionMode)
+ .collect(Guavate.toImmutableList());
+ }
+
+ private CompletableFuture<Void> doDeliver(Collection<MailboxListener> mailboxListeners, Event event) {
+ return Flux.fromIterable(mailboxListeners)
+ .flatMap(mailboxListener -> Mono.fromCallable(() -> {
+ doDeliverToListener(mailboxListener, event);
+ return VoidMarker.IMPL;
+ }))
+ .then()
+ .subscribeOn(Schedulers.elastic())
+ .toFuture();
+ }
+
+ private void doDeliverToListener(MailboxListener mailboxListener, Event event) {
+ TimeMetric timer = metricFactory.timer("mailbox-listener-" + mailboxListener.getClass().getSimpleName());
+ try {
+ mailboxListener.event(event);
+ } catch (Throwable throwable) {
+ LOGGER.error("Error while processing listener {} for {}",
+ mailboxListener.getClass().getCanonicalName(), event.getClass().getCanonicalName(),
+ throwable);
+ } finally {
+ timer.stopAndPublish();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/MixedEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/MixedEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/MixedEventDelivery.java
deleted file mode 100644
index f8df0e5..0000000
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/MixedEventDelivery.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import javax.annotation.PreDestroy;
-
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.MailboxListener;
-
-public class MixedEventDelivery implements EventDelivery {
-
- private final AsynchronousEventDelivery asynchronousEventDelivery;
- private final SynchronousEventDelivery synchronousEventDelivery;
-
- public MixedEventDelivery(AsynchronousEventDelivery asynchronousEventDelivery,
- SynchronousEventDelivery synchronousEventDelivery) {
- this.asynchronousEventDelivery = asynchronousEventDelivery;
- this.synchronousEventDelivery = synchronousEventDelivery;
- }
-
- @Override
- public void deliver(MailboxListener mailboxListener, Event event) {
- if (mailboxListener.getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS)) {
- synchronousEventDelivery.deliver(mailboxListener, event);
- } else {
- asynchronousEventDelivery.deliver(mailboxListener, event);
- }
- }
-
- @PreDestroy
- public void stop() {
- asynchronousEventDelivery.stop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/SynchronousEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/SynchronousEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/SynchronousEventDelivery.java
deleted file mode 100644
index 77e735d..0000000
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/SynchronousEventDelivery.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import javax.inject.Inject;
-
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SynchronousEventDelivery implements EventDelivery {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SynchronousEventDelivery.class);
-
- private final MetricFactory metricFactory;
-
- @Inject
- public SynchronousEventDelivery(MetricFactory metricFactory) {
- this.metricFactory = metricFactory;
- }
-
- @Override
- public void deliver(MailboxListener mailboxListener, Event event) {
- TimeMetric timer = metricFactory.timer("mailbox-listener-" + mailboxListener.getClass().getSimpleName());
- try {
- mailboxListener.event(event);
- } catch (Throwable throwable) {
- LOGGER.error("Error while processing listener {} for {}",
- mailboxListener.getClass().getCanonicalName(), event.getClass().getCanonicalName(),
- throwable);
- } finally {
- timer.stopAndPublish();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/VoidMarker.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/VoidMarker.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/VoidMarker.java
new file mode 100644
index 0000000..5a504a5
--- /dev/null
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/VoidMarker.java
@@ -0,0 +1,25 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events.delivery;
+
+public interface VoidMarker {
+ VoidMarker IMPL = () -> null;
+ Void toVoid();
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDeliveryTest.java
deleted file mode 100644
index 7a806e2..0000000
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/AsynchronousEventDeliveryTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.mailbox.mock.MockMailboxSession;
-import org.apache.james.metrics.api.NoopMetricFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AsynchronousEventDeliveryTest {
-
- private static final int ONE_MINUTE = (int) TimeUnit.MINUTES.toMillis(1);
- private MailboxListener mailboxListener;
- private AsynchronousEventDelivery asynchronousEventDelivery;
-
- @Before
- public void setUp() {
- mailboxListener = mock(MailboxListener.class);
- asynchronousEventDelivery = new AsynchronousEventDelivery(2,
- new SynchronousEventDelivery(new NoopMetricFactory()));
- }
-
- @After
- public void tearDown() {
- asynchronousEventDelivery.stop();
- }
-
- @Test
- public void deliverShouldWork() {
- MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(null, null, null, null) {};
- asynchronousEventDelivery.deliver(mailboxListener, event);
- verify(mailboxListener, timeout(ONE_MINUTE)).event(event);
- }
-
- @Test
- public void deliverShouldNotPropagateException() {
- MockMailboxSession session = new MockMailboxSession("test");
- MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(session.getSessionId(),
- session.getUser().getCoreUser(), null, null) {};
- doThrow(new RuntimeException()).when(mailboxListener).event(event);
- asynchronousEventDelivery.deliver(mailboxListener, event);
- verify(mailboxListener, timeout(ONE_MINUTE)).event(event);
- }
-
- @Test
- public void deliverShouldWorkWhenThePoolIsFull() {
- MockMailboxSession session = new MockMailboxSession("test");
- MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(session.getSessionId(),
- session.getUser().getCoreUser(), null, null) {};
- int operationCount = 10;
- for (int i = 0; i < operationCount; i++) {
- asynchronousEventDelivery.deliver(mailboxListener, event);
- }
- verify(mailboxListener, timeout(ONE_MINUTE).times(operationCount)).event(event);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/EventDeliveryImplTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/EventDeliveryImplTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/EventDeliveryImplTest.java
new file mode 100644
index 0000000..ccf8799
--- /dev/null
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/EventDeliveryImplTest.java
@@ -0,0 +1,158 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events.delivery;
+
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.metrics.api.NoopMetricFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableList;
+
+class EventDeliveryImplTest {
+ private static final int DELIVERY_DELAY = (int) TimeUnit.MILLISECONDS.toMillis(100);
+
+ private EventDeliveryImpl eventDeliveryImpl;
+ private MailboxListener listener;
+ private MailboxListener listener2;
+ private MailboxListener.MailboxEvent event;
+
+ @BeforeEach
+ void setUp() {
+ event = mock(MailboxListener.MailboxEvent.class);
+ listener = mock(MailboxListener.class);
+ listener2 = mock(MailboxListener.class);
+ eventDeliveryImpl = new EventDeliveryImpl(new NoopMetricFactory());
+ }
+
+ @Test
+ void deliverShouldHaveCalledSynchronousListenersWhenAllListenerExecutedJoined() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+
+ eventDeliveryImpl.deliver(ImmutableList.of(listener), event).allListenerFuture().join();
+
+ verify(listener).event(event);
+ }
+
+ @Test
+ void deliverShouldHaveCalledAsynchronousListenersWhenAllListenerExecutedJoined() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+
+ eventDeliveryImpl.deliver(ImmutableList.of(listener), event).allListenerFuture().join();
+
+ verify(listener).event(event);
+ }
+
+ @Test
+ void deliverShouldHaveCalledSynchronousListenersWhenSynchronousListenerExecutedJoined() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+
+ eventDeliveryImpl.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().join();
+
+ verify(listener).event(event);
+ }
+
+ @Test
+ void deliverShouldNotBlockObAsynchronousListenersWhenSynchronousListenerExecutedJoined() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ latch.await();
+ return null;
+ }).when(listener).event(event);
+
+ assertTimeout(Duration.ofSeconds(2),
+ () -> {
+ eventDeliveryImpl.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().join();
+ latch.countDown();
+ });
+ }
+
+ @Test
+ void deliverShouldNotBlockOnSynchronousListenersWhenNoJoin() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ latch.await();
+ return null;
+ }).when(listener).event(event);
+
+ assertTimeout(Duration.ofSeconds(2),
+ () -> {
+ eventDeliveryImpl.deliver(ImmutableList.of(listener), event);
+ latch.countDown();
+ });
+ }
+
+ @Test
+ void deliverShouldNotBlockOnAsynchronousListenersWhenNoJoin() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ latch.await();
+ return null;
+ }).when(listener).event(event);
+
+ assertTimeout(Duration.ofSeconds(2),
+ () -> {
+ eventDeliveryImpl.deliver(ImmutableList.of(listener), event);
+ latch.countDown();
+ });
+ }
+
+ @Test
+ void deliverShouldEventuallyDeliverAsynchronousListenersWhenSynchronousListenerExecutedJoined() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+
+ eventDeliveryImpl.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().join();
+
+ verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
+ }
+
+ @Test
+ void deliverShouldEventuallyDeliverSynchronousListenersWhenNoJoin() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+
+ eventDeliveryImpl.deliver(ImmutableList.of(listener), event);
+
+ verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
+ }
+
+ @Test
+ void deliverShouldCallSynchronousListenersWhenAsynchronousListenersAreAlsoRegistered() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ when(listener2.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+
+ eventDeliveryImpl.deliver(ImmutableList.of(listener, listener2), event).synchronousListenerFuture().join();
+
+ verify(listener2).event(event);
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/MixedEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/MixedEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/MixedEventDeliveryTest.java
deleted file mode 100644
index b697019..0000000
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/MixedEventDeliveryTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.metrics.api.NoopMetricFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class MixedEventDeliveryTest {
-
- private static final int DELIVERY_DELAY = (int) TimeUnit.MILLISECONDS.toMillis(100);
- private static final long ONE_MINUTE = 60000;
- private MixedEventDelivery mixedEventDelivery;
- private MailboxListener listener;
-
- @Before
- public void setUp() {
- listener = mock(MailboxListener.class);
- SynchronousEventDelivery synchronousEventDelivery = new SynchronousEventDelivery(new NoopMetricFactory());
- AsynchronousEventDelivery asynchronousEventDelivery = new AsynchronousEventDelivery(2, synchronousEventDelivery);
- mixedEventDelivery = new MixedEventDelivery(asynchronousEventDelivery, synchronousEventDelivery);
- }
-
- @After
- public void tearDown() {
- mixedEventDelivery.stop();
- }
-
- @Test
- public void deliverShouldWorkOnSynchronousListeners() {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
- MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(null, null, null, null) {};
- mixedEventDelivery.deliver(listener, event);
- verify(listener).event(event);
- }
-
- @Test
- public void deliverShouldEventuallyDeliverOnAsynchronousListeners() {
- MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(null, null, null, null) {};
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
- mixedEventDelivery.deliver(listener, event);
- verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
- }
-
- @Test(timeout = ONE_MINUTE)
- public void deliverShouldNotBlockOnAsynchronousListeners() {
- MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(null, null, null, null) {};
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
- final CountDownLatch latch = new CountDownLatch(1);
- doAnswer(invocation -> {
- latch.await();
- return null;
- }).when(listener).event(event);
- mixedEventDelivery.deliver(listener, event);
- latch.countDown();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/SynchronousEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/SynchronousEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/SynchronousEventDeliveryTest.java
deleted file mode 100644
index 95c4dfc..0000000
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/SynchronousEventDeliveryTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.mailbox.mock.MockMailboxSession;
-import org.apache.james.metrics.api.NoopMetricFactory;
-import org.junit.Before;
-import org.junit.Test;
-
-public class SynchronousEventDeliveryTest {
-
- private MailboxListener mailboxListener;
- private SynchronousEventDelivery synchronousEventDelivery;
-
- @Before
- public void setUp() {
- mailboxListener = mock(MailboxListener.class);
- synchronousEventDelivery = new SynchronousEventDelivery(new NoopMetricFactory());
- }
-
- @Test
- public void deliverShouldWork() {
- MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(null, null, null, null) {};
- synchronousEventDelivery.deliver(mailboxListener, event);
- verify(mailboxListener).event(event);
- }
-
- @Test
- public void deliverShouldNotPropagateException() {
- MockMailboxSession session = new MockMailboxSession("test");
- MailboxListener.MailboxEvent event = new MailboxListener.MailboxEvent(session.getSessionId(),
- session.getUser().getCoreUser(),null, null) {};
- doThrow(new RuntimeException()).when(mailboxListener).event(event);
- synchronousEventDelivery.deliver(mailboxListener, event);
- verify(mailboxListener).event(event);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
----------------------------------------------------------------------
diff --git a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
index 4ed8ce1..68ac412 100644
--- a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
+++ b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
@@ -28,20 +28,10 @@
<constructor-arg index="1" ref="event-registry"/>
</bean>
- <bean id="synchronous-event-delivery" class="org.apache.james.mailbox.events.delivery.SynchronousEventDelivery" lazy-init="true">
+ <bean id="event-delivery" class="org.apache.james.mailbox.events.delivery.EventDeliveryImpl" lazy-init="true">
<constructor-arg index="0" ref="metricFactory"/>
</bean>
<bean id="event-registry" class="org.apache.james.mailbox.store.event.MailboxListenerRegistry"/>
- <bean id="asynchronous-event-delivery" class="org.apache.james.mailbox.events.delivery.AsynchronousEventDelivery" lazy-init="true">
- <constructor-arg index="0" ref="${event.delivery.thread.count}"/>
- <constructor-arg index="1" ref="synchronous-event-delivery"/>
- </bean>
-
- <bean id="event-delivery" class="org.apache.james.mailbox.events.delivery.MixedEventDelivery" lazy-init="true">
- <constructor-arg index="0" ref="asynchronous-event-delivery"/>
- <constructor-arg index="1" ref="synchronous-event-delivery"/>
- </bean>
-
</beans>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java
index 59b91cc..3a7c150 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java
@@ -26,11 +26,14 @@ import javax.inject.Inject;
import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.events.delivery.EventDelivery;
+import org.apache.james.mailbox.events.delivery.EventDeliveryImpl;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.metrics.api.NoopMetricFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
/**
* Receive a {@link org.apache.james.mailbox.MailboxListener.MailboxEvent} and delegate it to an other
@@ -50,7 +53,7 @@ public class DefaultDelegatingMailboxListener implements DelegatingMailboxListen
@VisibleForTesting
public DefaultDelegatingMailboxListener() {
- this(new SynchronousEventDelivery(new NoopMetricFactory()),
+ this(new EventDeliveryImpl(new NoopMetricFactory()),
new MailboxListenerRegistry());
}
@@ -88,30 +91,28 @@ public class DefaultDelegatingMailboxListener implements DelegatingMailboxListen
@Override
public void event(Event event) {
- deliverEventToGlobalListeners(event);
- if (event instanceof MailboxEvent) {
- mailboxEvent((MailboxEvent) event);
- }
- }
-
- private void mailboxEvent(MailboxEvent mailboxEvent) {
- Collection<MailboxListener> listenerSnapshot = registry.getLocalMailboxListeners(mailboxEvent.getMailboxId());
- if (mailboxEvent instanceof MailboxDeletion && listenerSnapshot.size() > 0) {
- registry.deleteRegistryFor(mailboxEvent.getMailboxId());
+ ImmutableList<MailboxListener> listeners = ImmutableList.<MailboxListener>builder()
+ .addAll(registry.getGlobalListeners())
+ .addAll(registeredMailboxListeners(event))
+ .build();
+
+ eventDelivery.deliver(listeners, event)
+ .synchronousListenerFuture()
+ .join();
+
+ if (event instanceof MailboxDeletion) {
+ MailboxDeletion deletion = (MailboxDeletion) event;
+ registry.deleteRegistryFor(deletion.getMailboxId());
}
- deliverEventToMailboxListeners(mailboxEvent, listenerSnapshot);
}
- protected void deliverEventToMailboxListeners(MailboxEvent event, Collection<MailboxListener> listenerSnapshot) {
- for (MailboxListener listener : listenerSnapshot) {
- eventDelivery.deliver(listener, event);
- }
- }
+ private Collection<MailboxListener> registeredMailboxListeners(Event event) {
+ if (event instanceof MailboxEvent) {
+ MailboxEvent mailboxEvent = (MailboxEvent) event;
- protected void deliverEventToGlobalListeners(Event event) {
- for (MailboxListener mailboxListener : registry.getGlobalListeners()) {
- eventDelivery.deliver(mailboxListener, event);
+ return registry.getLocalMailboxListeners(mailboxEvent.getMailboxId());
}
+ return ImmutableList.of();
}
public MailboxListenerRegistry getRegistry() {
http://git-wip-us.apache.org/repos/asf/james-project/blob/67e1fb90/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index a139079..00be62d 100644
--- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -27,10 +27,8 @@ import javax.inject.Inject;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.james.lifecycle.api.Configurable;
import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.mailbox.events.delivery.AsynchronousEventDelivery;
import org.apache.james.mailbox.events.delivery.EventDelivery;
-import org.apache.james.mailbox.events.delivery.MixedEventDelivery;
-import org.apache.james.mailbox.events.delivery.SynchronousEventDelivery;
+import org.apache.james.mailbox.events.delivery.EventDeliveryImpl;
import org.apache.james.mailbox.store.event.DefaultDelegatingMailboxListener;
import org.apache.james.mailbox.store.event.DelegatingMailboxListener;
import org.apache.james.mailbox.store.event.MailboxAnnotationListener;
@@ -73,10 +71,7 @@ public class DefaultEventModule extends AbstractModule {
EventDelivery provideEventDelivery(ConfigurationProvider configurationProvider, MetricFactory metricFactory) {
int poolSize = retrievePoolSize(configurationProvider);
- SynchronousEventDelivery synchronousEventDelivery = new SynchronousEventDelivery(metricFactory);
- return new MixedEventDelivery(
- new AsynchronousEventDelivery(poolSize, synchronousEventDelivery),
- synchronousEventDelivery);
+ return new EventDeliveryImpl(metricFactory);
}
private int retrievePoolSize(ConfigurationProvider configurationProvider) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org