You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/12/14 10:34:27 UTC
[11/18] james-project git commit: MAILBOX-364 Reading the pool size
is no more needed
MAILBOX-364 Reading the pool size is no more needed
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e0dc1e90
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e0dc1e90
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e0dc1e90
Branch: refs/heads/master
Commit: e0dc1e90f89eaf8d143f4d91a30ca34bc53a2da3
Parents: 67e1fb9
Author: Benoit Tellier <bt...@linagora.com>
Authored: Wed Dec 12 17:04:09 2018 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Dec 14 17:13:31 2018 +0700
----------------------------------------------------------------------
.../destination/conf/listeners.xml | 2 -
.../destination/conf/listeners.xml | 2 -
.../destination/conf/listeners.xml | 2 -
.../cassandra/destination/conf/listeners.xml | 2 -
.../guice/jpa/destination/conf/listeners.xml | 1 -
mailbox/event/event-memory/pom.xml | 14 +-
.../mailbox/events/delivery/EventDelivery.java | 19 +--
.../events/delivery/EventDeliveryImpl.java | 90 -----------
.../events/delivery/InVmEventDelivery.java | 93 +++++++++++
.../mailbox/events/delivery/VoidMarker.java | 25 ---
.../events/delivery/EventDeliveryImplTest.java | 158 -------------------
.../events/delivery/InVmEventDeliveryTest.java | 158 +++++++++++++++++++
mailbox/pom.xml | 3 +
.../resources/META-INF/spring/event-system.xml | 2 +-
.../event/DefaultDelegatingMailboxListener.java | 6 +-
.../modules/mailbox/DefaultEventModule.java | 27 +---
src/site/xdoc/server/config-listeners.xml | 3 +-
17 files changed, 279 insertions(+), 328 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/dockerfiles/run/guice/cassandra-ldap/destination/conf/listeners.xml
----------------------------------------------------------------------
diff --git a/dockerfiles/run/guice/cassandra-ldap/destination/conf/listeners.xml b/dockerfiles/run/guice/cassandra-ldap/destination/conf/listeners.xml
index 7f956ae..8d68135 100644
--- a/dockerfiles/run/guice/cassandra-ldap/destination/conf/listeners.xml
+++ b/dockerfiles/run/guice/cassandra-ldap/destination/conf/listeners.xml
@@ -19,8 +19,6 @@
-->
<listeners>
- <poolSize>8</poolSize>
-
<listener>
<class>org.apache.james.mailbox.spamassassin.SpamAssassinListener</class>
<async>true</async>
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/dockerfiles/run/guice/cassandra-rabbitmq-ldap/destination/conf/listeners.xml
----------------------------------------------------------------------
diff --git a/dockerfiles/run/guice/cassandra-rabbitmq-ldap/destination/conf/listeners.xml b/dockerfiles/run/guice/cassandra-rabbitmq-ldap/destination/conf/listeners.xml
index 7f956ae..8d68135 100644
--- a/dockerfiles/run/guice/cassandra-rabbitmq-ldap/destination/conf/listeners.xml
+++ b/dockerfiles/run/guice/cassandra-rabbitmq-ldap/destination/conf/listeners.xml
@@ -19,8 +19,6 @@
-->
<listeners>
- <poolSize>8</poolSize>
-
<listener>
<class>org.apache.james.mailbox.spamassassin.SpamAssassinListener</class>
<async>true</async>
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/dockerfiles/run/guice/cassandra-rabbitmq/destination/conf/listeners.xml
----------------------------------------------------------------------
diff --git a/dockerfiles/run/guice/cassandra-rabbitmq/destination/conf/listeners.xml b/dockerfiles/run/guice/cassandra-rabbitmq/destination/conf/listeners.xml
index 7f956ae..8d68135 100644
--- a/dockerfiles/run/guice/cassandra-rabbitmq/destination/conf/listeners.xml
+++ b/dockerfiles/run/guice/cassandra-rabbitmq/destination/conf/listeners.xml
@@ -19,8 +19,6 @@
-->
<listeners>
- <poolSize>8</poolSize>
-
<listener>
<class>org.apache.james.mailbox.spamassassin.SpamAssassinListener</class>
<async>true</async>
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/dockerfiles/run/guice/cassandra/destination/conf/listeners.xml
----------------------------------------------------------------------
diff --git a/dockerfiles/run/guice/cassandra/destination/conf/listeners.xml b/dockerfiles/run/guice/cassandra/destination/conf/listeners.xml
index 7f956ae..8d68135 100644
--- a/dockerfiles/run/guice/cassandra/destination/conf/listeners.xml
+++ b/dockerfiles/run/guice/cassandra/destination/conf/listeners.xml
@@ -19,8 +19,6 @@
-->
<listeners>
- <poolSize>8</poolSize>
-
<listener>
<class>org.apache.james.mailbox.spamassassin.SpamAssassinListener</class>
<async>true</async>
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/dockerfiles/run/guice/jpa/destination/conf/listeners.xml
----------------------------------------------------------------------
diff --git a/dockerfiles/run/guice/jpa/destination/conf/listeners.xml b/dockerfiles/run/guice/jpa/destination/conf/listeners.xml
index 496df58..ae5937f 100644
--- a/dockerfiles/run/guice/jpa/destination/conf/listeners.xml
+++ b/dockerfiles/run/guice/jpa/destination/conf/listeners.xml
@@ -19,5 +19,4 @@
-->
<listeners>
- <poolSize>8</poolSize>
</listeners>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/mailbox/event/event-memory/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/pom.xml b/mailbox/event/event-memory/pom.xml
index 7460ace..be04bbc 100644
--- a/mailbox/event/event-memory/pom.xml
+++ b/mailbox/event/event-memory/pom.xml
@@ -29,26 +29,26 @@
<artifactId>apache-james-mailbox-event-memory</artifactId>
<name>Apache James :: Mailbox :: Event :: In VM implementation</name>
- <description>Memory implementation for the eventbus API</description>
+ <description>In VM implementation for the eventbus API</description>
<dependencies>
<dependency>
- <groupId>org.apache.james</groupId>
+ <groupId>${project.groupId}</groupId>
<artifactId>apache-james-mailbox-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.james</groupId>
+ <groupId>${project.groupId}</groupId>
<artifactId>apache-james-mailbox-api</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-core</artifactId>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>metrics-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.james</groupId>
- <artifactId>metrics-api</artifactId>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index ae38890..a0240b2 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -20,29 +20,30 @@
package org.apache.james.mailbox.events.delivery;
import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
+import reactor.core.publisher.Mono;
+
public interface EventDelivery {
class ExecutionStages {
- private final CompletableFuture<Void> synchronousListenerFuture;
- private final CompletableFuture<Void> asynchronousListenerFuture;
+ private final Mono<Void> synchronousListenerFuture;
+ private final Mono<Void> asynchronousListenerFuture;
- ExecutionStages(CompletableFuture<Void> synchronousListenerFuture, CompletableFuture<Void> asynchronousListenerFuture) {
+ ExecutionStages(Mono<Void> synchronousListenerFuture, Mono<Void> asynchronousListenerFuture) {
this.synchronousListenerFuture = synchronousListenerFuture;
this.asynchronousListenerFuture = asynchronousListenerFuture;
}
- public CompletableFuture<Void> synchronousListenerFuture() {
+ public Mono<Void> synchronousListenerFuture() {
return synchronousListenerFuture;
}
- public CompletableFuture<Void> allListenerFuture() {
- return CompletableFuture.allOf(
- synchronousListenerFuture,
- asynchronousListenerFuture);
+ public Mono<Void> allListenerFuture() {
+ return synchronousListenerFuture
+ .concatWith(asynchronousListenerFuture)
+ .then();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDeliveryImpl.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDeliveryImpl.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDeliveryImpl.java
deleted file mode 100644
index 37f26ee..0000000
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDeliveryImpl.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.steveash.guavate.Guavate;
-import com.google.common.collect.ImmutableList;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-
-public class EventDeliveryImpl implements EventDelivery {
- private static final Logger LOGGER = LoggerFactory.getLogger(EventDeliveryImpl.class);
-
- private final MetricFactory metricFactory;
-
- public EventDeliveryImpl(MetricFactory metricFactory) {
- this.metricFactory = metricFactory;
- }
-
- @Override
- public ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event) {
- CompletableFuture<Void> synchronousListeners = doDeliver(
- filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.SYNCHRONOUS),
- event);
- CompletableFuture<Void> asyncListener = doDeliver(
- filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.ASYNCHRONOUS),
- event);
-
- return new ExecutionStages(synchronousListeners, asyncListener);
- }
-
- private ImmutableList<MailboxListener> filterByExecutionMode(Collection<MailboxListener> mailboxListeners, MailboxListener.ExecutionMode executionMode) {
- return mailboxListeners.stream()
- .filter(listener -> listener.getExecutionMode() == executionMode)
- .collect(Guavate.toImmutableList());
- }
-
- private CompletableFuture<Void> doDeliver(Collection<MailboxListener> mailboxListeners, Event event) {
- return Flux.fromIterable(mailboxListeners)
- .flatMap(mailboxListener -> Mono.fromCallable(() -> {
- doDeliverToListener(mailboxListener, event);
- return VoidMarker.IMPL;
- }))
- .then()
- .subscribeOn(Schedulers.elastic())
- .toFuture();
- }
-
- private void doDeliverToListener(MailboxListener mailboxListener, Event event) {
- TimeMetric timer = metricFactory.timer("mailbox-listener-" + mailboxListener.getClass().getSimpleName());
- try {
- mailboxListener.event(event);
- } catch (Throwable throwable) {
- LOGGER.error("Error while processing listener {} for {}",
- mailboxListener.getClass().getCanonicalName(), event.getClass().getCanonicalName(),
- throwable);
- } finally {
- timer.stopAndPublish();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
new file mode 100644
index 0000000..2d2ef65
--- /dev/null
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
@@ -0,0 +1,93 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events.delivery;
+
+import java.util.Collection;
+
+import javax.inject.Inject;
+
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.metrics.api.TimeMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class InVmEventDelivery implements EventDelivery {
+ private static final Logger LOGGER = LoggerFactory.getLogger(InVmEventDelivery.class);
+
+ private final MetricFactory metricFactory;
+
+ @Inject
+ @VisibleForTesting
+ public InVmEventDelivery(MetricFactory metricFactory) {
+ this.metricFactory = metricFactory;
+ }
+
+ @Override
+ public ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event) {
+ Mono<Void> synchronousListeners = doDeliver(
+ filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.SYNCHRONOUS), event)
+ .cache();
+ Mono<Void> asyncListener = doDeliver(
+ filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.ASYNCHRONOUS), event)
+ .cache();
+
+ synchronousListeners.subscribe();
+ asyncListener.subscribe();
+
+ return new ExecutionStages(synchronousListeners, asyncListener);
+ }
+
+ private ImmutableList<MailboxListener> filterByExecutionMode(Collection<MailboxListener> mailboxListeners, MailboxListener.ExecutionMode executionMode) {
+ return mailboxListeners.stream()
+ .filter(listener -> listener.getExecutionMode() == executionMode)
+ .collect(Guavate.toImmutableList());
+ }
+
+ private Mono<Void> doDeliver(Collection<MailboxListener> mailboxListeners, Event event) {
+ return Flux.fromIterable(mailboxListeners)
+ .flatMap(mailboxListener -> Mono.fromRunnable(() -> doDeliverToListener(mailboxListener, event)))
+ .then()
+ .subscribeOn(Schedulers.elastic());
+ }
+
+ private void doDeliverToListener(MailboxListener mailboxListener, Event event) {
+ TimeMetric timer = metricFactory.timer("mailbox-listener-" + mailboxListener.getClass().getSimpleName());
+ try {
+ mailboxListener.event(event);
+ } catch (Throwable throwable) {
+ LOGGER.error("Error while processing listener {} for {}",
+ mailboxListener.getClass().getCanonicalName(), event.getClass().getCanonicalName(),
+ throwable);
+ } finally {
+ timer.stopAndPublish();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/VoidMarker.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/VoidMarker.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/VoidMarker.java
deleted file mode 100644
index 5a504a5..0000000
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/VoidMarker.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-public interface VoidMarker {
- VoidMarker IMPL = () -> null;
- Void toVoid();
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/EventDeliveryImplTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/EventDeliveryImplTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/EventDeliveryImplTest.java
deleted file mode 100644
index ccf8799..0000000
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/EventDeliveryImplTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.events.delivery;
-
-import static org.junit.jupiter.api.Assertions.assertTimeout;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.time.Duration;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.metrics.api.NoopMetricFactory;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import com.google.common.collect.ImmutableList;
-
-class EventDeliveryImplTest {
- private static final int DELIVERY_DELAY = (int) TimeUnit.MILLISECONDS.toMillis(100);
-
- private EventDeliveryImpl eventDeliveryImpl;
- private MailboxListener listener;
- private MailboxListener listener2;
- private MailboxListener.MailboxEvent event;
-
- @BeforeEach
- void setUp() {
- event = mock(MailboxListener.MailboxEvent.class);
- listener = mock(MailboxListener.class);
- listener2 = mock(MailboxListener.class);
- eventDeliveryImpl = new EventDeliveryImpl(new NoopMetricFactory());
- }
-
- @Test
- void deliverShouldHaveCalledSynchronousListenersWhenAllListenerExecutedJoined() {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
-
- eventDeliveryImpl.deliver(ImmutableList.of(listener), event).allListenerFuture().join();
-
- verify(listener).event(event);
- }
-
- @Test
- void deliverShouldHaveCalledAsynchronousListenersWhenAllListenerExecutedJoined() {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-
- eventDeliveryImpl.deliver(ImmutableList.of(listener), event).allListenerFuture().join();
-
- verify(listener).event(event);
- }
-
- @Test
- void deliverShouldHaveCalledSynchronousListenersWhenSynchronousListenerExecutedJoined() {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
-
- eventDeliveryImpl.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().join();
-
- verify(listener).event(event);
- }
-
- @Test
- void deliverShouldNotBlockObAsynchronousListenersWhenSynchronousListenerExecutedJoined() {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
- CountDownLatch latch = new CountDownLatch(1);
- doAnswer(invocation -> {
- latch.await();
- return null;
- }).when(listener).event(event);
-
- assertTimeout(Duration.ofSeconds(2),
- () -> {
- eventDeliveryImpl.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().join();
- latch.countDown();
- });
- }
-
- @Test
- void deliverShouldNotBlockOnSynchronousListenersWhenNoJoin() {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
- CountDownLatch latch = new CountDownLatch(1);
- doAnswer(invocation -> {
- latch.await();
- return null;
- }).when(listener).event(event);
-
- assertTimeout(Duration.ofSeconds(2),
- () -> {
- eventDeliveryImpl.deliver(ImmutableList.of(listener), event);
- latch.countDown();
- });
- }
-
- @Test
- void deliverShouldNotBlockOnAsynchronousListenersWhenNoJoin() {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
- CountDownLatch latch = new CountDownLatch(1);
- doAnswer(invocation -> {
- latch.await();
- return null;
- }).when(listener).event(event);
-
- assertTimeout(Duration.ofSeconds(2),
- () -> {
- eventDeliveryImpl.deliver(ImmutableList.of(listener), event);
- latch.countDown();
- });
- }
-
- @Test
- void deliverShouldEventuallyDeliverAsynchronousListenersWhenSynchronousListenerExecutedJoined() {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-
- eventDeliveryImpl.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().join();
-
- verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
- }
-
- @Test
- void deliverShouldEventuallyDeliverSynchronousListenersWhenNoJoin() {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
-
- eventDeliveryImpl.deliver(ImmutableList.of(listener), event);
-
- verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
- }
-
- @Test
- void deliverShouldCallSynchronousListenersWhenAsynchronousListenersAreAlsoRegistered() {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
- when(listener2.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
-
- eventDeliveryImpl.deliver(ImmutableList.of(listener, listener2), event).synchronousListenerFuture().join();
-
- verify(listener2).event(event);
- }
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
new file mode 100644
index 0000000..417401c
--- /dev/null
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
@@ -0,0 +1,158 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events.delivery;
+
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.metrics.api.NoopMetricFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableList;
+
+class InVmEventDeliveryTest {
+ private static final int DELIVERY_DELAY = (int) TimeUnit.MILLISECONDS.toMillis(100);
+
+ private InVmEventDelivery inVmEventDelivery;
+ private MailboxListener listener;
+ private MailboxListener listener2;
+ private MailboxListener.MailboxEvent event;
+
+ @BeforeEach
+ void setUp() {
+ event = mock(MailboxListener.MailboxEvent.class);
+ listener = mock(MailboxListener.class);
+ listener2 = mock(MailboxListener.class);
+ inVmEventDelivery = new InVmEventDelivery(new NoopMetricFactory());
+ }
+
+ @Test
+ void deliverShouldHaveCalledSynchronousListenersWhenAllListenerExecutedJoined() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+
+ inVmEventDelivery.deliver(ImmutableList.of(listener), event).allListenerFuture().block();
+
+ verify(listener).event(event);
+ }
+
+ @Test
+ void deliverShouldHaveCalledAsynchronousListenersWhenAllListenerExecutedJoined() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+
+ inVmEventDelivery.deliver(ImmutableList.of(listener), event).allListenerFuture().block();
+
+ verify(listener).event(event);
+ }
+
+ @Test
+ void deliverShouldHaveCalledSynchronousListenersWhenSynchronousListenerExecutedJoined() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+
+ inVmEventDelivery.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().block();
+
+ verify(listener).event(event);
+ }
+
+ @Test
+ void deliverShouldNotBlockOnAsynchronousListenersWhenSynchronousListenerExecutedJoined() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ latch.await();
+ return null;
+ }).when(listener).event(event);
+
+ assertTimeout(Duration.ofSeconds(2),
+ () -> {
+ inVmEventDelivery.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().block();
+ latch.countDown();
+ });
+ }
+
+ @Test
+ void deliverShouldNotBlockOnSynchronousListenersWhenNoJoin() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ latch.await();
+ return null;
+ }).when(listener).event(event);
+
+ assertTimeout(Duration.ofSeconds(2),
+ () -> {
+ inVmEventDelivery.deliver(ImmutableList.of(listener), event);
+ latch.countDown();
+ });
+ }
+
+ @Test
+ void deliverShouldNotBlockOnAsynchronousListenersWhenNoJoin() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ latch.await();
+ return null;
+ }).when(listener).event(event);
+
+ assertTimeout(Duration.ofSeconds(2),
+ () -> {
+ inVmEventDelivery.deliver(ImmutableList.of(listener), event);
+ latch.countDown();
+ });
+ }
+
+ @Test
+ void deliverShouldEventuallyDeliverAsynchronousListenersWhenSynchronousListenerExecutedJoined() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+
+ inVmEventDelivery.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().block();
+
+ verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
+ }
+
+ @Test
+ void deliverShouldEventuallyDeliverSynchronousListenersWhenNoJoin() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+
+ inVmEventDelivery.deliver(ImmutableList.of(listener), event);
+
+ verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
+ }
+
+ @Test
+ void deliverShouldCallSynchronousListenersWhenAsynchronousListenersAreAlsoRegistered() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ when(listener2.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+
+ inVmEventDelivery.deliver(ImmutableList.of(listener, listener2), event).synchronousListenerFuture().block();
+
+ verify(listener2).event(event);
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/mailbox/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/pom.xml b/mailbox/pom.xml
index 4361e93..114cab4 100644
--- a/mailbox/pom.xml
+++ b/mailbox/pom.xml
@@ -40,7 +40,10 @@
<module>caching</module>
<module>cassandra</module>
<module>elasticsearch</module>
+
+ <module>event/event-memory</module>
<module>event/json</module>
+
<module>jpa</module>
<module>lucene</module>
<module>maildir</module>
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
----------------------------------------------------------------------
diff --git a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
index 68ac412..17661cb 100644
--- a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
+++ b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
@@ -28,7 +28,7 @@
<constructor-arg index="1" ref="event-registry"/>
</bean>
- <bean id="event-delivery" class="org.apache.james.mailbox.events.delivery.EventDeliveryImpl" lazy-init="true">
+ <bean id="event-delivery" class="org.apache.james.mailbox.events.delivery.InVmEventDelivery" lazy-init="true">
<constructor-arg index="0" ref="metricFactory"/>
</bean>
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java
index 3a7c150..b050369 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/DefaultDelegatingMailboxListener.java
@@ -27,7 +27,7 @@ import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.events.delivery.EventDelivery;
-import org.apache.james.mailbox.events.delivery.EventDeliveryImpl;
+import org.apache.james.mailbox.events.delivery.InVmEventDelivery;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.metrics.api.NoopMetricFactory;
@@ -53,7 +53,7 @@ public class DefaultDelegatingMailboxListener implements DelegatingMailboxListen
@VisibleForTesting
public DefaultDelegatingMailboxListener() {
- this(new EventDeliveryImpl(new NoopMetricFactory()),
+ this(new InVmEventDelivery(new NoopMetricFactory()),
new MailboxListenerRegistry());
}
@@ -98,7 +98,7 @@ public class DefaultDelegatingMailboxListener implements DelegatingMailboxListen
eventDelivery.deliver(listeners, event)
.synchronousListenerFuture()
- .join();
+ .block();
if (event instanceof MailboxDeletion) {
MailboxDeletion deletion = (MailboxDeletion) event;
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index 00be62d..9921b23 100644
--- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -20,7 +20,6 @@
package org.apache.james.modules.mailbox;
import java.util.List;
-import java.util.Optional;
import javax.inject.Inject;
@@ -28,27 +27,22 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.james.lifecycle.api.Configurable;
import org.apache.james.mailbox.MailboxListener;
import org.apache.james.mailbox.events.delivery.EventDelivery;
-import org.apache.james.mailbox.events.delivery.EventDeliveryImpl;
+import org.apache.james.mailbox.events.delivery.InVmEventDelivery;
import org.apache.james.mailbox.store.event.DefaultDelegatingMailboxListener;
import org.apache.james.mailbox.store.event.DelegatingMailboxListener;
import org.apache.james.mailbox.store.event.MailboxAnnotationListener;
import org.apache.james.mailbox.store.event.MailboxListenerRegistry;
import org.apache.james.mailbox.store.quota.ListeningCurrentQuotaUpdater;
-import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.server.core.configuration.ConfigurationProvider;
import org.apache.james.utils.ConfigurationPerformer;
import com.google.common.collect.ImmutableList;
import com.google.inject.AbstractModule;
-import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
public class DefaultEventModule extends AbstractModule {
-
- private static final int DEFAULT_POOL_SIZE = 8;
-
@Override
protected void configure() {
bind(DefaultDelegatingMailboxListener.class).in(Scopes.SINGLETON);
@@ -64,24 +58,9 @@ public class DefaultEventModule extends AbstractModule {
bind(MailboxListenerRegistry.class).in(Scopes.SINGLETON);
bind(MailboxListenersLoader.class).to(MailboxListenersLoaderImpl.class);
Multibinder.newSetBinder(binder(), MailboxListener.class);
- }
-
- @Provides
- @Singleton
- EventDelivery provideEventDelivery(ConfigurationProvider configurationProvider, MetricFactory metricFactory) {
- int poolSize = retrievePoolSize(configurationProvider);
-
- return new EventDeliveryImpl(metricFactory);
- }
- private int retrievePoolSize(ConfigurationProvider configurationProvider) {
- try {
- return Optional.ofNullable(configurationProvider.getConfiguration("listeners")
- .getInteger("poolSize", null))
- .orElse(DEFAULT_POOL_SIZE);
- } catch (ConfigurationException e) {
- return DEFAULT_POOL_SIZE;
- }
+ bind(InVmEventDelivery.class).in(Scopes.SINGLETON);
+ bind(EventDelivery.class).to(InVmEventDelivery.class);
}
@Singleton
http://git-wip-us.apache.org/repos/asf/james-project/blob/e0dc1e90/src/site/xdoc/server/config-listeners.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/server/config-listeners.xml b/src/site/xdoc/server/config-listeners.xml
index e0efeb8..ca2f286 100644
--- a/src/site/xdoc/server/config-listeners.xml
+++ b/src/site/xdoc/server/config-listeners.xml
@@ -44,8 +44,7 @@
<p>
Some MailboxListener allows you to specify if you want to run them synchronously or asynchronously. To do so,
for MailboxListener that supports this, you can use the <b>async</b> attribute (optional, per mailet default) to govern the execution mode.
- If <b>true</b> the execution will be scheduled by a FixedThreadPool whose size is defined by the <b>poolSize</b>
- attribute (optional, default to 8). If <b>false</b>, the execution is synchronous, on the current thread.
+ If <b>true</b> the execution will be scheduled in a reactor elastic scheduler. If <b>false</b>, the execution is synchronous.
</p>
<ul>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org