You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/03/29 06:43:46 UTC
[james-project] branch master updated: JAMES-3733 Support multi EventBus when re-deliver events from the dea… (#933)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 24feab3 JAMES-3733 Support multi EventBus when re-deliver events from the dea… (#933)
24feab3 is described below
commit 24feab311a68eadefe4034f07d89c1ff87e370de
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Mar 29 13:43:41 2022 +0700
JAMES-3733 Support multi EventBus when re-deliver events from the dea… (#933)
---
.../java/org/apache/james/events/EventBus.java | 6 ++
.../james/events/GroupRegistrationHandler.java | 5 ++
.../org/apache/james/events/RabbitMQEventBus.java | 6 ++
.../java/org/apache/james/events/InVMEventBus.java | 6 ++
.../james/modules/event/JMAPEventBusModule.java | 5 ++
.../modules/event/RabbitMQEventBusModule.java | 4 ++
.../james/modules/mailbox/DefaultEventModule.java | 4 ++
.../service/EventDeadLettersRedeliverService.java | 28 +++++---
.../routes/EventDeadLettersRoutesTest.java | 84 ++++++++++++++++++++--
9 files changed, 132 insertions(+), 16 deletions(-)
diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
index 21246e0..362acd1 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
@@ -19,10 +19,12 @@
package org.apache.james.events;
+import java.util.Collection;
import java.util.Set;
import org.reactivestreams.Publisher;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Mono;
@@ -73,4 +75,8 @@ public interface EventBus {
default Registration register(EventListener.ReactiveGroupEventListener groupListener) {
return register(groupListener, groupListener.getDefaultGroup());
}
+
+ default Collection<Group> listRegisteredGroups() {
+ return ImmutableList.of();
+ }
}
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index def37aa..9914cab 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -26,6 +26,7 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
import static org.apache.james.events.GroupRegistration.DEFAULT_RETRY_COUNT;
+import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -195,4 +196,8 @@ class GroupRegistrationHandler {
() -> groupRegistrations.remove(group),
listenerExecutor, configuration);
}
+
+ Collection<Group> registeredGroups() {
+ return groupRegistrations.keySet();
+ }
}
\ No newline at end of file
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index 0048485..27440af 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -19,6 +19,7 @@
package org.apache.james.events;
+import java.util.Collection;
import java.util.Set;
import javax.annotation.PreDestroy;
@@ -172,4 +173,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
}
return Mono.empty();
}
+
+ @Override
+ public Collection<Group> listRegisteredGroups() {
+ return groupRegistrationHandler.registeredGroups();
+ }
}
\ No newline at end of file
diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
index 7d98df4..d3ab03f 100644
--- a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
+++ b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
@@ -19,6 +19,7 @@
package org.apache.james.events;
+import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -85,6 +86,11 @@ public class InVMEventBus implements EventBus {
return Mono.empty();
}
+ @Override
+ public Collection<Group> listRegisteredGroups() {
+ return groups.keySet();
+ }
+
private EventListener.ReactiveEventListener retrieveListenerFromGroup(Group group) {
return Optional.ofNullable(groups.get(group))
.orElseThrow(() -> new GroupRegistrationNotFound(group));
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
index 62a34bf..dbff6cd 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
@@ -101,4 +101,9 @@ public class JMAPEventBusModule extends AbstractModule {
EventBus provideJmapEventBus(@Named(InjectionKeys.JMAP) RabbitMQEventBus rabbitMQEventBus) {
return rabbitMQEventBus;
}
+
+ @ProvidesIntoSet
+ EventBus registerEventBus(@Named(InjectionKeys.JMAP) EventBus eventBus) {
+ return eventBus;
+ }
}
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
index 6c0a85d..16f3196 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
@@ -50,6 +50,10 @@ public class RabbitMQEventBusModule extends AbstractModule {
bind(RabbitMQEventBus.class).in(Scopes.SINGLETON);
bind(EventBus.class).to(RabbitMQEventBus.class);
+ Multibinder.newSetBinder(binder(), EventBus.class)
+ .addBinding()
+ .to(EventBus.class);
+
Multibinder.newSetBinder(binder(), RegistrationKey.Factory.class)
.addBinding().to(MailboxIdRegistrationKey.Factory.class);
diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index 27a1c47..c71c90a 100644
--- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -57,6 +57,10 @@ public class DefaultEventModule extends AbstractModule {
bind(EventDelivery.class).to(InVmEventDelivery.class);
bind(EventBus.class).to(InVMEventBus.class);
+ Multibinder.newSetBinder(binder(), EventBus.class)
+ .addBinding()
+ .to(EventBus.class);
+
bind(RetryBackoffConfiguration.class).toInstance(RetryBackoffConfiguration.DEFAULT);
Multibinder.newSetBinder(binder(), EventListener.GroupEventListener.class);
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
index c6f239f..2363c77 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
@@ -21,6 +21,8 @@ package org.apache.james.webadmin.service;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+import java.util.Set;
+
import javax.inject.Inject;
import org.apache.james.events.Event;
@@ -40,13 +42,13 @@ public class EventDeadLettersRedeliverService {
private static final Logger LOGGER = LoggerFactory.getLogger(EventDeadLettersRedeliverService.class);
- private final EventBus eventBus;
+ private final Set<EventBus> eventBuses;
private final EventDeadLetters deadLetters;
@Inject
@VisibleForTesting
- public EventDeadLettersRedeliverService(EventBus eventBus, EventDeadLetters deadLetters) {
- this.eventBus = eventBus;
+ public EventDeadLettersRedeliverService(Set<EventBus> eventBuses, EventDeadLetters deadLetters) {
+ this.eventBuses = eventBuses;
this.deadLetters = deadLetters;
}
@@ -56,12 +58,20 @@ public class EventDeadLettersRedeliverService {
}
private Mono<Task.Result> redeliverGroupEvents(Group group, Event event, EventDeadLetters.InsertionId insertionId) {
- return eventBus.reDeliver(group, event)
- .then(deadLetters.remove(group, insertionId))
- .thenReturn(Task.Result.COMPLETED)
- .onErrorResume(e -> {
- LOGGER.error("Error while performing redelivery of event: {} for group: {}",
- event.getEventId().toString(), group.asString(), e);
+ return eventBuses.stream()
+ .filter(eventBus -> eventBus.listRegisteredGroups().contains(group))
+ .findFirst()
+ .map(eventBus -> eventBus. reDeliver(group, event)
+ .then(deadLetters.remove(group, insertionId))
+ .thenReturn(Task.Result.COMPLETED)
+ .onErrorResume(e -> {
+ LOGGER.error("Error while performing redelivery of event: {} for group: {}",
+ event.getEventId().toString(), group.asString(), e);
+ return Mono.just(Task.Result.PARTIAL);
+ }))
+ .orElseGet(() -> {
+ LOGGER.error("No eventBus associated. event: {} for group: {}",
+ event.getEventId().toString(), group.asString());
return Mono.just(Task.Result.PARTIAL);
});
}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
index e0eb519..bc94e12 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
@@ -72,6 +72,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
+import com.google.common.collect.ImmutableSet;
+
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
@@ -109,10 +111,12 @@ class EventDeadLettersRoutesTest {
" }" +
"}";
private static final String SERIALIZED_GROUP_A = new EventBusTestFixture.GroupA().asString();
+ private static final String SERIALIZED_GROUP_B = new EventBusTestFixture.GroupB().asString();
private WebAdminServer webAdminServer;
private EventDeadLetters deadLetters;
- private EventBus eventBus;
+ private EventBus eventBus1;
+ private EventBus eventBus2;
private MemoryTaskManager taskManager;
@BeforeEach
@@ -120,8 +124,9 @@ class EventDeadLettersRoutesTest {
deadLetters = new MemoryEventDeadLetters();
JsonTransformer jsonTransformer = new JsonTransformer();
MailboxEventSerializer eventSerializer = new MailboxEventSerializer(new InMemoryId.Factory(), new InMemoryMessageId.Factory(), new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer());
- eventBus = new InVMEventBus(new InVmEventDelivery(new RecordingMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters);
- EventDeadLettersRedeliverService redeliverService = new EventDeadLettersRedeliverService(eventBus, deadLetters);
+ eventBus1 = new InVMEventBus(new InVmEventDelivery(new RecordingMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters);
+ eventBus2 = new InVMEventBus(new InVmEventDelivery(new RecordingMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters);
+ EventDeadLettersRedeliverService redeliverService = new EventDeadLettersRedeliverService(ImmutableSet.of(eventBus1, eventBus2), deadLetters);
EventDeadLettersService service = new EventDeadLettersService(redeliverService, deadLetters);
taskManager = new MemoryTaskManager(new Hostname("foo"));
@@ -391,8 +396,8 @@ class EventDeadLettersRoutesTest {
eventCollectorB = new EventCollector();
groupA = new EventBusTestFixture.GroupA();
groupB = new EventBusTestFixture.GroupB();
- eventBus.register(eventCollectorA, groupA);
- eventBus.register(eventCollectorB, groupB);
+ eventBus1.register(eventCollectorA, groupA);
+ eventBus1.register(eventCollectorB, groupB);
}
@Test
@@ -571,6 +576,71 @@ class EventDeadLettersRoutesTest {
}
@Nested
+ class SeveralEventBus {
+ private Group groupA;
+ private Group groupB;
+ private EventCollector eventCollectorA;
+ private EventCollector eventCollectorB;
+
+ @BeforeEach
+ void nestedBeforeEach() {
+ eventCollectorA = new EventCollector();
+ eventCollectorB = new EventCollector();
+ groupA = new EventBusTestFixture.GroupA();
+ groupB = new EventBusTestFixture.GroupB();
+ eventBus1.register(eventCollectorA, groupA);
+ eventBus2.register(eventCollectorB, groupB);
+ }
+
+ @Test
+ void postRedeliverAllEventsShouldRedeliverEventFromDeadLetters() {
+ deadLetters.store(groupA, EVENT_1).block();
+
+ String taskId = with()
+ .queryParam("action", EVENTS_ACTION)
+ .post("/events/deadLetter")
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("additionalInformation.successfulRedeliveriesCount", is(1))
+ .body("additionalInformation.failedRedeliveriesCount", is(0));
+
+ assertThat(eventCollectorA.getEvents()).hasSize(1);
+ }
+
+ @Test
+ void postRedeliverAllEventsShouldRemoveAllEventsFromDeadLetters() {
+ deadLetters.store(groupA, EVENT_1).block();
+ deadLetters.store(groupB, EVENT_2).block();
+
+ String taskId = with()
+ .queryParam("action", EVENTS_ACTION)
+ .post("/events/deadLetter")
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("additionalInformation.successfulRedeliveriesCount", is(2))
+ .body("additionalInformation.failedRedeliveriesCount", is(0));
+
+ assertThat(eventCollectorA.getEvents()).hasSize(1);
+ assertThat(eventCollectorB.getEvents()).hasSize(1);
+ }
+
+ }
+
+ @Nested
class RedeliverGroupEvents {
private Group groupA;
private EventCollector eventCollector;
@@ -579,7 +649,7 @@ class EventDeadLettersRoutesTest {
void nestedBeforeEach() {
eventCollector = new EventCollector();
groupA = new EventBusTestFixture.GroupA();
- eventBus.register(eventCollector, groupA);
+ eventBus1.register(eventCollector, groupA);
}
@Test
@@ -799,7 +869,7 @@ class EventDeadLettersRoutesTest {
void nestedBeforeEach() {
eventCollector = new EventCollector();
groupA = new EventBusTestFixture.GroupA();
- eventBus.register(eventCollector, groupA);
+ eventBus1.register(eventCollector, groupA);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org