You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/23 03:04:55 UTC

[1/2] james-project git commit: MAILBOX-376 Plug EventDeadLetter into InVmEventBus

Repository: james-project
Updated Branches:
  refs/heads/master d51c70854 -> 606ff9c93


MAILBOX-376 Plug EventDeadLetter into InVmEventBus


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/bb067a1b
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/bb067a1b
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/bb067a1b

Branch: refs/heads/master
Commit: bb067a1bcb18752c86c0f6bc52096b980e96e7ee
Parents: d51c708
Author: tran tien duc <dt...@linagora.com>
Authored: Thu Jan 17 18:27:50 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Wed Jan 23 10:00:08 2019 +0700

----------------------------------------------------------------------
 .../mailbox/events/EventBusTestFixture.java     |   2 +-
 .../james/mailbox/events/InVMEventBus.java      |  40 +-
 .../mailbox/events/MemoryEventDeadLetters.java  |   2 +-
 .../mailbox/events/delivery/EventDelivery.java  | 129 ++++-
 .../events/delivery/InVmEventDelivery.java      |  86 +---
 .../james/mailbox/events/InVMEventBusTest.java  |  27 +-
 .../events/delivery/InVmEventDeliveryTest.java  | 472 +++++++------------
 .../resources/META-INF/spring/event-system.xml  |   9 +-
 .../store/MessageIdManagerTestSystem.java       |   3 +-
 .../modules/mailbox/DefaultEventModule.java     |   2 +-
 10 files changed, 372 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
index 263a439..d11571c 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -50,7 +50,7 @@ public interface EventBusTestFixture {
             calls.incrementAndGet();
         }
 
