You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/03/29 06:43:46 UTC

[james-project] branch master updated: JAMES-3733 Support multi EventBus when re-deliver events from the dea… (#933)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 24feab3  JAMES-3733 Support multi EventBus when re-deliver events from the dea… (#933)
24feab3 is described below

commit 24feab311a68eadefe4034f07d89c1ff87e370de
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Mar 29 13:43:41 2022 +0700

    JAMES-3733 Support multi EventBus when re-deliver events from the dea… (#933)
---
 .../java/org/apache/james/events/EventBus.java     |  6 ++
 .../james/events/GroupRegistrationHandler.java     |  5 ++
 .../org/apache/james/events/RabbitMQEventBus.java  |  6 ++
 .../java/org/apache/james/events/InVMEventBus.java |  6 ++
 .../james/modules/event/JMAPEventBusModule.java    |  5 ++
 .../modules/event/RabbitMQEventBusModule.java      |  4 ++
 .../james/modules/mailbox/DefaultEventModule.java  |  4 ++
 .../service/EventDeadLettersRedeliverService.java  | 28 +++++---
 .../routes/EventDeadLettersRoutesTest.java         | 84 ++++++++++++++++++++--
 9 files changed, 132 insertions(+), 16 deletions(-)

diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
index 21246e0..362acd1 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
@@ -19,10 +19,12 @@
 
 package org.apache.james.events;
 
+import java.util.Collection;
 import java.util.Set;
 
 import org.reactivestreams.Publisher;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Mono;
@@ -73,4 +75,8 @@ public interface EventBus {
     default Registration register(EventListener.ReactiveGroupEventListener groupListener) {
         return register(groupListener, groupListener.getDefaultGroup());
     }
+
+    default Collection<Group> listRegisteredGroups() {
+        return ImmutableList.of();
+    }
 }
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index def37aa..9914cab 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -26,6 +26,7 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
 import static org.apache.james.events.GroupRegistration.DEFAULT_RETRY_COUNT;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -195,4 +196,8 @@ class GroupRegistrationHandler {
             () -> groupRegistrations.remove(group),
             listenerExecutor, configuration);
     }
+
+    Collection<Group> registeredGroups() {
+        return groupRegistrations.keySet();
+    }
 }
\ No newline at end of file
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index 0048485..27440af 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.events;
 
+import java.util.Collection;
 import java.util.Set;
 
 import javax.annotation.PreDestroy;
@@ -172,4 +173,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
         }
         return Mono.empty();
     }
+
+    @Override
+    public Collection<Group> listRegisteredGroups() {
+        return groupRegistrationHandler.registeredGroups();
+    }
 }
\ No newline at end of file
diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
index 7d98df4..d3ab03f 100644
--- a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
+++ b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.events;
 
+import java.util.Collection;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -85,6 +86,11 @@ public class InVMEventBus implements EventBus {
         return Mono.empty();
     }
 
