You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/23 03:04:55 UTC
[1/2] james-project git commit: MAILBOX-376 Plug EventDeadLetter into InVmEventBus
Repository: james-project
Updated Branches:
refs/heads/master d51c70854 -> 606ff9c93
MAILBOX-376 Plug EventDeadLetter into InVmEventBus
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/bb067a1b
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/bb067a1b
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/bb067a1b
Branch: refs/heads/master
Commit: bb067a1bcb18752c86c0f6bc52096b980e96e7ee
Parents: d51c708
Author: tran tien duc <dt...@linagora.com>
Authored: Thu Jan 17 18:27:50 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Wed Jan 23 10:00:08 2019 +0700
----------------------------------------------------------------------
.../mailbox/events/EventBusTestFixture.java | 2 +-
.../james/mailbox/events/InVMEventBus.java | 40 +-
.../mailbox/events/MemoryEventDeadLetters.java | 2 +-
.../mailbox/events/delivery/EventDelivery.java | 129 ++++-
.../events/delivery/InVmEventDelivery.java | 86 +---
.../james/mailbox/events/InVMEventBusTest.java | 27 +-
.../events/delivery/InVmEventDeliveryTest.java | 472 +++++++------------
.../resources/META-INF/spring/event-system.xml | 9 +-
.../store/MessageIdManagerTestSystem.java | 3 +-
.../modules/mailbox/DefaultEventModule.java | 2 +-
10 files changed, 372 insertions(+), 400 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
index 263a439..d11571c 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -50,7 +50,7 @@ public interface EventBusTestFixture {
calls.incrementAndGet();
}
- int numberOfEventCalls() {
+ public int numberOfEventCalls() {
return calls.get();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
index 3bb11b0..5da4adb 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
@@ -27,8 +27,11 @@ import javax.inject.Inject;
import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
import org.apache.james.mailbox.events.delivery.EventDelivery;
+import org.apache.james.mailbox.events.delivery.EventDelivery.PermanentFailureHandler.StoreToDeadLetters;
+import org.apache.james.mailbox.events.delivery.EventDelivery.Retryer.BackoffRetryer;
import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
@@ -37,17 +40,27 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class InVMEventBus implements EventBus {
+
private final Multimap<RegistrationKey, MailboxListener> registrations;
private final ConcurrentHashMap<Group, MailboxListener> groups;
private final EventDelivery eventDelivery;
+ private final RetryBackoffConfiguration retryBackoff;
+ private final EventDeadLetters eventDeadLetters;
@Inject
- public InVMEventBus(EventDelivery eventDelivery) {
+ public InVMEventBus(EventDelivery eventDelivery, RetryBackoffConfiguration retryBackoff, EventDeadLetters eventDeadLetters) {
this.eventDelivery = eventDelivery;
+ this.retryBackoff = retryBackoff;
+ this.eventDeadLetters = eventDeadLetters;
this.registrations = Multimaps.synchronizedSetMultimap(HashMultimap.create());
this.groups = new ConcurrentHashMap<>();
}
+ @VisibleForTesting
+ public InVMEventBus(EventDelivery eventDelivery) {
+ this(eventDelivery, RetryBackoffConfiguration.DEFAULT, new MemoryEventDeadLetters());
+ }
+
@Override
public Registration register(MailboxListener listener, RegistrationKey key) {
registrations.put(key, listener);
@@ -66,15 +79,34 @@ public class InVMEventBus implements EventBus {
@Override
public Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
if (!event.isNoop()) {
- return Flux.merge(
- eventDelivery.deliverWithRetries(groups.values(), event).synchronousListenerFuture(),
- eventDelivery.deliver(registeredListenersByKeys(keys), event).synchronousListenerFuture())
+ return Flux.merge(groupDeliveries(event), keyDeliveries(event, keys))
+ .reduceWith(EventDelivery.ExecutionStages::empty, EventDelivery.ExecutionStages::combine)
+ .flatMap(EventDelivery.ExecutionStages::synchronousListenerFuture)
.then()
.onErrorResume(throwable -> Mono.empty());
}
return Mono.empty();
}
+ private Flux<EventDelivery.ExecutionStages> keyDeliveries(Event event, Set<RegistrationKey> keys) {
+ return Flux.fromIterable(registeredListenersByKeys(keys))
+ .map(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()));
+ }
+
+ private Flux<EventDelivery.ExecutionStages> groupDeliveries(Event event) {
+ return Flux.fromIterable(groups.entrySet())
+ .map(entry -> groupDelivery(event, entry.getValue(), entry.getKey()));
+ }
+
+ private EventDelivery.ExecutionStages groupDelivery(Event event, MailboxListener mailboxListener, Group group) {
+ return eventDelivery.deliver(
+ mailboxListener,
+ event,
+ EventDelivery.DeliveryOption.of(
+ BackoffRetryer.of(retryBackoff, mailboxListener),
+ StoreToDeadLetters.of(group, eventDeadLetters)));
+ }
+
public Set<Group> registeredGroups() {
return groups.keySet();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
index 8464ed9..122763a 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
@@ -38,7 +38,7 @@ public class MemoryEventDeadLetters implements EventDeadLetters {
private final Multimap<Group, Event> deadLetters;
- MemoryEventDeadLetters() {
+ public MemoryEventDeadLetters() {
this.deadLetters = Multimaps.synchronizedSetMultimap(HashMultimap.create());
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index 359e4d7..e34cb31 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -19,19 +19,133 @@
package org.apache.james.mailbox.events.delivery;
-import java.util.Collection;
+import static org.apache.james.mailbox.events.delivery.EventDelivery.PermanentFailureHandler.NO_HANDLER;
+import static org.apache.james.mailbox.events.delivery.EventDelivery.Retryer.NO_RETRYER;
+
+import java.time.Duration;
import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.events.EventDeadLetters;
+import org.apache.james.mailbox.events.Group;
+import org.apache.james.mailbox.events.RetryBackoffConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface EventDelivery {
+
+ class DeliveryOption {
+ public static DeliveryOption of(Retryer retrier, PermanentFailureHandler permanentFailureHandler) {
+ return new DeliveryOption(retrier, permanentFailureHandler);
+ }
+
+ public static DeliveryOption none() {
+ return new DeliveryOption(NO_RETRYER, NO_HANDLER);
+ }
+
+ private final Retryer retrier;
+ private final PermanentFailureHandler permanentFailureHandler;
+
+ private DeliveryOption(Retryer retrier, PermanentFailureHandler permanentFailureHandler) {
+ this.retrier = retrier;
+ this.permanentFailureHandler = permanentFailureHandler;
+ }
+
+ Retryer getRetrier() {
+ return retrier;
+ }
+
+ PermanentFailureHandler getPermanentFailureHandler() {
+ return permanentFailureHandler;
+ }
+ }
+
+
+ interface Retryer {
+
+ Retryer NO_RETRYER = (executionResult, event) -> executionResult;
+
+ class BackoffRetryer implements EventDelivery.Retryer {
+
+ public static BackoffRetryer of(RetryBackoffConfiguration retryBackoff, MailboxListener mailboxListener) {
+ return new BackoffRetryer(retryBackoff, mailboxListener);
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(BackoffRetryer.class);
+ private static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);
+
+ private final RetryBackoffConfiguration retryBackoff;
+ private final MailboxListener mailboxListener;
+
+ public BackoffRetryer(RetryBackoffConfiguration retryBackoff, MailboxListener mailboxListener) {
+ this.retryBackoff = retryBackoff;
+ this.mailboxListener = mailboxListener;
+ }
+
+ @Override
+ public Mono<Void> doRetry(Mono<Void> executionResult, Event event) {
+ return executionResult
+ .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), MAX_BACKOFF, retryBackoff.getJitterFactor())
+ .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
+ mailboxListener.getClass().getCanonicalName(),
+ retryBackoff.getMaxRetries(),
+ event.getClass().getCanonicalName(),
+ throwable))
+ .then();
+ }
+ }
+
+ Mono<Void> doRetry(Mono<Void> executionResult, Event event);
+ }
+
+ interface PermanentFailureHandler {
+
+ PermanentFailureHandler NO_HANDLER = event -> Mono.error(new UnsupportedOperationException("doesn't handle error"));
+
+ class StoreToDeadLetters implements EventDelivery.PermanentFailureHandler {
+
+ public static StoreToDeadLetters of(Group group, EventDeadLetters eventDeadLetters) {
+ return new StoreToDeadLetters(group, eventDeadLetters);
+ }
+
+ private final Group group;
+ private final EventDeadLetters eventDeadLetters;
+
+ private StoreToDeadLetters(Group group, EventDeadLetters eventDeadLetters) {
+ this.group = group;
+ this.eventDeadLetters = eventDeadLetters;
+ }
+
+ @Override
+ public Mono<Void> handle(Event event) {
+ return eventDeadLetters.store(group, event);
+ }
+ }
+
+ Mono<Void> handle(Event event);
+ }
+
class ExecutionStages {
+
+ public static ExecutionStages empty() {
+ return new ExecutionStages(Mono.empty(), Mono.empty());
+ }
+
+ static ExecutionStages synchronous(Mono<Void> synchronousListenerFuture) {
+ return new ExecutionStages(synchronousListenerFuture, Mono.empty());
+ }
+
+ static ExecutionStages asynchronous(Mono<Void> asynchronousListenerFuture) {
+ return new ExecutionStages(Mono.empty(),asynchronousListenerFuture);
+ }
+
private final Mono<Void> synchronousListenerFuture;
private final Mono<Void> asynchronousListenerFuture;
- ExecutionStages(Mono<Void> synchronousListenerFuture, Mono<Void> asynchronousListenerFuture) {
+ private ExecutionStages(Mono<Void> synchronousListenerFuture, Mono<Void> asynchronousListenerFuture) {
this.synchronousListenerFuture = synchronousListenerFuture;
this.asynchronousListenerFuture = asynchronousListenerFuture;
}
@@ -45,10 +159,13 @@ public interface EventDelivery {
.concatWith(asynchronousListenerFuture)
.then();
}
- }
-
- ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event);
+ public ExecutionStages combine(ExecutionStages another) {
+ return new ExecutionStages(
+ Flux.concat(this.synchronousListenerFuture, another.synchronousListenerFuture).then(),
+ Flux.concat(this.asynchronousListenerFuture, another.asynchronousListenerFuture).then());
+ }
+ }
- ExecutionStages deliverWithRetries(Collection<MailboxListener> mailboxListeners, Event event);
+ ExecutionStages deliver(MailboxListener listener, Event event, DeliveryOption option);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
index 84a6a90..ac4ce4e 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
@@ -19,15 +19,10 @@
package org.apache.james.mailbox.events.delivery;
-import java.time.Duration;
-import java.util.Collection;
-import java.util.stream.Stream;
-
import javax.inject.Inject;
import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.mailbox.events.RetryBackoffConfiguration;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.TimeMetric;
import org.slf4j.Logger;
@@ -35,87 +30,58 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;
public class InVmEventDelivery implements EventDelivery {
- private enum DeliveryOption {
- NO_RETRY, WITH_RETRY
+ private static String listenerName(MailboxListener mailboxListener) {
+ return mailboxListener.getClass().getCanonicalName();
+ }
+
+ private static String eventName(Event event) {
+ return event.getClass().getCanonicalName();
}
private static final Logger LOGGER = LoggerFactory.getLogger(InVmEventDelivery.class);
- private static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);
private final MetricFactory metricFactory;
- private final RetryBackoffConfiguration retryBackoff;
@Inject
@VisibleForTesting
- public InVmEventDelivery(MetricFactory metricFactory, RetryBackoffConfiguration retryBackoff) {
- this.metricFactory = metricFactory;
- this.retryBackoff = retryBackoff;
- }
-
public InVmEventDelivery(MetricFactory metricFactory) {
- this(metricFactory, RetryBackoffConfiguration.DEFAULT);
- }
-
- @Override
- public ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event) {
- return deliverByOption(mailboxListeners, event, DeliveryOption.NO_RETRY);
+ this.metricFactory = metricFactory;
}
@Override
- public ExecutionStages deliverWithRetries(Collection<MailboxListener> mailboxListeners, Event event) {
- return deliverByOption(mailboxListeners, event, DeliveryOption.WITH_RETRY);
- }
-
- private ExecutionStages deliverByOption(Collection<MailboxListener> mailboxListeners, Event event, DeliveryOption deliveryOption) {
- Mono<Void> synchronousListeners = doDeliver(
- filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.SYNCHRONOUS), event, deliveryOption)
- .subscribeWith(MonoProcessor.create());
- Mono<Void> asyncListener = doDeliver(
- filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.ASYNCHRONOUS), event, deliveryOption)
- .subscribeWith(MonoProcessor.create());
+ public ExecutionStages deliver(MailboxListener listener, Event event, DeliveryOption option) {
+ Mono<Void> executionResult = deliverByOption(listener, event, option);
- return new ExecutionStages(synchronousListeners, asyncListener);
+ return toExecutionStages(listener.getExecutionMode(), executionResult);
}
- private Stream<MailboxListener> filterByExecutionMode(Collection<MailboxListener> mailboxListeners, MailboxListener.ExecutionMode executionMode) {
- return mailboxListeners.stream()
- .filter(listener -> listener.getExecutionMode() == executionMode);
- }
+ private ExecutionStages toExecutionStages(MailboxListener.ExecutionMode executionMode, Mono<Void> executionResult) {
+ if (executionMode.equals(MailboxListener.ExecutionMode.SYNCHRONOUS)) {
+ return ExecutionStages.synchronous(executionResult);
+ }
- private Mono<Void> doDeliver(Stream<MailboxListener> mailboxListeners, Event event, DeliveryOption deliveryOption) {
- return Flux.fromStream(mailboxListeners)
- .flatMap(mailboxListener -> deliveryWithRetries(event, mailboxListener, deliveryOption))
- .then()
- .subscribeOn(Schedulers.elastic());
+ return ExecutionStages.asynchronous(executionResult);
}
- private Mono<Void> deliveryWithRetries(Event event, MailboxListener mailboxListener, DeliveryOption deliveryOption) {
- Mono<Void> firstDelivery = Mono.fromRunnable(() -> doDeliverToListener(mailboxListener, event))
+ private Mono<Void> deliverByOption(MailboxListener listener, Event event, DeliveryOption deliveryOption) {
+ Mono<Void> deliveryToListener = Mono.fromRunnable(() -> doDeliverToListener(listener, event))
.doOnError(throwable -> LOGGER.error("Error while processing listener {} for {}",
- listenerName(mailboxListener),
+ listenerName(listener),
eventName(event),
throwable))
.subscribeOn(Schedulers.elastic())
.then();
- if (deliveryOption == DeliveryOption.NO_RETRY) {
- return firstDelivery;
- }
-
- return firstDelivery
- .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), MAX_BACKOFF, retryBackoff.getJitterFactor())
- .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
- listenerName(mailboxListener),
- retryBackoff.getMaxRetries(),
- eventName(event),
- throwable))
+ return deliveryOption.getRetrier().doRetry(deliveryToListener, event)
+ .onErrorResume(throwable -> deliveryOption.getPermanentFailureHandler().handle(event))
+ .subscribeWith(MonoProcessor.create())
+ .subscribeOn(Schedulers.elastic())
.then();
}
@@ -129,12 +95,4 @@ public class InVmEventDelivery implements EventDelivery {
timer.stopAndPublish();
}
}
-
- private String listenerName(MailboxListener mailboxListener) {
- return mailboxListener.getClass().getCanonicalName();
- }
-
- private String eventName(Event event) {
- return event.getClass().getCanonicalName();
- }
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
index c7e7edd..56a9cde 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
@@ -22,19 +22,18 @@ package org.apache.james.mailbox.events;
import org.apache.james.mailbox.events.delivery.InVmEventDelivery;
import org.apache.james.metrics.api.NoopMetricFactory;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
public class InVMEventBusTest implements KeyContract.SingleEventBusKeyContract, GroupContract.SingleEventBusGroupContract,
ErrorHandlingContract{
private InVMEventBus eventBus;
+ private MemoryEventDeadLetters deadLetters;
@BeforeEach
void setUp() {
+ deadLetters = new MemoryEventDeadLetters();
eventBus = new InVMEventBus(
- new InVmEventDelivery(
- new NoopMetricFactory(), RetryBackoffConfiguration.DEFAULT));
+ new InVmEventDelivery(new NoopMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters);
}
@Override
@@ -44,24 +43,6 @@ public class InVMEventBusTest implements KeyContract.SingleEventBusKeyContract,
@Override
public EventDeadLetters deadLetter() {
- throw new RuntimeException("this method is not a part of this task contents, will be handled in another pull request");
- }
-
- @Test
- @Disabled("this test is not a part of this task contents, will be handled in another pull request")
- @Override
- public void deadLettersIsNotAppliedForKeyRegistrations() {
- }
-
- @Test
- @Disabled("this test is not a part of this task contents, will be handled in another pull request")
- @Override
- public void deadLetterShouldNotStoreWhenFailsLessThanMaxRetries() {
- }
-
- @Test
- @Disabled("this test is not a part of this task contents, will be handled in another pull request")
- @Override
- public void deadLetterShouldStoreWhenFailsGreaterThanMaxRetries() {
+ return deadLetters;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
index c0ae5a1..dc122f6 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
@@ -19,359 +19,241 @@
package org.apache.james.mailbox.events.delivery;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
+import static org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
+import static org.apache.james.mailbox.events.delivery.EventDelivery.Retryer;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.jupiter.api.Assertions.assertTimeout;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.time.Duration;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.events.MemoryEventDeadLetters;
import org.apache.james.mailbox.events.RetryBackoffConfiguration;
-import org.apache.james.mailbox.util.EventCollector;
+import org.apache.james.mailbox.events.delivery.EventDelivery.DeliveryOption;
+import org.apache.james.mailbox.events.delivery.EventDelivery.PermanentFailureHandler;
+import org.apache.james.mailbox.events.delivery.EventDelivery.Retryer.BackoffRetryer;
import org.apache.james.metrics.api.NoopMetricFactory;
import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
-import com.google.common.collect.ImmutableList;
-
-import reactor.core.publisher.Mono;
-
class InVmEventDeliveryTest {
- private static final int DELIVERY_DELAY = (int) TimeUnit.MILLISECONDS.toMillis(100);
-
private InVmEventDelivery inVmEventDelivery;
- private MailboxListener listener;
- private MailboxListener listener2;
+ private MailboxListenerCountingSuccessfulExecution listener;
private MailboxListener.MailboxEvent event;
@BeforeEach
void setUp() {
event = mock(MailboxListener.MailboxEvent.class);
- listener = mock(MailboxListener.class);
- listener2 = mock(MailboxListener.class);
- inVmEventDelivery = new InVmEventDelivery(new NoopMetricFactory(), RetryBackoffConfiguration.DEFAULT);
+ listener = newListener();
+ inVmEventDelivery = new InVmEventDelivery(new NoopMetricFactory());
}
- @Nested
- class ErrorHandling {
+ MailboxListenerCountingSuccessfulExecution newListener() {
+ return spy(new MailboxListenerCountingSuccessfulExecution());
+ }
- class AsyncEventCollector extends EventCollector {
- @Override
- public ExecutionMode getExecutionMode() {
- return ExecutionMode.ASYNCHRONOUS;
- }
- }
+ @Nested
+ class SynchronousListener {
- private EventCollector syncEventCollector;
- private EventCollector asyncEventCollector;
+ @Test
+ void deliverShouldDeliverEvent() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+ inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+ .allListenerFuture()
+ .block();
- @BeforeEach
- void setUp() {
- syncEventCollector = spy(new EventCollector());
- asyncEventCollector = spy(new AsyncEventCollector());
+ assertThat(listener.numberOfEventCalls())
+ .isEqualTo(1);
}
- @Nested
- class SynchronousOnly {
- @Test
- void deliverShouldNotDeliverEventToListenerWhenException() {
- doThrow(RuntimeException.class).when(syncEventCollector).event(event);
-
- assertThatThrownBy(() -> inVmEventDelivery
- .deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture()
+ @Test
+ void deliverShouldReturnSuccessSynchronousMono() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+ assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+ .synchronousListenerFuture()
.block())
- .isInstanceOf(RuntimeException.class);
-
- assertThat(syncEventCollector.getEvents())
- .isEmpty();
- }
-
- @Test
- void deliverWithRetriesShouldNotDeliverEventToListenerWhenException() {
- doThrow(RuntimeException.class).when(syncEventCollector).event(event);
-
- assertThatThrownBy(() -> inVmEventDelivery
- .deliverWithRetries(ImmutableList.of(syncEventCollector), event).allListenerFuture()
- .block())
- .isInstanceOf(IllegalStateException.class);
-
- assertThat(syncEventCollector.getEvents())
- .isEmpty();
- }
-
- @Test
- void deliverShouldBeErrorWhenException() {
- doThrow(new RuntimeException("mock exception")).when(syncEventCollector).event(event);
-
- assertThatThrownBy(() -> inVmEventDelivery
- .deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture()
- .block())
- .isInstanceOf(RuntimeException.class)
- .hasMessageContaining("mock exception");
- }
-
- @Test
- void deliverWithRetriesShouldBeErrorWhenException() {
- doThrow(RuntimeException.class).when(syncEventCollector).event(event);
-
- assertThatThrownBy(() -> inVmEventDelivery
- .deliverWithRetries(ImmutableList.of(syncEventCollector), event).allListenerFuture()
- .block())
- .isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("Retries exhausted");
- }
+ .doesNotThrowAnyException();
}
- @Nested
- class AsynchronousOnly {
- @Test
- void deliverShouldNotDeliverEventToListenerWhenException() {
- doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
+ @Test
+ void deliverShouldNotDeliverWhenListenerGetException() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+ doThrow(new RuntimeException())
+ .when(listener).event(EVENT);
- assertThatThrownBy(() -> inVmEventDelivery
- .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
- .block())
- .isInstanceOf(RuntimeException.class);
+ assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+ .allListenerFuture()
+ .block())
+ .isInstanceOf(RuntimeException.class);
- assertThat(asyncEventCollector.getEvents())
- .isEmpty();
- }
-
- @Test
- void deliverWithRetriesShouldNotDeliverEventToListenerWhenException() {
- doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
-
- assertThatThrownBy(() -> inVmEventDelivery
- .deliverWithRetries(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
- .block())
- .isInstanceOf(IllegalStateException.class);
+ assertThat(listener.numberOfEventCalls())
+ .isEqualTo(0);
+ }
- assertThat(asyncEventCollector.getEvents())
- .isEmpty();
- }
+ @Test
+ void deliverShouldReturnAnErrorMonoWhenListenerGetException() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+ doThrow(new RuntimeException())
+ .when(listener).event(EVENT);
- @Test
- void deliverShouldBeErrorWhenException() {
- doThrow(new RuntimeException("mock exception")).when(asyncEventCollector).event(event);
+ assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+ .synchronousListenerFuture()
+ .block())
+ .isInstanceOf(RuntimeException.class);
+ }
+ }
- assertThatThrownBy(() -> inVmEventDelivery
- .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
- .block())
- .isInstanceOf(RuntimeException.class)
- .hasMessageContaining("mock exception");
- }
+ @Nested
+ class AsynchronousListener {
- @Test
- void deliverWithRetriesShouldBeErrorWhenException() {
- doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
+ @Test
+ void deliverShouldDeliverEvent() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+ .allListenerFuture()
+ .block();
- assertThatThrownBy(() -> inVmEventDelivery
- .deliverWithRetries(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
- .block())
- .isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("Retries exhausted");
- }
+ assertThat(listener.numberOfEventCalls())
+ .isEqualTo(1);
}
- @Nested
- class BothAsynchronousAndSynchronous {
- @Test
- void deliverShouldDeliverEventToSyncListenerWhenAsyncGetException() {
- doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
-
- assertThatThrownBy(() -> inVmEventDelivery
- .deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture()
+ @Test
+ void deliverShouldReturnSuccessSynchronousMono() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+ .synchronousListenerFuture()
.block())
- .isInstanceOf(RuntimeException.class);
-
- SoftAssertions.assertSoftly(softly -> {
- softly.assertThat(asyncEventCollector.getEvents()).isEmpty();
- softly.assertThat(syncEventCollector.getEvents()).hasSize(1);
- });
-
- }
+ .doesNotThrowAnyException();
+ }
- @Test
- void deliverWithRetriesShouldDeliverEventToSyncListenerWhenAsyncGetException() {
- doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
+ @Test
+ void deliverShouldNotDeliverWhenListenerGetException() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ doThrow(new RuntimeException())
+ .when(listener).event(EVENT);
- assertThatThrownBy(() -> inVmEventDelivery
- .deliverWithRetries(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture()
- .block())
- .isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("Retries exhausted");
-
- SoftAssertions.assertSoftly(softly -> {
- softly.assertThat(asyncEventCollector.getEvents()).isEmpty();
- softly.assertThat(syncEventCollector.getEvents()).hasSize(1);
- });
- }
-
- @Test
- void deliverShouldDeliverEventToAsyncListenerWhenSyncGetException() {
- doThrow(RuntimeException.class).when(syncEventCollector).event(event);
-
- inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture()
- .onErrorResume(e -> Mono.empty())
- .block();
-
- SoftAssertions.assertSoftly(softly -> {
- softly.assertThat(syncEventCollector.getEvents()).isEmpty();
- softly.assertThat(asyncEventCollector.getEvents()).hasSize(1);
- });
- }
-
- @Test
- void deliverWithRetriesShouldDeliverEventToAsyncListenerWhenSyncGetException() {
- doThrow(RuntimeException.class).when(syncEventCollector).event(event);
-
- inVmEventDelivery.deliverWithRetries(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture()
- .onErrorResume(e -> Mono.empty())
- .block();
-
- SoftAssertions.assertSoftly(softly -> {
- softly.assertThat(syncEventCollector.getEvents()).isEmpty();
- softly.assertThat(asyncEventCollector.getEvents()).hasSize(1);
- });
- }
-
- @Test
- void deliverShouldBeErrorWhenException() {
- doThrow(new RuntimeException("mock exception")).when(syncEventCollector).event(event);
- doThrow(new RuntimeException("mock exception")).when(asyncEventCollector).event(event);
-
- assertThatThrownBy(() -> inVmEventDelivery
- .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
- .block())
- .isInstanceOf(RuntimeException.class)
- .hasMessageContaining("mock exception");
- }
+ assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+ .allListenerFuture()
+ .block())
+ .isInstanceOf(RuntimeException.class);
- @Test
- void deliverWithRetriesShouldBeErrorWhenException() {
- doThrow(RuntimeException.class).when(syncEventCollector).event(event);
- doThrow(RuntimeException.class).when(asyncEventCollector).event(event);
-
- assertThatThrownBy(() -> inVmEventDelivery
- .deliverWithRetries(ImmutableList.of(asyncEventCollector), event).allListenerFuture()
- .block())
- .isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("Retries exhausted");
- }
+ assertThat(listener.numberOfEventCalls())
+ .isEqualTo(0);
}
- }
-
- @Test
- void deliverShouldHaveCalledSynchronousListenersWhenAllListenerExecutedJoined() throws Exception {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
- inVmEventDelivery.deliver(ImmutableList.of(listener), event).allListenerFuture().block();
+ @Test
+ void deliverShouldReturnAnSuccessSyncMonoWhenListenerGetException() {
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ doThrow(new RuntimeException())
+ .when(listener).event(EVENT);
- verify(listener).event(event);
+ assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
+ .synchronousListenerFuture()
+ .block())
+ .doesNotThrowAnyException();
+ }
}
- @Test
- void deliverShouldHaveCalledAsynchronousListenersWhenAllListenerExecutedJoined() throws Exception {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-
- inVmEventDelivery.deliver(ImmutableList.of(listener), event).allListenerFuture().block();
+ @Nested
+ class WithOptions {
+
+ @Test
+ void retryShouldWorkWhenDeliverWithRetry() {
+ MailboxListenerCountingSuccessfulExecution listener = newListener();
+ doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doCallRealMethod()
+ .when(listener).event(EVENT);
+
+ inVmEventDelivery.deliver(listener, EVENT,
+ DeliveryOption.of(
+ BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
+ PermanentFailureHandler.NO_HANDLER))
+ .allListenerFuture()
+ .block();
+
+ assertThat(listener.numberOfEventCalls())
+ .isEqualTo(1);
+ }
- verify(listener).event(event);
- }
+ @Test
+ void failureHandlerShouldWorkWhenDeliverWithFailureHandler() {
+ MailboxListenerCountingSuccessfulExecution listener = newListener();
+ doThrow(new RuntimeException())
+ .when(listener).event(EVENT);
- @Test
- void deliverShouldHaveCalledSynchronousListenersWhenSynchronousListenerExecutedJoined() throws Exception {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+ MemoryEventDeadLetters deadLetter = new MemoryEventDeadLetters();
- inVmEventDelivery.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().block();
+ inVmEventDelivery.deliver(listener, EVENT,
+ DeliveryOption.of(
+ Retryer.NO_RETRYER,
+ PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
+ .allListenerFuture()
+ .block();
- verify(listener).event(event);
- }
+ assertThat(deadLetter.groupsWithFailedEvents().toStream())
+ .containsOnly(GROUP_A);
+ }
- @Test
- void deliverShouldNotBlockOnAsynchronousListenersWhenSynchronousListenerExecutedJoined() throws Exception {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
- CountDownLatch latch = new CountDownLatch(1);
- doAnswer(invocation -> {
- latch.await();
- return null;
- }).when(listener).event(event);
-
- assertTimeout(Duration.ofSeconds(2),
- () -> {
- inVmEventDelivery.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().block();
- latch.countDown();
+ @Test
+ void failureHandlerShouldNotWorkWhenRetrySuccess() {
+ MailboxListenerCountingSuccessfulExecution listener = newListener();
+ doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doCallRealMethod()
+ .when(listener).event(EVENT);
+
+ MemoryEventDeadLetters deadLetter = new MemoryEventDeadLetters();
+
+ inVmEventDelivery.deliver(listener, EVENT,
+ DeliveryOption.of(
+ BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
+ PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
+ .allListenerFuture()
+ .block();
+
+ SoftAssertions.assertSoftly(softy -> {
+ softy.assertThat(listener.numberOfEventCalls())
+ .isEqualTo(1);
+ softy.assertThat(deadLetter.groupsWithFailedEvents().toStream())
+ .isEmpty();
});
- }
+ }
- @Test
- void deliverShouldNotBlockOnSynchronousListenersWhenNoJoin() throws Exception {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
- CountDownLatch latch = new CountDownLatch(1);
- doAnswer(invocation -> {
- latch.await();
- return null;
- }).when(listener).event(event);
-
- assertTimeout(Duration.ofSeconds(2),
- () -> {
- inVmEventDelivery.deliver(ImmutableList.of(listener), event);
- latch.countDown();
- });
- }
- @Test
- void deliverShouldNotBlockOnAsynchronousListenersWhenNoJoin() throws Exception {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
- CountDownLatch latch = new CountDownLatch(1);
- doAnswer(invocation -> {
- latch.await();
- return null;
- }).when(listener).event(event);
-
- assertTimeout(Duration.ofSeconds(2),
- () -> {
- inVmEventDelivery.deliver(ImmutableList.of(listener), event);
- latch.countDown();
+ @Test
+ void failureHandlerShouldWorkWhenRetryFails() {
+ MailboxListenerCountingSuccessfulExecution listener = newListener();
+ doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doCallRealMethod()
+ .when(listener).event(EVENT);
+
+ MemoryEventDeadLetters deadLetter = new MemoryEventDeadLetters();
+
+ inVmEventDelivery.deliver(listener, EVENT,
+ DeliveryOption.of(
+ BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
+ PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
+ .allListenerFuture()
+ .block();
+
+ SoftAssertions.assertSoftly(softy -> {
+ softy.assertThat(listener.numberOfEventCalls())
+ .isEqualTo(0);
+ assertThat(deadLetter.groupsWithFailedEvents().toStream())
+ .containsOnly(GROUP_A);
});
- }
-
- @Test
- void deliverShouldEventuallyDeliverAsynchronousListenersWhenSynchronousListenerExecutedJoined() throws Exception {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-
- inVmEventDelivery.deliver(ImmutableList.of(listener), event).synchronousListenerFuture().block();
-
- verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
- }
-
- @Test
- void deliverShouldEventuallyDeliverSynchronousListenersWhenNoJoin() throws Exception {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
-
- inVmEventDelivery.deliver(ImmutableList.of(listener), event);
-
- verify(listener, timeout(DELIVERY_DELAY * 10)).event(event);
- }
-
- @Test
- void deliverShouldCallSynchronousListenersWhenAsynchronousListenersAreAlsoRegistered() throws Exception {
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
- when(listener2.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
-
- inVmEventDelivery.deliver(ImmutableList.of(listener, listener2), event).synchronousListenerFuture().block();
-
- verify(listener2).event(event);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
----------------------------------------------------------------------
diff --git a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
index 609d183..5273359 100644
--- a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
+++ b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml
@@ -27,12 +27,13 @@
<bean id="event-bus" class="org.apache.james.mailbox.events.InVMEventBus" lazy-init="true">
<constructor-arg index="0" ref="event-delivery"/>
+ <constructor-arg index="1">
+ <util:constant static-field="org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT"/>
+ </constructor-arg>
+ <constructor-arg index="2" ref="event-deadletters"/>
</bean>
<bean id="event-delivery" class="org.apache.james.mailbox.events.delivery.InVmEventDelivery" lazy-init="true">
<constructor-arg index="0" ref="metricFactory"/>
- <constructor-arg>
- <util:constant static-field="org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT"/>
- </constructor-arg>
</bean>
-
+ <bean id="event-deadletters" class="org.apache.james.mailbox.events.MemoryEventDeadLetters" lazy-init="true"/>
</beans>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java
index 1661de2..6ed8ee0 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java
@@ -89,7 +89,8 @@ public class MessageIdManagerTestSystem {
.mailbox(mailbox)
.addMessage(message)
.build(),
- new MailboxIdRegistrationKey(mailboxId));
+ new MailboxIdRegistrationKey(mailboxId))
+ .block();
return messageId;
} catch (Exception e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/james-project/blob/bb067a1b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index 45dc122..e09153d 100644
--- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -52,8 +52,8 @@ public class DefaultEventModule extends AbstractModule {
bind(InVmEventDelivery.class).in(Scopes.SINGLETON);
bind(InVMEventBus.class).in(Scopes.SINGLETON);
bind(MemoryEventDeadLetters.class).in(Scopes.SINGLETON);
- bind(EventDeadLetters.class).to(MemoryEventDeadLetters.class);
+ bind(EventDeadLetters.class).to(MemoryEventDeadLetters.class);
bind(MailboxListenersLoader.class).to(MailboxListenersLoaderImpl.class);
bind(EventDelivery.class).to(InVmEventDelivery.class);
bind(EventBus.class).to(InVMEventBus.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[2/2] james-project git commit: JAMES-2578 Fix warning using Mailet
new API
Posted by bt...@apache.org.
JAMES-2578 Fix warning using Mailet new API
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/606ff9c9
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/606ff9c9
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/606ff9c9
Branch: refs/heads/master
Commit: 606ff9c93dd8733b36a601ef7ab3449c3583b0f0
Parents: bb067a1
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Mon Jan 21 10:12:10 2019 +0100
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Wed Jan 23 10:04:20 2019 +0700
----------------------------------------------------------------------
.../mailets/RemoveMailAttributeTest.java | 39 +++++++++++---------
.../org/apache/james/server/core/MailImpl.java | 13 ++++---
.../samples/mailets/InstrumentationMailet.java | 7 ++--
.../mailets/WithStorageDirectiveTest.java | 15 +++++---
.../CassandraMailRepositoryMailDAOTest.java | 16 +++++---
5 files changed, 52 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/606ff9c9/mailet/standard/src/test/java/org/apache/james/transport/mailets/RemoveMailAttributeTest.java
----------------------------------------------------------------------
diff --git a/mailet/standard/src/test/java/org/apache/james/transport/mailets/RemoveMailAttributeTest.java b/mailet/standard/src/test/java/org/apache/james/transport/mailets/RemoveMailAttributeTest.java
index 1881dd3..85a0dd3 100644
--- a/mailet/standard/src/test/java/org/apache/james/transport/mailets/RemoveMailAttributeTest.java
+++ b/mailet/standard/src/test/java/org/apache/james/transport/mailets/RemoveMailAttributeTest.java
@@ -24,6 +24,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import javax.mail.MessagingException;
+import org.apache.mailet.Attribute;
+import org.apache.mailet.AttributeName;
+import org.apache.mailet.AttributeValue;
import org.apache.mailet.Mail;
import org.apache.mailet.Mailet;
import org.apache.mailet.MailetException;
@@ -34,12 +37,12 @@ import org.junit.jupiter.api.Test;
class RemoveMailAttributeTest {
- private static final String ATTRIBUTE_1 = "attribute1";
- private static final String ATTRIBUTE_2 = "attribute2";
- private static final String ATTRIBUTE_3 = "attribute3";
- private static final String VALUE_1 = "value1";
- private static final String VALUE_2 = "value2";
- private static final String VALUE_3 = "value3";
+ private static final AttributeName ATTRIBUTE_1 = AttributeName.of("attribute1");
+ private static final AttributeName ATTRIBUTE_2 = AttributeName.of("attribute2");
+ private static final AttributeName ATTRIBUTE_3 = AttributeName.of("attribute3");
+ private static final Attribute VALUE_1 = new Attribute(ATTRIBUTE_1, AttributeValue.of("value1"));
+ private static final Attribute VALUE_2 = new Attribute(ATTRIBUTE_2, AttributeValue.of("value2"));
+ private static final Attribute VALUE_3 = new Attribute(ATTRIBUTE_3, AttributeValue.of("value3"));
private static final String ATTRIBUTE1_ATTRIBUTE2 = "attribute1, attribute2";
private Mailet removeMailet;
@@ -77,7 +80,7 @@ class RemoveMailAttributeTest {
Mail mail = FakeMail.builder().build();
removeMailet.service(mail);
- assertThat(mail.getAttributeNames()).isEmpty();
+ assertThat(mail.attributes()).isEmpty();
}
@Test
@@ -89,29 +92,29 @@ class RemoveMailAttributeTest {
removeMailet.init(mailetConfig);
Mail mail = FakeMail.builder()
- .attribute(ATTRIBUTE_3, VALUE_3)
+ .attribute(VALUE_3)
.build();
removeMailet.service(mail);
- assertThat(mail.getAttributeNames()).containsExactly(ATTRIBUTE_3);
+ assertThat(mail.attributes()).containsExactly(VALUE_3);
}
@Test
void serviceShouldRemoveSpecifiedAttribute() throws MessagingException {
FakeMailetConfig mailetConfig = FakeMailetConfig.builder()
.mailetName("Test")
- .setProperty(RemoveMailAttribute.MAILET_NAME_PARAMETER, ATTRIBUTE_1)
+ .setProperty(RemoveMailAttribute.MAILET_NAME_PARAMETER, ATTRIBUTE_1.asString())
.build();
removeMailet.init(mailetConfig);
Mail mail = FakeMail.builder()
- .attribute(ATTRIBUTE_1, VALUE_1)
- .attribute(ATTRIBUTE_2, VALUE_2)
- .attribute(ATTRIBUTE_3, VALUE_3)
+ .attribute(VALUE_1)
+ .attribute(VALUE_2)
+ .attribute(VALUE_3)
.build();
removeMailet.service(mail);
- assertThat(mail.getAttributeNames()).containsOnly(ATTRIBUTE_2, ATTRIBUTE_3);
+ assertThat(mail.attributes()).containsOnly(VALUE_2, VALUE_3);
}
@Test
@@ -123,12 +126,12 @@ class RemoveMailAttributeTest {
removeMailet.init(mailetConfig);
Mail mail = FakeMail.builder()
- .attribute(ATTRIBUTE_1, VALUE_1)
- .attribute(ATTRIBUTE_2, VALUE_2)
- .attribute(ATTRIBUTE_3, VALUE_3)
+ .attribute(VALUE_1)
+ .attribute(VALUE_2)
+ .attribute(VALUE_3)
.build();
removeMailet.service(mail);
- assertThat(mail.getAttributeNames()).containsExactly(ATTRIBUTE_3);
+ assertThat(mail.attributes()).containsExactly(VALUE_3);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/606ff9c9/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java
----------------------------------------------------------------------
diff --git a/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java b/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java
index 6b48478..9d97f30 100644
--- a/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java
+++ b/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java
@@ -64,6 +64,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -442,12 +443,12 @@ public class MailImpl implements Disposable, Mail {
if (mail instanceof MailImpl) {
setAttributesRaw((Map<String, Object>) cloneSerializableObject(((MailImpl) mail).getAttributesRaw()));
} else {
- HashMap<String, Object> attribs = new HashMap<>();
- for (Iterator<String> i = mail.getAttributeNames(); i.hasNext(); ) {
- String hashKey = i.next();
- attribs.put(hashKey, cloneSerializableObject(mail.getAttribute(hashKey)));
- }
- setAttributesRaw(attribs);
+ ImmutableMap<String, Object> attributesMap = mail.attributes()
+ .collect(Guavate.toImmutableMap(
+ attribute -> attribute.getName().asString(),
+ Throwing.function(attribute -> cloneSerializableObject(attribute.getValue().getValue()))));
+
+ setAttributesRaw(attributesMap);
}
} catch (IOException | ClassNotFoundException e) {
LOGGER.error("Error while deserializing attributes", e);
http://git-wip-us.apache.org/repos/asf/james-project/blob/606ff9c9/server/mailet/mailets/src/test/java/org/apache/james/samples/mailets/InstrumentationMailet.java
----------------------------------------------------------------------
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/samples/mailets/InstrumentationMailet.java b/server/mailet/mailets/src/test/java/org/apache/james/samples/mailets/InstrumentationMailet.java
index 05aa60f..4a68bb2 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/samples/mailets/InstrumentationMailet.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/samples/mailets/InstrumentationMailet.java
@@ -79,10 +79,9 @@ public class InstrumentationMailet implements Mailet {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Mail named: " + mail.getName());
- for (Iterator<String> it = mail.getAttributeNames(); it.hasNext();) {
- String attributeName = it.next();
- LOGGER.info("Attribute " + attributeName);
- }
+ mail.attributeNames()
+ .forEach(attributeName -> LOGGER.info("Attribute " + attributeName));
+
LOGGER.info("Message size: " + mail.getMessageSize());
LOGGER.info("Last updated: " + mail.getLastUpdated());
LOGGER.info("Remote Address: " + mail.getRemoteAddr());
http://git-wip-us.apache.org/repos/asf/james-project/blob/606ff9c9/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/WithStorageDirectiveTest.java
----------------------------------------------------------------------
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/WithStorageDirectiveTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/WithStorageDirectiveTest.java
index 9fcebe5..2f61236 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/WithStorageDirectiveTest.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/WithStorageDirectiveTest.java
@@ -23,6 +23,9 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.mailet.Attribute;
+import org.apache.mailet.AttributeName;
+import org.apache.mailet.AttributeValue;
import org.apache.mailet.base.MailAddressFixture;
import org.apache.mailet.base.test.FakeMail;
import org.apache.mailet.base.test.FakeMailetConfig;
@@ -71,10 +74,12 @@ public class WithStorageDirectiveTest {
testee.service(mail);
- softly.assertThat(mail.getAttributeNames())
- .containsOnly("DeliveryPath_recipient2@localhost", "DeliveryPath_recipient1@localhost");
- softly.assertThat(mail.getAttribute("DeliveryPath_recipient1@localhost")).isEqualTo(targetFolderName);
- softly.assertThat(mail.getAttribute("DeliveryPath_recipient2@localhost")).isEqualTo(targetFolderName);
+ AttributeName recipient1 = AttributeName.of("DeliveryPath_recipient1@localhost");
+ AttributeName recipient2 = AttributeName.of("DeliveryPath_recipient2@localhost");
+ softly.assertThat(mail.attributes())
+ .containsOnly(
+ new Attribute(recipient1, AttributeValue.of(targetFolderName)),
+ new Attribute(recipient2, AttributeValue.of(targetFolderName)));
}
@Test
@@ -90,7 +95,7 @@ public class WithStorageDirectiveTest {
testee.service(mail);
- assertThat(mail.getAttributeNames())
+ assertThat(mail.attributes())
.isEmpty();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/606ff9c9/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
index c187823..2377b6b 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
@@ -29,6 +29,8 @@ import javax.mail.MessagingException;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.TestBlobId;
import org.apache.james.mailrepository.api.MailKey;
@@ -52,8 +54,12 @@ class CassandraMailRepositoryMailDAOTest {
static final MailKey KEY_1 = new MailKey("key1");
static final TestBlobId.Factory BLOB_ID_FACTORY = new TestBlobId.Factory();
+ public static final CassandraModule MODULE = CassandraModule.aggregateModules(
+ CassandraMailRepositoryModule.MODULE,
+ CassandraSchemaVersionModule.MODULE);
+
@RegisterExtension
- static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailRepositoryModule.MODULE);
+ static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE);
abstract class TestSuite {
@@ -206,9 +212,9 @@ class CassandraMailRepositoryMailDAOTest {
String remoteAddr = "remoteAddr";
String remoteHost = "remoteHost";
PerRecipientHeaders.Header header = PerRecipientHeaders.Header.builder().name("headerName").value("headerValue").build();
- String attributeName = "att1";
+ AttributeName attributeName = AttributeName.of("att1");
List<AttributeValue<?>> attributeValue = ImmutableList.of(AttributeValue.of("value1"), AttributeValue.of("value2"));
- Attribute attribute = new Attribute(AttributeName.of(attributeName), AttributeValue.of(attributeValue));
+ Attribute attribute = new Attribute(attributeName, AttributeValue.of(attributeValue));
List<Attribute> attributes = ImmutableList.of(attribute);
testee.store(URL,
@@ -238,8 +244,8 @@ class CassandraMailRepositoryMailDAOTest {
softly.assertThat(partialMail.getState()).isEqualTo(state);
softly.assertThat(partialMail.getRemoteAddr()).isEqualTo(remoteAddr);
softly.assertThat(partialMail.getRemoteHost()).isEqualTo(remoteHost);
- softly.assertThat(partialMail.getAttributeNames()).containsOnly(attributeName);
- softly.assertThat(partialMail.getAttribute(AttributeName.of(attributeName))).contains(attribute);
+ softly.assertThat(partialMail.attributeNames()).containsOnly(attributeName);
+ softly.assertThat(partialMail.getAttribute(attributeName)).contains(attribute);
softly.assertThat(partialMail.getPerRecipientSpecificHeaders().getRecipientsWithSpecificHeaders())
.containsOnly(MailAddressFixture.RECIPIENT1);
softly.assertThat(partialMail.getPerRecipientSpecificHeaders().getHeadersForRecipient(MailAddressFixture.RECIPIENT1))
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org