-        int numberOfEventCalls() {
+        public int numberOfEventCalls() {
             return calls.get();
         }
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
index 3bb11b0..5da4adb 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
@@ -27,8 +27,11 @@ import javax.inject.Inject;
 import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
 import org.apache.james.mailbox.events.delivery.EventDelivery;
+import org.apache.james.mailbox.events.delivery.EventDelivery.PermanentFailureHandler.StoreToDeadLetters;
+import org.apache.james.mailbox.events.delivery.EventDelivery.Retryer.BackoffRetryer;
 
 import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
@@ -37,17 +40,27 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class InVMEventBus implements EventBus {
+
     private final Multimap<RegistrationKey, MailboxListener> registrations;
     private final ConcurrentHashMap<Group, MailboxListener> groups;
     private final EventDelivery eventDelivery;
+    private final RetryBackoffConfiguration retryBackoff;
+    private final EventDeadLetters eventDeadLetters;
 
     @Inject
-    public InVMEventBus(EventDelivery eventDelivery) {
+    public InVMEventBus(EventDelivery eventDelivery, RetryBackoffConfiguration retryBackoff, EventDeadLetters eventDeadLetters) {
         this.eventDelivery = eventDelivery;
+        this.retryBackoff = retryBackoff;
+        this.eventDeadLetters = eventDeadLetters;
         this.registrations = Multimaps.synchronizedSetMultimap(HashMultimap.create());
         this.groups = new ConcurrentHashMap<>();
     }
 
+    @VisibleForTesting
+    public InVMEventBus(EventDelivery eventDelivery) {
+        this(eventDelivery, RetryBackoffConfiguration.DEFAULT, new MemoryEventDeadLetters());
+    }
+
     @Override
     public Registration register(MailboxListener listener, RegistrationKey key) {
         registrations.put(key, listener);
@@ -66,15 +79,34 @@ public class InVMEventBus implements EventBus {
     @Override
     public Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
         if (!event.isNoop()) {
-            return Flux.merge(
-                eventDelivery.deliverWithRetries(groups.values(), event).synchronousListenerFuture(),
-                eventDelivery.deliver(registeredListenersByKeys(keys), event).synchronousListenerFuture())
+            return Flux.merge(groupDeliveries(event), keyDeliveries(event, keys))
+                .reduceWith(EventDelivery.ExecutionStages::empty, EventDelivery.ExecutionStages::combine)
+                .flatMap(EventDelivery.ExecutionStages::synchronousListenerFuture)
                 .then()
                 .onErrorResume(throwable -> Mono.empty());
         }
         return Mono.empty();
     }
 
+    private Flux<EventDelivery.ExecutionStages> keyDeliveries(Event event, Set<RegistrationKey> keys) {
+        return Flux.fromIterable(registeredListenersByKeys(keys))
+            .map(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()));
+    }
+
+    private Flux<EventDelivery.ExecutionStages> groupDeliveries(Event event) {
+        return Flux.fromIterable(groups.entrySet())
+            .map(entry -> groupDelivery(event, entry.getValue(), entry.getKey()));
+    }
+
+    private EventDelivery.ExecutionStages groupDelivery(Event event, MailboxListener mailboxListener, Group group) {
+        return eventDelivery.deliver(
+            mailboxListener,
+            event,
+            EventDelivery.DeliveryOption.of(
+                BackoffRetryer.of(retryBackoff, mailboxListener),
+                StoreToDeadLetters.of(group, eventDeadLetters)));
+    }
+
     public Set<Group> registeredGroups() {
         return groups.keySet();
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
index 8464ed9..122763a 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
@@ -38,7 +38,7 @@ public class MemoryEventDeadLetters implements EventDeadLetters {
 
     private final Multimap<Group, Event> deadLetters;
 
-    MemoryEventDeadLetters() {
+    public MemoryEventDeadLetters() {
         this.deadLetters = Multimaps.synchronizedSetMultimap(HashMultimap.create());
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index 359e4d7..e34cb31 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -19,19 +19,133 @@
 
 package org.apache.james.mailbox.events.delivery;
 
-import java.util.Collection;
+import static org.apache.james.mailbox.events.delivery.EventDelivery.PermanentFailureHandler.NO_HANDLER;
+import static org.apache.james.mailbox.events.delivery.EventDelivery.Retryer.NO_RETRYER;
+
+import java.time.Duration;
 
 import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.events.EventDeadLetters;
+import org.apache.james.mailbox.events.Group;
+import org.apache.james.mailbox.events.RetryBackoffConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public interface EventDelivery {
+
+    class DeliveryOption {
+        public static DeliveryOption of(Retryer retrier, PermanentFailureHandler permanentFailureHandler) {
+            return new DeliveryOption(retrier, permanentFailureHandler);
+        }
+
+        public static DeliveryOption none() {
+            return new DeliveryOption(NO_RETRYER, NO_HANDLER);
+        }
+
+        private final Retryer retrier;
+        private final PermanentFailureHandler permanentFailureHandler;
+
+        private DeliveryOption(Retryer retrier, PermanentFailureHandler permanentFailureHandler) {
+            this.retrier = retrier;
+            this.permanentFailureHandler = permanentFailureHandler;
+        }
+
+        Retryer getRetrier() {
+            return retrier;
+        }
+
+        PermanentFailureHandler getPermanentFailureHandler() {
+            return permanentFailureHandler;
+        }
+    }
+
+
+    interface Retryer {
+
+        Retryer NO_RETRYER = (executionResult, event) -> executionResult;
+
+        class BackoffRetryer implements EventDelivery.Retryer {
+
+            public static BackoffRetryer of(RetryBackoffConfiguration retryBackoff, MailboxListener mailboxListener) {
+                return new BackoffRetryer(retryBackoff, mailboxListener);
+            }
+
+            private static final Logger LOGGER = LoggerFactory.getLogger(BackoffRetryer.class);
+            private static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);
+
+            private final RetryBackoffConfiguration retryBackoff;
+            private final MailboxListener mailboxListener;
+
+            public BackoffRetryer(RetryBackoffConfiguration retryBackoff, MailboxListener mailboxListener) {
+                this.retryBackoff = retryBackoff;
+                this.mailboxListener = mailboxListener;
+            }
+
+            @Override
+            public Mono<Void> doRetry(Mono<Void> executionResult, Event event) {
+                return executionResult
+                    .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), MAX_BACKOFF, retryBackoff.getJitterFactor())
+                    .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
+                        mailboxListener.getClass().getCanonicalName(),
+                        retryBackoff.getMaxRetries(),
+                        event.getClass().getCanonicalName(),
+                        throwable))
+                    .then();
+            }
+        }
+
+        Mono<Void> doRetry(Mono<Void> executionResult, Event event);
+    }
+
+    interface PermanentFailureHandler {
+
+        PermanentFailureHandler NO_HANDLER = event -> Mono.error(new UnsupportedOperationException("doesn't handle error"));
+
+        class StoreToDeadLetters implements EventDelivery.PermanentFailureHandler {
+
+            public static StoreToDeadLetters of(Group group, EventDeadLetters eventDeadLetters) {
+                return new StoreToDeadLetters(group, eventDeadLetters);
+            }
+
+            private final Group group;
+            private final EventDeadLetters eventDeadLetters;
+
+            private StoreToDeadLetters(Group group, EventDeadLetters eventDeadLetters) {
+                this.group = group;
+                this.eventDeadLetters = eventDeadLetters;
+            }
+
+            @Override
+            public Mono<Void> handle(Event event) {
+                return eventDeadLetters.store(group, event);
+            }
+        }
+
+        Mono<Void> handle(Event event);
+    }
+
     class ExecutionStages {
+
+        public static ExecutionStages empty() {
+            return new ExecutionStages(Mono.empty(), Mono.empty());
+        }
+
+        static ExecutionStages synchronous(Mono<Void> synchronousListenerFuture) {
+            return new ExecutionStages(synchronousListenerFuture, Mono.empty());
+        }
+
+        static ExecutionStages asynchronous(Mono<Void> asynchronousListenerFuture) {
+            return new ExecutionStages(Mono.empty(),asynchronousListenerFuture);
+        }
+
         private final Mono<Void> synchronousListenerFuture;
         private final Mono<Void> asynchronousListenerFuture;
 
-        ExecutionStages(Mono<Void> synchronousListenerFuture, Mono<Void> asynchronousListenerFuture) {
+        private ExecutionStages(Mono<Void> synchronousListenerFuture, Mono<Void> asynchronousListenerFuture) {
             this.synchronousListenerFuture = synchronousListenerFuture;
             this.asynchronousListenerFuture = asynchronousListenerFuture;
         }
@@ -45,10 +159,13 @@ public interface EventDelivery {
                 .concatWith(asynchronousListenerFuture)
                 .then();
         }
-    }
-
 
-    ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event);
+        public ExecutionStages combine(ExecutionStages another) {
+            return new ExecutionStages(
+                Flux.concat(this.synchronousListenerFuture, another.synchronousListenerFuture).then(),
+                Flux.concat(this.asynchronousListenerFuture, another.asynchronousListenerFuture).then());
+        }
+    }
 
-    ExecutionStages deliverWithRetries(Collection<MailboxListener> mailboxListeners, Event event);
+    ExecutionStages deliver(MailboxListener listener, Event event, DeliveryOption option);
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
index 84a6a90..ac4ce4e 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
@@ -19,15 +19,10 @@
 
 package org.apache.james.mailbox.events.delivery;
 
-import java.time.Duration;
-import java.util.Collection;
-import java.util.stream.Stream;
-
 import javax.inject.Inject;
 
 import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.mailbox.events.RetryBackoffConfiguration;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.metrics.api.TimeMetric;
 import org.slf4j.Logger;
@@ -35,87 +30,58 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.MonoProcessor;
 import reactor.core.scheduler.Schedulers;
 
 public class InVmEventDelivery implements EventDelivery {
 
-    private enum DeliveryOption {
-        NO_RETRY, WITH_RETRY
+    private static String listenerName(MailboxListener mailboxListener) {
+        return mailboxListener.getClass().getCanonicalName();
+    }
+
+    private static String eventName(Event event) {
+        return event.getClass().getCanonicalName();
     }
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InVmEventDelivery.class);
-    private static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);
 
     private final MetricFactory metricFactory;
-    private final RetryBackoffConfiguration retryBackoff;
 
     @Inject
     @VisibleForTesting
-    public InVmEventDelivery(MetricFactory metricFactory, RetryBackoffConfiguration retryBackoff) {
-        this.metricFactory = metricFactory;
-        this.retryBackoff = retryBackoff;
-    }
-
     public InVmEventDelivery(MetricFactory metricFactory) {
-        this(metricFactory, RetryBackoffConfiguration.DEFAULT);
-    }
-
-    @Override
-    public ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event) {
-        return deliverByOption(mailboxListeners, event, DeliveryOption.NO_RETRY);
+        this.metricFactory = metricFactory;
     }
 
     @Override
-    public ExecutionStages deliverWithRetries(Collection<MailboxListener> mailboxListeners, Event event) {
-        return deliverByOption(mailboxListeners, event, DeliveryOption.WITH_RETRY);
-    }
-
-    private ExecutionStages deliverByOption(Collection<MailboxListener> mailboxListeners, Event event, DeliveryOption deliveryOption) {
-        Mono<Void> synchronousListeners = doDeliver(
-            filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.SYNCHRONOUS), event, deliveryOption)
-            .subscribeWith(MonoProcessor.create());
-        Mono<Void> asyncListener = doDeliver(
-            filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.ASYNCHRONOUS), event, deliveryOption)
-            .subscribeWith(MonoProcessor.create());
+    public ExecutionStages deliver(MailboxListener listener, Event event, DeliveryOption option) {
+        Mono<Void> executionResult = deliverByOption(listener, event, option);
 
-        return new ExecutionStages(synchronousListeners, asyncListener);
+        return toExecutionStages(listener.getExecutionMode(), executionResult);
     }
 