+    @Override
+    public Collection<Group> listRegisteredGroups() {
+        return groups.keySet();
+    }
+
     private EventListener.ReactiveEventListener retrieveListenerFromGroup(Group group) {
         return Optional.ofNullable(groups.get(group))
             .orElseThrow(() -> new GroupRegistrationNotFound(group));
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
index 62a34bf..dbff6cd 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
@@ -101,4 +101,9 @@ public class JMAPEventBusModule extends AbstractModule {
     EventBus provideJmapEventBus(@Named(InjectionKeys.JMAP) RabbitMQEventBus rabbitMQEventBus) {
         return rabbitMQEventBus;
     }
+
+    @ProvidesIntoSet
+    EventBus registerEventBus(@Named(InjectionKeys.JMAP) EventBus eventBus) {
+        return eventBus;
+    }
 }
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
index 6c0a85d..16f3196 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
@@ -50,6 +50,10 @@ public class RabbitMQEventBusModule extends AbstractModule {
         bind(RabbitMQEventBus.class).in(Scopes.SINGLETON);
         bind(EventBus.class).to(RabbitMQEventBus.class);
 
+        Multibinder.newSetBinder(binder(), EventBus.class)
+            .addBinding()
+            .to(EventBus.class);
+
         Multibinder.newSetBinder(binder(), RegistrationKey.Factory.class)
             .addBinding().to(MailboxIdRegistrationKey.Factory.class);
 
diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index 27a1c47..c71c90a 100644
--- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -57,6 +57,10 @@ public class DefaultEventModule extends AbstractModule {
         bind(EventDelivery.class).to(InVmEventDelivery.class);
         bind(EventBus.class).to(InVMEventBus.class);
 
+        Multibinder.newSetBinder(binder(), EventBus.class)
+            .addBinding()
+            .to(EventBus.class);
+
         bind(RetryBackoffConfiguration.class).toInstance(RetryBackoffConfiguration.DEFAULT);
 
         Multibinder.newSetBinder(binder(), EventListener.GroupEventListener.class);
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
index c6f239f..2363c77 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
@@ -21,6 +21,8 @@ package org.apache.james.webadmin.service;
 
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
+import java.util.Set;
+
 import javax.inject.Inject;
 
 import org.apache.james.events.Event;
@@ -40,13 +42,13 @@ public class EventDeadLettersRedeliverService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(EventDeadLettersRedeliverService.class);
 
-    private final EventBus eventBus;
+    private final Set<EventBus> eventBuses;
     private final EventDeadLetters deadLetters;
 
     @Inject
     @VisibleForTesting
-    public EventDeadLettersRedeliverService(EventBus eventBus, EventDeadLetters deadLetters) {
-        this.eventBus = eventBus;
+    public EventDeadLettersRedeliverService(Set<EventBus> eventBuses, EventDeadLetters deadLetters) {
+        this.eventBuses = eventBuses;
         this.deadLetters = deadLetters;
     }
 
@@ -56,12 +58,20 @@ public class EventDeadLettersRedeliverService {
     }
 
     private Mono<Task.Result> redeliverGroupEvents(Group group, Event event, EventDeadLetters.InsertionId insertionId) {
-        return eventBus.reDeliver(group, event)
-            .then(deadLetters.remove(group, insertionId))
-            .thenReturn(Task.Result.COMPLETED)
-            .onErrorResume(e -> {
-                LOGGER.error("Error while performing redelivery of event: {} for group: {}",
-                    event.getEventId().toString(), group.asString(), e);
+        return eventBuses.stream()
+            .filter(eventBus -> eventBus.listRegisteredGroups().contains(group))
+            .findFirst()
+            .map(eventBus -> eventBus.reDeliver(group, event)
+                .then(deadLetters.remove(group, insertionId))
+                .thenReturn(Task.Result.COMPLETED)
+                .onErrorResume(e -> {
+                    LOGGER.error("Error while performing redelivery of event: {} for group: {}",
+                        event.getEventId().toString(), group.asString(), e);
+                    return Mono.just(Task.Result.PARTIAL);
+                }))
+            .orElseGet(() -> {
+                LOGGER.error("No eventBus associated. event: {} for group: {}",
+                    event.getEventId().toString(), group.asString());
                 return Mono.just(Task.Result.PARTIAL);
             });
     }
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
index e0eb519..bc94e12 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
@@ -72,6 +72,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
+import com.google.common.collect.ImmutableSet;
+
 import io.restassured.RestAssured;
 import io.restassured.http.ContentType;
 
@@ -109,10 +111,12 @@ class EventDeadLettersRoutesTest {
         "  }" +
         "}";
     private static final String SERIALIZED_GROUP_A = new EventBusTestFixture.GroupA().asString();
+    private static final String SERIALIZED_GROUP_B = new EventBusTestFixture.GroupB().asString();
 
     private WebAdminServer webAdminServer;
     private EventDeadLetters deadLetters;
-    private EventBus eventBus;
+    private EventBus eventBus1;
+    private EventBus eventBus2;
     private MemoryTaskManager taskManager;
 
     @BeforeEach
@@ -120,8 +124,9 @@ class EventDeadLettersRoutesTest {
         deadLetters = new MemoryEventDeadLetters();
         JsonTransformer jsonTransformer = new JsonTransformer();
         MailboxEventSerializer eventSerializer = new MailboxEventSerializer(new InMemoryId.Factory(), new InMemoryMessageId.Factory(), new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer());
-        eventBus = new InVMEventBus(new InVmEventDelivery(new RecordingMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters);
-        EventDeadLettersRedeliverService redeliverService = new EventDeadLettersRedeliverService(eventBus, deadLetters);
+        eventBus1 = new InVMEventBus(new InVmEventDelivery(new RecordingMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters);
+        eventBus2 = new InVMEventBus(new InVmEventDelivery(new RecordingMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters);
+        EventDeadLettersRedeliverService redeliverService = new EventDeadLettersRedeliverService(ImmutableSet.of(eventBus1, eventBus2), deadLetters);
         EventDeadLettersService service = new EventDeadLettersService(redeliverService, deadLetters);
 
         taskManager = new MemoryTaskManager(new Hostname("foo"));
@@ -391,8 +396,8 @@ class EventDeadLettersRoutesTest {
             eventCollectorB = new EventCollector();
             groupA = new EventBusTestFixture.GroupA();
             groupB = new EventBusTestFixture.GroupB();
-            eventBus.register(eventCollectorA, groupA);
-            eventBus.register(eventCollectorB, groupB);
+            eventBus1.register(eventCollectorA, groupA);
+            eventBus1.register(eventCollectorB, groupB);
         }
 
         @Test
@@ -571,6 +576,71 @@ class EventDeadLettersRoutesTest {
     }
 
     @Nested
+    class SeveralEventBus {
+        private Group groupA;
+        private Group groupB;
+        private EventCollector eventCollectorA;
+        private EventCollector eventCollectorB;
+
+        @BeforeEach
+        void nestedBeforeEach() {
+            eventCollectorA = new EventCollector();
+            eventCollectorB = new EventCollector();
+            groupA = new EventBusTestFixture.GroupA();
+            groupB = new EventBusTestFixture.GroupB();
+            eventBus1.register(eventCollectorA, groupA);
+            eventBus2.register(eventCollectorB, groupB);
+        }
+
+        @Test
+        void postRedeliverAllEventsShouldRedeliverEventFromDeadLetters() {
+            deadLetters.store(groupA, EVENT_1).block();
+
+            String taskId = with()
+                .queryParam("action", EVENTS_ACTION)
+                .post("/events/deadLetter")
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("status", is("completed"))
+                .body("additionalInformation.successfulRedeliveriesCount", is(1))
+                .body("additionalInformation.failedRedeliveriesCount", is(0));
+
+            assertThat(eventCollectorA.getEvents()).hasSize(1);
+        }
+
+        @Test
+        void postRedeliverAllEventsShouldRemoveAllEventsFromDeadLetters() {
+            deadLetters.store(groupA, EVENT_1).block();
+            deadLetters.store(groupB, EVENT_2).block();
+
+            String taskId = with()
+                .queryParam("action", EVENTS_ACTION)
+                .post("/events/deadLetter")
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("status", is("completed"))
+                .body("additionalInformation.successfulRedeliveriesCount", is(2))
+                .body("additionalInformation.failedRedeliveriesCount", is(0));
+
+            assertThat(eventCollectorA.getEvents()).hasSize(1);
+            assertThat(eventCollectorB.getEvents()).hasSize(1);
+        }
+
+    }
+
+    @Nested
     class RedeliverGroupEvents {
         private Group groupA;
         private EventCollector eventCollector;
@@ -579,7 +649,7 @@ class EventDeadLettersRoutesTest {
         void nestedBeforeEach() {
             eventCollector = new EventCollector();
             groupA = new EventBusTestFixture.GroupA();
-            eventBus.register(eventCollector, groupA);
+            eventBus1.register(eventCollector, groupA);
         }
 
         @Test
@@ -799,7 +869,7 @@ class EventDeadLettersRoutesTest {
         void nestedBeforeEach() {
             eventCollector = new EventCollector();
             groupA = new EventBusTestFixture.GroupA();
-            eventBus.register(eventCollector, groupA);
+            eventBus1.register(eventCollector, groupA);
         }
 
         @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org