You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/17 00:32:21 UTC

[james-project] 29/39: JAMES-3139 reDeliver() DispatchingFailureGroup should deliver events to all groups

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5801a7947c58ec6d8d7a2f7755fd86b1d4ad0929
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Thu Apr 9 12:41:53 2020 +0700

    JAMES-3139 reDeliver() DispatchingFailureGroup should deliver events to all groups
---
 .../james/mailbox/events/RabbitMQEventBus.java     | 14 +++++
 .../james/mailbox/events/RabbitMQEventBusTest.java | 68 ++++++++++++++++++++++
 2 files changed, 82 insertions(+)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 311e789..5cc92e8 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -31,11 +31,13 @@ import org.apache.james.metrics.api.MetricFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.Sender;
 
 public class RabbitMQEventBus implements EventBus, Startable {
+    private static final Set<RegistrationKey> NO_KEY = ImmutableSet.of();
     private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
     static final String MAILBOX_EVENT = "mailboxEvent";
     static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
@@ -143,6 +145,18 @@ public class RabbitMQEventBus implements EventBus, Startable {
     public Mono<Void> reDeliver(Group group, Event event) {
         Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE);
         if (!event.isNoop()) {
+            /*
+            if the eventBus.dispatch() gets error while dispatching an event (rabbitMQ network outage maybe),
+            which means all the group consumers will not be receiving that event.
+
+            We store the that event in the dead letter and expecting in the future, it will be dispatched
+            again not only for a specific consumer but all.
+
+            That's why it is special, and we need to check event type before processing further.
+            */
+            if (group instanceof EventDispatcher.DispatchingFailureGroup) {
+                return eventDispatcher.dispatch(event, NO_KEY);
+            }
             return groupRegistrationHandler.retrieveGroupRegistration(group).reDeliver(event);
         }
         return Mono.empty();
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index abbfdf7..3dbbb80 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -791,6 +791,74 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             assertThat(dispatchingFailureEvents()).containsExactly(EVENT, EVENT_2);
         }
 
+        @Test
+        void dispatchShouldPersistEventsWhenDispatchingTheSameEventGetErrorMultipleTimes() {
+            EventCollector eventCollector = eventCollector();
+            eventBus().register(eventCollector, GROUP_A);
+
+            rabbitMQExtension.getRabbitMQ().pause();
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+
+            assertThat(dispatchingFailureEvents()).containsExactly(EVENT, EVENT);
+        }
+
+        @Test
+        void reDeliverShouldDeliverToAllGroupsWhenDispatchingFailure() {
+            EventCollector eventCollector = eventCollector();
+            eventBus().register(eventCollector, GROUP_A);
+
+            EventCollector eventCollector2 = eventCollector();
+            eventBus().register(eventCollector2, GROUP_B);
+
+            rabbitMQExtension.getRabbitMQ().pause();
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+            rabbitMQExtension.getRabbitMQ().unpause();
+            dispatchingFailureEvents()
+                .forEach(event -> eventBus().reDeliver(DispatchingFailureGroup.INSTANCE, event).block());
+
+            getSpeedProfile().shortWaitCondition()
+                .untilAsserted(() -> assertThat(eventCollector.getEvents())
+                    .hasSameElementsAs(eventCollector2.getEvents())
+                    .containsExactly(EVENT));
+        }
+
+        @Test
+        void reDeliverShouldAddEventInDeadLetterWhenGettingError() {
+            EventCollector eventCollector = eventCollector();
+            eventBus().register(eventCollector, GROUP_A);
+
+            rabbitMQExtension.getRabbitMQ().pause();
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+            getSpeedProfile().longWaitCondition()
+                .until(() -> deadLetter().containEvents().block());
+
+            doQuietly(() -> eventBus().reDeliver(DispatchingFailureGroup.INSTANCE, EVENT).block());
+            rabbitMQExtension.getRabbitMQ().unpause();
+
+            getSpeedProfile().shortWaitCondition()
+                .untilAsserted(() -> assertThat(dispatchingFailureEvents())
+                    .containsExactly(EVENT, EVENT));
+        }
+
+        @Test
+        void reDeliverShouldNotStoreEventInAnotherGroupWhenGettingError() {
+            EventCollector eventCollector = eventCollector();
+            eventBus().register(eventCollector, GROUP_A);
+
+            rabbitMQExtension.getRabbitMQ().pause();
+            doQuietly(() -> eventBus().dispatch(EVENT, NO_KEYS).block());
+            getSpeedProfile().longWaitCondition()
+                .until(() -> deadLetter().containEvents().block());
+
+            doQuietly(() -> eventBus().reDeliver(DispatchingFailureGroup.INSTANCE, EVENT).block());
+            rabbitMQExtension.getRabbitMQ().unpause();
+
+            getSpeedProfile().shortWaitCondition()
+                .untilAsserted(() -> assertThat(deadLetter().groupsWithFailedEvents().toStream())
+                    .hasOnlyElementsOfType(DispatchingFailureGroup.class));
+        }
+
         private Stream<Event> dispatchingFailureEvents() {
             return deadLetter().failedIds(DispatchingFailureGroup.INSTANCE)
                 .flatMap(insertionId -> deadLetter().failedEvent(DispatchingFailureGroup.INSTANCE, insertionId))


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org