You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/17 00:32:21 UTC
[james-project] 29/39: JAMES-3139 reDeliver()
DispatchingFailureGroup should deliver events to all groups
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5801a7947c58ec6d8d7a2f7755fd86b1d4ad0929
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Thu Apr 9 12:41:53 2020 +0700
JAMES-3139 reDeliver() DispatchingFailureGroup should deliver events to all groups
---
.../james/mailbox/events/RabbitMQEventBus.java | 14 +++++
.../james/mailbox/events/RabbitMQEventBusTest.java | 68 ++++++++++++++++++++++
2 files changed, 82 insertions(+)
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 311e789..5cc92e8 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -31,11 +31,13 @@ import org.apache.james.metrics.api.MetricFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.Sender;
public class RabbitMQEventBus implements EventBus, Startable {
+ private static final Set<RegistrationKey> NO_KEY = ImmutableSet.of();
private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
static final String MAILBOX_EVENT = "mailboxEvent";
static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
@@ -143,6 +145,18 @@ public class RabbitMQEventBus implements EventBus, Startable {
public Mono<Void> reDeliver(Group group, Event event) {
Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE);
if (!event.isNoop()) {
+ /*
+ If eventBus.dispatch() fails while dispatching an event (for example during a RabbitMQ network outage),
+ none of the group consumers will receive that event.
+
+ We store that event in the dead letters, expecting that it will later be dispatched
+ again to all consumers, not only to a specific one.
+
+ That is why this group is special, and why we need to check the group type before processing further.
+ */
+ if (group instanceof EventDispatcher.DispatchingFailureGroup) {
+ return eventDispatcher.dispatch(event, NO_KEY);
+ }
return groupRegistrationHandler.retrieveGroupRegistration(group).reDeliver(event);
}
return Mono.empty();
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index abbfdf7..3dbbb80 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -791,6 +791,74 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
assertThat(dispatchingFailureEvents()).containsExactly(EVENT, EVENT_2);
}
+ @Test
+ void dispatchShouldPersistEventsWhenDispatchingTheSameEventGetErrorMultipleTimes() {
+ EventCollector eventCollector = eventCollector();
+ eventBus().register(eventCollector, GROUP_A);
+
+ rabbitMQExtension.getRabbitMQ().pause();
+ doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+ doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+
+ assertThat(dispatchingFailureEvents()).containsExactly(EVENT, EVENT);
+ }
+
+ @Test
+ void reDeliverShouldDeliverToAllGroupsWhenDispatchingFailure() {
+ EventCollector eventCollector = eventCollector();
+ eventBus().register(eventCollector, GROUP_A);
+
+ EventCollector eventCollector2 = eventCollector();
+ eventBus().register(eventCollector2, GROUP_B);
+
+ rabbitMQExtension.getRabbitMQ().pause();
+ doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+ rabbitMQExtension.getRabbitMQ().unpause();
+ dispatchingFailureEvents()
+ .forEach(event -> eventBus().reDeliver(DispatchingFailureGroup.INSTANCE, event).block());
+
+ getSpeedProfile().shortWaitCondition()
+ .untilAsserted(() -> assertThat(eventCollector.getEvents())
+ .hasSameElementsAs(eventCollector2.getEvents())
+ .containsExactly(EVENT));
+ }
+
+ @Test
+ void reDeliverShouldAddEventInDeadLetterWhenGettingError() {
+ EventCollector eventCollector = eventCollector();
+ eventBus().register(eventCollector, GROUP_A);
+
+ rabbitMQExtension.getRabbitMQ().pause();
+ doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+ getSpeedProfile().longWaitCondition()
+ .until(() -> deadLetter().containEvents().block());
+
+ doQuietly(() -> eventBus().reDeliver(DispatchingFailureGroup.INSTANCE, EVENT).block());
+ rabbitMQExtension.getRabbitMQ().unpause();
+
+ getSpeedProfile().shortWaitCondition()
+ .untilAsserted(() -> assertThat(dispatchingFailureEvents())
+ .containsExactly(EVENT, EVENT));
+ }
+
+ @Test
+ void reDeliverShouldNotStoreEventInAnotherGroupWhenGettingError() {
+ EventCollector eventCollector = eventCollector();
+ eventBus().register(eventCollector, GROUP_A);
+
+ rabbitMQExtension.getRabbitMQ().pause();
+ doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+ getSpeedProfile().longWaitCondition()
+ .until(() -> deadLetter().containEvents().block());
+
+ doQuietly(() -> eventBus().reDeliver(DispatchingFailureGroup.INSTANCE, EVENT).block());
+ rabbitMQExtension.getRabbitMQ().unpause();
+
+ getSpeedProfile().shortWaitCondition()
+ .untilAsserted(() -> assertThat(deadLetter().groupsWithFailedEvents().toStream())
+ .hasOnlyElementsOfType(DispatchingFailureGroup.class));
+ }
+
private Stream<Event> dispatchingFailureEvents() {
return deadLetter().failedIds(DispatchingFailureGroup.INSTANCE)
.flatMap(insertionId -> deadLetter().failedEvent(DispatchingFailureGroup.INSTANCE, insertionId))
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org