-    private Stream<MailboxListener> filterByExecutionMode(Collection<MailboxListener> mailboxListeners, MailboxListener.ExecutionMode executionMode) {
-        return mailboxListeners.stream()
-            .filter(listener -> listener.getExecutionMode() == executionMode);
-    }
+    private ExecutionStages toExecutionStages(MailboxListener.ExecutionMode executionMode, Mono<Void> executionResult) {
+        if (executionMode.equals(MailboxListener.ExecutionMode.SYNCHRONOUS)) {
+            return ExecutionStages.synchronous(executionResult);
+        }
 
-    private Mono<Void> doDeliver(Stream<MailboxListener> mailboxListeners, Event event, DeliveryOption deliveryOption) {
-        return Flux.fromStream(mailboxListeners)
-            .flatMap(mailboxListener -> deliveryWithRetries(event, mailboxListener, deliveryOption))
-            .then()
-            .subscribeOn(Schedulers.elastic());
+        return ExecutionStages.asynchronous(executionResult);
     }
 
-    private Mono<Void> deliveryWithRetries(Event event, MailboxListener mailboxListener, DeliveryOption deliveryOption) {
-        Mono<Void> firstDelivery = Mono.fromRunnable(() -> doDeliverToListener(mailboxListener, event))
+    private Mono<Void> deliverByOption(MailboxListener listener, Event event, DeliveryOption deliveryOption) {
+        Mono<Void> deliveryToListener = Mono.fromRunnable(() -> doDeliverToListener(listener, event))
             .doOnError(throwable -> LOGGER.error("Error while processing listener {} for {}",
-                listenerName(mailboxListener),
+                listenerName(listener),
                 eventName(event),
                 throwable))
             .subscribeOn(Schedulers.elastic())
             .then();
 
-        if (deliveryOption == DeliveryOption.NO_RETRY) {
-            return firstDelivery;
-        }
-
-        return firstDelivery
-            .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), MAX_BACKOFF, retryBackoff.getJitterFactor())
-            .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
-                listenerName(mailboxListener),
-                retryBackoff.getMaxRetries(),
-                eventName(event),
-                throwable))
+        return deliveryOption.getRetrier().doRetry(deliveryToListener, event)
+            .onErrorResume(throwable -> deliveryOption.getPermanentFailureHandler().handle(event))
+            .subscribeWith(MonoProcessor.create())
+            .subscribeOn(Schedulers.elastic())
             .then();
     }
 
@@ -129,12 +95,4 @@ public class InVmEventDelivery implements EventDelivery {
             timer.stopAndPublish();
         }
     }
-
-    private String listenerName(MailboxListener mailboxListener) {
-        return mailboxListener.getClass().getCanonicalName();
-    }
-
-    private String eventName(Event event) {
-        return event.getClass().getCanonicalName();
-    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
index c7e7edd..56a9cde 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
@@ -22,19 +22,18 @@ package org.apache.james.mailbox.events;
 import org.apache.james.mailbox.events.delivery.InVmEventDelivery;
 import org.apache.james.metrics.api.NoopMetricFactory;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
 
 public class InVMEventBusTest implements KeyContract.SingleEventBusKeyContract, GroupContract.SingleEventBusGroupContract,
     ErrorHandlingContract{
 
     private InVMEventBus eventBus;
+    private MemoryEventDeadLetters deadLetters;
 
     @BeforeEach
     void setUp() {
+        deadLetters = new MemoryEventDeadLetters();
         eventBus = new InVMEventBus(
-            new InVmEventDelivery(
-                new NoopMetricFactory(), RetryBackoffConfiguration.DEFAULT));
+            new InVmEventDelivery(new NoopMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters);
     }
 
     @Override
@@ -44,24 +43,6 @@ public class InVMEventBusTest implements KeyContract.SingleEventBusKeyContract,
 
     @Override
     public EventDeadLetters deadLetter() {
-        throw new RuntimeException("this method is not a part of this task contents, will be handled in another pull request");
-    }
-
-    @Test
-    @Disabled("this test is not a part of this task contents, will be handled in another pull request")
-    @Override
-    public void deadLettersIsNotAppliedForKeyRegistrations() {
-    }
-
-    @Test
-    @Disabled("this test is not a part of this task contents, will be handled in another pull request")
-    @Override
-    public void deadLetterShouldNotStoreWhenFailsLessThanMaxRetries() {
-    }
-
-    @Test
-    @Disabled("this test is not a part of this task contents, will be handled in another pull request")
-    @Override
-    public void deadLetterShouldStoreWhenFailsGreaterThanMaxRetries() {
+        return deadLetters;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
index c0ae5a1..dc122f6 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
@@ -19,359 +19,241 @@
 
 package org.apache.james.mailbox.events.delivery;
 
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
+import static org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
+import static org.apache.james.mailbox.events.delivery.EventDelivery.Retryer;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.jupiter.api.Assertions.assertTimeout;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.time.Duration;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.events.MemoryEventDeadLetters;
 import org.apache.james.mailbox.events.RetryBackoffConfiguration;
-import org.apache.james.mailbox.util.EventCollector;
+import org.apache.james.mailbox.events.delivery.EventDelivery.DeliveryOption;
+import org.apache.james.mailbox.events.delivery.EventDelivery.PermanentFailureHandler;
+import org.apache.james.mailbox.events.delivery.EventDelivery.Retryer.BackoffRetryer;
 import org.apache.james.metrics.api.NoopMetricFactory;
 import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
-import com.google.common.collect.ImmutableList;
-
-import reactor.core.publisher.Mono;
-
 class InVmEventDeliveryTest {
-    private static final int DELIVERY_DELAY = (int) TimeUnit.MILLISECONDS.toMillis(100);
-
     private InVmEventDelivery inVmEventDelivery;
-    private MailboxListener listener;
-    private MailboxListener listener2;
+    private MailboxListenerCountingSuccessfulExecution listener;
     private MailboxListener.MailboxEvent event;
 
     @BeforeEach
     void setUp() {
         event = mock(MailboxListener.MailboxEvent.class);
-        listener = mock(MailboxListener.class);
-        listener2 = mock(MailboxListener.class);
-        inVmEventDelivery = new InVmEventDelivery(new NoopMetricFactory(), RetryBackoffConfiguration.DEFAULT);
+        listener = newListener();
+        inVmEventDelivery = new InVmEventDelivery(new NoopMetricFactory());
     }
 
-    @Nested
-    class ErrorHandling {
+    MailboxListenerCountingSuccessfulExecution newListener() {
+        return spy(new MailboxListenerCountingSuccessfulExecution());
+    }
 
-        class AsyncEventCollector extends EventCollector {
-            @Override
-            public ExecutionMode getExecutionMode() {
-                return ExecutionMode.ASYNCHRONOUS;
-            }
-        }
+    @Nested
+    class SynchronousListener {
 
-        private EventCollector syncEventCollector;
-        private EventCollector asyncEventCollector;
+        @Test
+        void deliverShouldDeliverEvent() {
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+            inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+                .allListenerFuture()
+                .block();
 
-        @BeforeEach
-        void setUp() {
-            syncEventCollector = spy(new EventCollector());
-            asyncEventCollector = spy(new AsyncEventCollector());
+            assertThat(listener.numberOfEventCalls())
+                .isEqualTo(1);
         }
 
-        @Nested
-        class SynchronousOnly {
-            @Test
-            void deliverShouldNotDeliverEventToListenerWhenException() {
-                doThrow(RuntimeException.class).when(syncEventCollector).event(event);
-
-                assertThatThrownBy(() -> inVmEventDelivery
-                    .deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture()
+        @Test
+        void deliverShouldReturnSuccessSynchronousMono() {
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+            assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+                    .synchronousListenerFuture()
                     .block())
-                    .isInstanceOf(RuntimeException.class);
-
-                assertThat(syncEventCollector.getEvents())
-                    .isEmpty();
-            }
-
-            @Test
-            void deliverWithRetriesShouldNotDeliverEventToListenerWhenException() {
-                doThrow(RuntimeException.class).when(syncEventCollector).event(event);
-
-                assertThatThrownBy(() -> inVmEventDelivery
-                    .deliverWithRetries(ImmutableList.of(syncEventCollector), event).allListenerFuture()
-                    .block())
-                    .isInstanceOf(IllegalStateException.class);
-
-                assertThat(syncEventCollector.getEvents())
-                    .isEmpty();
-            }
-
-            @Test
-            void deliverShouldBeErrorWhenException() {
-                doThrow(new RuntimeException("mock exception")).when(syncEventCollector).event(event);
-
-                assertThatThrownBy(() -> inVmEventDelivery
-                    .deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture()
-                    .block())
-                    .isInstanceOf(RuntimeException.class)
-                    .hasMessageContaining("mock exception");
-            }
-
-            @Test
-            void deliverWithRetriesShouldBeErrorWhenException() {
-                doThrow(RuntimeException.class).when(syncEventCollector).event(event);
-
-                assertThatThrownBy(() -> inVmEventDelivery
-                    .deliverWithRetries(ImmutableList.of(syncEventCollector), event).allListenerFuture()
-                    .block())
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("Retries exhausted");
-            }
+                .doesNotThrowAnyException();
         }
 
-        @Nested
-        class AsynchronousOnly {
-            @Test
-            void deliverShouldNotDeliverEventToListenerWhenException() {
-                doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
+        @Test
+        void deliverShouldNotDeliverWhenListenerGetException() {
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+            doThrow(new RuntimeException())
+                .when(listener).event(EVENT);
 
-                assertThatThrownBy(() -> inVmEventDelivery
-                    .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
-                    .block())
-                    .isInstanceOf(RuntimeException.class);
+            assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+                .allListenerFuture()
+                .block())
+            .isInstanceOf(RuntimeException.class);
 
-                assertThat(asyncEventCollector.getEvents())
-                    .isEmpty();
-            }
-
-            @Test
-            void deliverWithRetriesShouldNotDeliverEventToListenerWhenException() {
-                doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
-
-                assertThatThrownBy(() -> inVmEventDelivery
-                    .deliverWithRetries(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
-                    .block())
-                    .isInstanceOf(IllegalStateException.class);
+            assertThat(listener.numberOfEventCalls())
+                .isEqualTo(0);
+        }
 
-                assertThat(asyncEventCollector.getEvents())
-                    .isEmpty();
-            }
+        @Test
+        void deliverShouldReturnAnErrorMonoWhenListenerGetException() {
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+            doThrow(new RuntimeException())
+                .when(listener).event(EVENT);
 
-            @Test
-            void deliverShouldBeErrorWhenException() {
-                doThrow(new RuntimeException("mock exception")).when(asyncEventCollector).event(event);
+            assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+                .synchronousListenerFuture()
+                .block())
+            .isInstanceOf(RuntimeException.class);
+        }
+    }
 
-                assertThatThrownBy(() -> inVmEventDelivery
-                    .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
-                    .block())
-                    .isInstanceOf(RuntimeException.class)
-                    .hasMessageContaining("mock exception");
-            }
+    @Nested
+    class AsynchronousListener {
 
-            @Test
-            void deliverWithRetriesShouldBeErrorWhenException() {
-                doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
+        @Test
+        void deliverShouldDeliverEvent() {
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+            inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+                .allListenerFuture()
+                .block();
 
-                assertThatThrownBy(() -> inVmEventDelivery
-                    .deliverWithRetries(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
-                    .block())
-                    .isInstanceOf(IllegalStateException.class)
-                    .hasMessageContaining("Retries exhausted");
-            }
+            assertThat(listener.numberOfEventCalls())
+                .isEqualTo(1);
         }
 
-        @Nested
-        class BothAsynchronousAndSynchronous {
-            @Test
-            void deliverShouldDeliverEventToSyncListenerWhenAsyncGetException() {
-                doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
-
-                assertThatThrownBy(() -> inVmEventDelivery
-                    .deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture()
+        @Test
+        void deliverShouldReturnSuccessSynchronousMono() {
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+            assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+                    .synchronousListenerFuture()
                     .block())
-                    .isInstanceOf(RuntimeException.class);
-
-                SoftAssertions.assertSoftly(softly -> {
-                    softly.assertThat(asyncEventCollector.getEvents()).isEmpty();
-                    softly.assertThat(syncEventCollector.getEvents()).hasSize(1);
-                });
-
-            }
+                .doesNotThrowAnyException();
+        }
 
-            @Test
-            void deliverWithRetriesShouldDeliverEventToSyncListenerWhenAsyncGetException() {
-                doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
+        @Test
+        void deliverShouldNotDeliverWhenListenerGetException() {
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+            doThrow(new RuntimeException())
+                .when(listener).event(EVENT);
 
-                assertThatThrownBy(() -> inVmEventDelivery
-                    .deliverWithRetries(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture()
-                    .block())
-                    .isInstanceOf(IllegalStateException.class)
-                    .hasMessageContaining("Retries exhausted");
-
-                SoftAssertions.assertSoftly(softly -> {
-                    softly.assertThat(asyncEventCollector.getEvents()).isEmpty();
-                    softly.assertThat(syncEventCollector.getEvents()).hasSize(1);
-                });
-            }
-
-            @Test
-            void deliverShouldDeliverEventToAsyncListenerWhenSyncGetException() {
-                doThrow(RuntimeException.class).when(syncEventCollector).event(event);
-
-                inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture()
-                    .onErrorResume(e -> Mono.empty())
-                    .block();
-
-                SoftAssertions.assertSoftly(softly -> {
-                    softly.assertThat(syncEventCollector.getEvents()).isEmpty();
-                    softly.assertThat(asyncEventCollector.getEvents()).hasSize(1);
-                });
-            }
-
-            @Test
-            void deliverWithRetriesShouldDeliverEventToAsyncListenerWhenSyncGetException() {
-                doThrow(RuntimeException.class).when(syncEventCollector).event(event);
-
-                inVmEventDelivery.deliverWithRetries(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture()
-                    .onErrorResume(e -> Mono.empty())
-                    .block();
-
-                SoftAssertions.assertSoftly(softly -> {
-                    softly.assertThat(syncEventCollector.getEvents()).isEmpty();
-                    softly.assertThat(asyncEventCollector.getEvents()).hasSize(1);
-                });
-            }
-
-            @Test
-            void deliverShouldBeErrorWhenException() {
-                doThrow(new RuntimeException("mock exception")).when(syncEventCollector).event(event);
-                doThrow(new RuntimeException("mock exception")).when(asyncEventCollector).event(event);
-
-                assertThatThrownBy(() -> inVmEventDelivery
-                    .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
-                    .block())
-                    .isInstanceOf(RuntimeException.class)
-                    .hasMessageContaining("mock exception");
-            }
+            assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+                .allListenerFuture()
+                .block())
+            .isInstanceOf(RuntimeException.class);
 
-            @Test
-            void deliverWithRetriesShouldBeErrorWhenException() {
-                doThrow(RuntimeException.class).when(syncEventCollector).event(event);
-                doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
-
-                assertThatThrownBy(() -> inVmEventDelivery
-                    .deliverWithRetries(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
-                    .block())
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("Retries exhausted");
-            }
+            assertThat(listener.numberOfEventCalls())
+                .isEqualTo(0);
         }
-    }
-
-    @Test
-    void deliverShouldHaveCalledSynchronousListenersWhenAllListenerExecutedJoined() throws Exception {
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
 
-        inVmEventDelivery.deliver(ImmutableList.of(listener), event).allListenerFuture().block();
+        @Test
+        void deliverShouldReturnAnSuccessSyncMonoWhenListenerGetException() {
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+            doThrow(new RuntimeException())
+                .when(listener).event(EVENT);
 
-        verify(listener).event(event);
+            assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+                .synchronousListenerFuture()
+                .block())
+            .doesNotThrowAnyException();
+        }
     }
 
-    @Test
-    void deliverShouldHaveCalledAsynchronousListenersWhenAllListenerExecutedJoined() throws Exception {
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-
-        inVmEventDelivery.deliver(ImmutableList.of(listener), event).allListenerFuture().block();
+    @Nested
+    class WithOptions {
+
+        @Test
+        void retryShouldWorkWhenDeliverWithRetry() {
+            MailboxListenerCountingSuccessfulExecution listener = newListener();
+            doThrow(new RuntimeException())
+                .doThrow(new RuntimeException())
+                .doThrow(new RuntimeException())
+                .doCallRealMethod()
+                .when(listener).event(EVENT);
+
+            inVmEventDelivery.deliver(listener, EVENT,
+                DeliveryOption.of(
+                    BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
+                    PermanentFailureHandler.NO_HANDLER))
+                .allListenerFuture()
+                .block();
+
+            assertThat(listener.numberOfEventCalls())
+                .isEqualTo(1);
+        }
 
-        verify(listener).event(event);
-    }
+        @Test
+        void failureHandlerShouldWorkWhenDeliverWithFailureHandler() {
+            MailboxListenerCountingSuccessfulExecution listener = newListener();
+            doThrow(new RuntimeException())
+                .when(listener).event(EVENT);
 
-    @Test
-    void deliverShouldHaveCalledSynchronousListenersWhenSynchronousListenerExecutedJoined() throws Exception {
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+            MemoryEventDeadLetters deadLetter = new MemoryEventDeadLetters();
 
-        inVmEventDelivery.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().block();
+            inVmEventDelivery.deliver(listener, EVENT,
+                DeliveryOption.of(
+                    Retryer.NO_RETRYER,
+                    PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
+                .allListenerFuture()
+                .block();
 
-        verify(listener).event(event);
-    }
+            assertThat(deadLetter.groupsWithFailedEvents().toStream())
+                .containsOnly(GROUP_A);
+        }
 
-    @Test
-    void deliverShouldNotBlockOnAsynchronousListenersWhenSynchronousListenerExecutedJoined() throws Exception {
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-        CountDownLatch latch = new CountDownLatch(1);
-        doAnswer(invocation -> {
-            latch.await();
-            return null;
-        }).when(listener).event(event);
-
-        assertTimeout(Duration.ofSeconds(2),
-            () -> {
-                inVmEventDelivery.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().block();
-                latch.countDown();
+        @Test
+        void failureHandlerShouldNotWorkWhenRetrySuccess() {
+            MailboxListenerCountingSuccessfulExecution listener = newListener();
+            doThrow(new RuntimeException())
+                .doThrow(new RuntimeException())
+                .doCallRealMethod()
+                .when(listener).event(EVENT);
+
+            MemoryEventDeadLetters deadLetter = new MemoryEventDeadLetters();
+
+            inVmEventDelivery.deliver(listener, EVENT,
+                DeliveryOption.of(
+                    BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
+                    PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
+                .allListenerFuture()
+                .block();
+
+            SoftAssertions.assertSoftly(softy -> {
+                softy.assertThat(listener.numberOfEventCalls())
+                    .isEqualTo(1);
+                softy.assertThat(deadLetter.groupsWithFailedEvents().toStream())
+                    .isEmpty();
             });
-    }
+        }
 
-    @Test
-    void deliverShouldNotBlockOnSynchronousListenersWhenNoJoin() throws Exception {
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
-        CountDownLatch latch = new CountDownLatch(1);
-        doAnswer(invocation -> {
-            latch.await();
-            return null;
-        }).when(listener).event(event);
-
-        assertTimeout(Duration.ofSeconds(2),
-            () -> {
-                inVmEventDelivery.deliver(ImmutableList.of(listener), event);
-                latch.countDown();
-            });
-    }
 
-    @Test
-    void deliverShouldNotBlockOnAsynchronousListenersWhenNoJoin() throws Exception {
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-        CountDownLatch latch = new CountDownLatch(1);
-        doAnswer(invocation -> {
-            latch.await();
-            return null;
-        }).when(listener).event(event);
-
-        assertTimeout(Duration.ofSeconds(2),
-            () -> {
-                inVmEventDelivery.deliver(ImmutableList.of(listener), event);
-                latch.countDown();
+        @Test
+        void failureHandlerShouldWorkWhenRetryFails() {
+            MailboxListenerCountingSuccessfulExecution listener = newListener();
+            doThrow(new RuntimeException())
+                .doThrow(new RuntimeException())
+                .doThrow(new RuntimeException())
+                .doThrow(new RuntimeException())
+                .doCallRealMethod()
+                .when(listener).event(EVENT);
+
+            MemoryEventDeadLetters deadLetter = new MemoryEventDeadLetters();
+
+            inVmEventDelivery.deliver(listener, EVENT,
+                DeliveryOption.of(
+                    BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
+                    PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
+                .allListenerFuture()
+                .block();
+
+            SoftAssertions.assertSoftly(softy -> {
+                softy.assertThat(listener.numberOfEventCalls())
+                    .isEqualTo(0);
+                assertThat(deadLetter.groupsWithFailedEvents().toStream())
+                    .containsOnly(GROUP_A);
             });
-    }
-
-    @Test
-    void deliverShouldEventuallyDeliverAsynchronousListenersWhenSynchronousListenerExecutedJoined() throws Exception {
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-
-        inVmEventDelivery.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().block();
-
-        verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
-    }
-
-    @Test
-    void deliverShouldEventuallyDeliverSynchronousListenersWhenNoJoin() throws Exception {
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
-
-        inVmEventDelivery.deliver(ImmutableList.of(listener), event);
-
-        verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
-    }
-
-    @Test
-    void deliverShouldCallSynchronousListenersWhenAsynchronousListenersAreAlsoRegistered() throws Exception {
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-        when(listener2.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
-
-        inVmEventDelivery.deliver(ImmutableList.of(listener, listener2), event).synchronousListenerFuture().block();
-
-        verify(listener2).event(event);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
----------------------------------------------------------------------
diff --git a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
index 609d183..5273359 100644
--- a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
+++ b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
@@ -27,12 +27,13 @@
 
     <bean id="event-bus" class="org.apache.james.mailbox.events.InVMEventBus" lazy-init="true">
         <constructor-arg index="0" ref="event-delivery"/>
+        <constructor-arg index="1">
+            <util:constant static-field="org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT"/>
+        </constructor-arg>
+        <constructor-arg index="2" ref="event-deadletters"/>
     </bean>
     <bean id="event-delivery" class="org.apache.james.mailbox.events.delivery.InVmEventDelivery" lazy-init="true">
         <constructor-arg index="0" ref="metricFactory"/>
-        <constructor-arg>
-            <util:constant static-field="org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT"/>
-        </constructor-arg>
     </bean>
-
+    <bean id="event-deadletters" class="org.apache.james.mailbox.events.MemoryEventDeadLetters" lazy-init="true"/>
 </beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java
index 1661de2..6ed8ee0 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java
@@ -89,7 +89,8 @@ public class MessageIdManagerTestSystem {
                 .mailbox(mailbox)
                 .addMessage(message)
                 .build(),
-                new MailboxIdRegistrationKey(mailboxId));
+                new MailboxIdRegistrationKey(mailboxId))
+            .block();
             return messageId;
         } catch (Exception e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index 45dc122..e09153d 100644
--- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -52,8 +52,8 @@ public class DefaultEventModule extends AbstractModule {
         bind(InVmEventDelivery.class).in(Scopes.SINGLETON);
         bind(InVMEventBus.class).in(Scopes.SINGLETON);
         bind(MemoryEventDeadLetters.class).in(Scopes.SINGLETON);
-        bind(EventDeadLetters.class).to(MemoryEventDeadLetters.class);
 
+        bind(EventDeadLetters.class).to(MemoryEventDeadLetters.class);
         bind(MailboxListenersLoader.class).to(MailboxListenersLoaderImpl.class);
         bind(EventDelivery.class).to(InVmEventDelivery.class);
         bind(EventBus.class).to(InVMEventBus.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[2/2] james-project git commit: JAMES-2578 Fix warning using Mailet new API

Posted by bt...@apache.org.
JAMES-2578 Fix warning using Mailet new API


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/606ff9c9
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/606ff9c9
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/606ff9c9

Branch: refs/heads/master
Commit: 606ff9c93dd8733b36a601ef7ab3449c3583b0f0
Parents: bb067a1
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Mon Jan 21 10:12:10 2019 +0100
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Wed Jan 23 10:04:20 2019 +0700

----------------------------------------------------------------------
 .../mailets/RemoveMailAttributeTest.java        | 39 +++++++++++---------
 .../org/apache/james/server/core/MailImpl.java  | 13 ++++---
 .../samples/mailets/InstrumentationMailet.java  |  7 ++--
 .../mailets/WithStorageDirectiveTest.java       | 15 +++++---
 .../CassandraMailRepositoryMailDAOTest.java     | 16 +++++---
 5 files changed, 52 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/606ff9c9/mailet/standard/src/test/java/org/apache/james/transport/mailets/RemoveMailAttributeTest.java
----------------------------------------------------------------------
diff --git a/mailet/standard/src/test/java/org/apache/james/transport/mailets/RemoveMailAttributeTest.java b/mailet/standard/src/test/java/org/apache/james/transport/mailets/RemoveMailAttributeTest.java
index 1881dd3..85a0dd3 100644
--- a/mailet/standard/src/test/java/org/apache/james/transport/mailets/RemoveMailAttributeTest.java
+++ b/mailet/standard/src/test/java/org/apache/james/transport/mailets/RemoveMailAttributeTest.java
@@ -24,6 +24,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import javax.mail.MessagingException;
 
+import org.apache.mailet.Attribute;
+import org.apache.mailet.AttributeName;
+import org.apache.mailet.AttributeValue;
 import org.apache.mailet.Mail;
 import org.apache.mailet.Mailet;
 import org.apache.mailet.MailetException;
@@ -34,12 +37,12 @@ import org.junit.jupiter.api.Test;
 
 class RemoveMailAttributeTest {
 
-    private static final String ATTRIBUTE_1 = "attribute1";
-    private static final String ATTRIBUTE_2 = "attribute2";
-    private static final String ATTRIBUTE_3 = "attribute3";
-    private static final String VALUE_1 = "value1";
-    private static final String VALUE_2 = "value2";
-    private static final String VALUE_3 = "value3";
+    private static final AttributeName ATTRIBUTE_1 = AttributeName.of("attribute1");
+    private static final AttributeName ATTRIBUTE_2 = AttributeName.of("attribute2");
+    private static final AttributeName ATTRIBUTE_3 = AttributeName.of("attribute3");
+    private static final Attribute VALUE_1 = new Attribute(ATTRIBUTE_1, AttributeValue.of("value1"));
+    private static final Attribute VALUE_2 = new Attribute(ATTRIBUTE_2, AttributeValue.of("value2"));
+    private static final Attribute VALUE_3 = new Attribute(ATTRIBUTE_3, AttributeValue.of("value3"));
     private static final String ATTRIBUTE1_ATTRIBUTE2 = "attribute1, attribute2";
     private Mailet removeMailet;
 
@@ -77,7 +80,7 @@ class RemoveMailAttributeTest {
         Mail mail = FakeMail.builder().build();
         removeMailet.service(mail);
 
-        assertThat(mail.getAttributeNames()).isEmpty();
+        assertThat(mail.attributes()).isEmpty();
     }
 
     @Test
@@ -89,29 +92,29 @@ class RemoveMailAttributeTest {
         removeMailet.init(mailetConfig);
 
         Mail mail = FakeMail.builder()
-            .attribute(ATTRIBUTE_3, VALUE_3)
+            .attribute(VALUE_3)
             .build();
         removeMailet.service(mail);
 
-        assertThat(mail.getAttributeNames()).containsExactly(ATTRIBUTE_3);
+        assertThat(mail.attributes()).containsExactly(VALUE_3);
     }
 
     @Test
     void serviceShouldRemoveSpecifiedAttribute() throws MessagingException {
         FakeMailetConfig mailetConfig = FakeMailetConfig.builder()
                 .mailetName("Test")
-                .setProperty(RemoveMailAttribute.MAILET_NAME_PARAMETER, ATTRIBUTE_1)
+                .setProperty(RemoveMailAttribute.MAILET_NAME_PARAMETER, ATTRIBUTE_1.asString())
                 .build();
         removeMailet.init(mailetConfig);
 
         Mail mail = FakeMail.builder()
-            .attribute(ATTRIBUTE_1, VALUE_1)
-            .attribute(ATTRIBUTE_2, VALUE_2)
-            .attribute(ATTRIBUTE_3, VALUE_3)
+            .attribute(VALUE_1)
+            .attribute(VALUE_2)
+            .attribute(VALUE_3)
             .build();
         removeMailet.service(mail);
 
-        assertThat(mail.getAttributeNames()).containsOnly(ATTRIBUTE_2, ATTRIBUTE_3);
+        assertThat(mail.attributes()).containsOnly(VALUE_2, VALUE_3);
     }
 
     @Test
@@ -123,12 +126,12 @@ class RemoveMailAttributeTest {
         removeMailet.init(mailetConfig);
 
         Mail mail = FakeMail.builder()
-            .attribute(ATTRIBUTE_1, VALUE_1)
-            .attribute(ATTRIBUTE_2, VALUE_2)
-            .attribute(ATTRIBUTE_3, VALUE_3)
+            .attribute(VALUE_1)
+            .attribute(VALUE_2)
+            .attribute(VALUE_3)
             .build();
         removeMailet.service(mail);
 
-        assertThat(mail.getAttributeNames()).containsExactly(ATTRIBUTE_3);
+        assertThat(mail.attributes()).containsExactly(VALUE_3);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/606ff9c9/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java
----------------------------------------------------------------------
diff --git a/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java b/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java
index 6b48478..9d97f30 100644
--- a/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java
+++ b/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java
@@ -64,6 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -442,12 +443,12 @@ public class MailImpl implements Disposable, Mail {
             if (mail instanceof MailImpl) {
                 setAttributesRaw((Map<String, Object>) cloneSerializableObject(((MailImpl) mail).getAttributesRaw()));
             } else {
-                HashMap<String, Object> attribs = new HashMap<>();
-                for (Iterator<String> i = mail.getAttributeNames(); i.hasNext(); ) {
-                    String hashKey = i.next();
-                    attribs.put(hashKey, cloneSerializableObject(mail.getAttribute(hashKey)));
-                }
-                setAttributesRaw(attribs);
+                ImmutableMap<String, Object> attributesMap = mail.attributes()
+                    .collect(Guavate.toImmutableMap(
+                            attribute -> attribute.getName().asString(),
+                            Throwing.function(attribute -> cloneSerializableObject(attribute.getValue().getValue()))));
+
+                setAttributesRaw(attributesMap);
             }
         } catch (IOException | ClassNotFoundException e) {
             LOGGER.error("Error while deserializing attributes", e);

http://git-wip-us.apache.org/repos/asf/james-project/blob/606ff9c9/server/mailet/mailets/src/test/java/org/apache/james/samples/mailets/InstrumentationMailet.java
----------------------------------------------------------------------
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/samples/mailets/InstrumentationMailet.java b/server/mailet/mailets/src/test/java/org/apache/james/samples/mailets/InstrumentationMailet.java
index 05aa60f..4a68bb2 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/samples/mailets/InstrumentationMailet.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/samples/mailets/InstrumentationMailet.java
@@ -79,10 +79,9 @@ public class InstrumentationMailet implements Mailet {
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Mail named: " + mail.getName());
 
-            for (Iterator<String> it = mail.getAttributeNames(); it.hasNext();) {
-                String attributeName = it.next();
-                LOGGER.info("Attribute " + attributeName);
-            }
+            mail.attributeNames()
+                .forEach(attributeName -> LOGGER.info("Attribute " + attributeName));
+
             LOGGER.info("Message size: " + mail.getMessageSize());
             LOGGER.info("Last updated: " + mail.getLastUpdated());
             LOGGER.info("Remote Address: " + mail.getRemoteAddr());

http://git-wip-us.apache.org/repos/asf/james-project/blob/606ff9c9/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/WithStorageDirectiveTest.java
----------------------------------------------------------------------
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/WithStorageDirectiveTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/WithStorageDirectiveTest.java
index 9fcebe5..2f61236 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/WithStorageDirectiveTest.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/WithStorageDirectiveTest.java
@@ -23,6 +23,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.mailet.Attribute;
+import org.apache.mailet.AttributeName;
+import org.apache.mailet.AttributeValue;
 import org.apache.mailet.base.MailAddressFixture;
 import org.apache.mailet.base.test.FakeMail;
 import org.apache.mailet.base.test.FakeMailetConfig;
@@ -71,10 +74,12 @@ public class WithStorageDirectiveTest {
 
         testee.service(mail);
 
-        softly.assertThat(mail.getAttributeNames())
-            .containsOnly("DeliveryPath_recipient2@localhost", "DeliveryPath_recipient1@localhost");
-        softly.assertThat(mail.getAttribute("DeliveryPath_recipient1@localhost")).isEqualTo(targetFolderName);
-        softly.assertThat(mail.getAttribute("DeliveryPath_recipient2@localhost")).isEqualTo(targetFolderName);
+        AttributeName recipient1 = AttributeName.of("DeliveryPath_recipient1@localhost");
+        AttributeName recipient2 = AttributeName.of("DeliveryPath_recipient2@localhost");
+        softly.assertThat(mail.attributes())
+            .containsOnly(
+                new Attribute(recipient1, AttributeValue.of(targetFolderName)),
+                new Attribute(recipient2, AttributeValue.of(targetFolderName)));
     }
 
     @Test
@@ -90,7 +95,7 @@ public class WithStorageDirectiveTest {
 
         testee.service(mail);
 
-        assertThat(mail.getAttributeNames())
+        assertThat(mail.attributes())
             .isEmpty();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/606ff9c9/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
index c187823..2377b6b 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
@@ -29,6 +29,8 @@ import javax.mail.MessagingException;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.TestBlobId;
 import org.apache.james.mailrepository.api.MailKey;
@@ -52,8 +54,12 @@ class CassandraMailRepositoryMailDAOTest {
     static final MailKey KEY_1 = new MailKey("key1");
     static final TestBlobId.Factory BLOB_ID_FACTORY = new TestBlobId.Factory();
 
+    public static final CassandraModule MODULE = CassandraModule.aggregateModules(
+            CassandraMailRepositoryModule.MODULE,
+            CassandraSchemaVersionModule.MODULE);
+
     @RegisterExtension
-    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailRepositoryModule.MODULE);
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE);
 
 
     abstract class TestSuite {
@@ -206,9 +212,9 @@ class CassandraMailRepositoryMailDAOTest {
             String remoteAddr = "remoteAddr";
             String remoteHost = "remoteHost";
             PerRecipientHeaders.Header header = PerRecipientHeaders.Header.builder().name("headerName").value("headerValue").build();
-            String attributeName = "att1";
+            AttributeName attributeName = AttributeName.of("att1");
             List<AttributeValue<?>> attributeValue = ImmutableList.of(AttributeValue.of("value1"), AttributeValue.of("value2"));
-            Attribute attribute = new Attribute(AttributeName.of(attributeName), AttributeValue.of(attributeValue));
+            Attribute attribute = new Attribute(attributeName, AttributeValue.of(attributeValue));
             List<Attribute> attributes = ImmutableList.of(attribute);
 
             testee.store(URL,
@@ -238,8 +244,8 @@ class CassandraMailRepositoryMailDAOTest {
                 softly.assertThat(partialMail.getState()).isEqualTo(state);
                 softly.assertThat(partialMail.getRemoteAddr()).isEqualTo(remoteAddr);
                 softly.assertThat(partialMail.getRemoteHost()).isEqualTo(remoteHost);
-                softly.assertThat(partialMail.getAttributeNames()).containsOnly(attributeName);
-                softly.assertThat(partialMail.getAttribute(AttributeName.of(attributeName))).contains(attribute);
+                softly.assertThat(partialMail.attributeNames()).containsOnly(attributeName);
+                softly.assertThat(partialMail.getAttribute(attributeName)).contains(attribute);
                 softly.assertThat(partialMail.getPerRecipientSpecificHeaders().getRecipientsWithSpecificHeaders())
                     .containsOnly(MailAddressFixture.RECIPIENT1);
                 softly.assertThat(partialMail.getPerRecipientSpecificHeaders().getHeadersForRecipient(MailAddressFixture.RECIPIENT1))


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org