You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/03/04 11:43:37 UTC

[james-project] 03/16: MAILBOX-382 fix concurrency test issue for redelivery of multiple events

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 78da94b679bf327dd0272ba5b7dae7441a520057
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Fri Mar 1 11:04:27 2019 +0700

    MAILBOX-382 fix concurrency test issue for redelivery of multiple events
---
 .../mailbox/events/MemoryEventDeadLetters.java     | 45 ++++++++++++++--------
 .../service/EventDeadLettersRedeliverTask.java     |  7 ++--
 .../webadmin/service/EventDeadLettersService.java  |  9 +++--
 3 files changed, 37 insertions(+), 24 deletions(-)

diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
index fcddc5e..fdeec7c 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
@@ -21,8 +21,9 @@ package org.apache.james.mailbox.events;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -33,7 +34,7 @@ public class MemoryEventDeadLetters implements EventDeadLetters {
     private final Multimap<Group, Event> deadLetters;
 
     public MemoryEventDeadLetters() {
-        this.deadLetters = Multimaps.synchronizedSetMultimap(HashMultimap.create());
+        this.deadLetters = HashMultimap.create();
     }
 
     @Override
@@ -41,9 +42,11 @@ public class MemoryEventDeadLetters implements EventDeadLetters {
         Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
         Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL);
 
-        return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, failDeliveredEvent))
-            .subscribeWith(MonoProcessor.create())
-            .then();
+        synchronized (deadLetters) {
+            return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, failDeliveredEvent))
+                .subscribeWith(MonoProcessor.create())
+                .then();
+        }
     }
 
     @Override
@@ -51,12 +54,14 @@ public class MemoryEventDeadLetters implements EventDeadLetters {
         Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
         Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
 
-        return Flux.fromIterable(deadLetters.get(registeredGroup))
-            .filter(event -> event.getEventId().equals(failDeliveredEventId))
-            .next()
-            .doOnNext(event -> deadLetters.remove(registeredGroup, event))
-            .subscribeWith(MonoProcessor.create())
-            .then();
+        synchronized (deadLetters) {
+            return Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup)))
+                .filter(event -> event.getEventId().equals(failDeliveredEventId))
+                .next()
+                .doOnNext(event -> deadLetters.remove(registeredGroup, event))
+                .subscribeWith(MonoProcessor.create())
+                .then();
+        }
     }
 
     @Override
@@ -64,21 +69,27 @@ public class MemoryEventDeadLetters implements EventDeadLetters {
         Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
         Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
 
-        return Flux.fromIterable(deadLetters.get(registeredGroup))
-            .filter(event -> event.getEventId().equals(failDeliveredEventId))
-            .next();
+        synchronized (deadLetters) {
+            return Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup)))
+                .filter(event -> event.getEventId().equals(failDeliveredEventId))
+                .next();
+        }
     }
 
     @Override
     public Flux<Event.EventId> failedEventIds(Group registeredGroup) {
         Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
 
-        return Flux.fromIterable(deadLetters.get(registeredGroup))
-            .map(Event::getEventId);
+        synchronized (deadLetters) {
+            return Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup)))
+                .map(Event::getEventId);
+        }
     }
 
     @Override
     public Flux<Group> groupsWithFailedEvents() {
-        return Flux.fromIterable(deadLetters.keySet());
+        synchronized (deadLetters) {
+            return Flux.fromIterable(ImmutableSet.copyOf(deadLetters.keySet()));
+        }
     }
 }
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
index 7e8c456..ea5553c 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
@@ -21,6 +21,7 @@ package org.apache.james.webadmin.service;
 
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 import javax.inject.Inject;
 
@@ -61,12 +62,12 @@ public class EventDeadLettersRedeliverTask implements Task {
 
     private final EventBus eventBus;
     private final EventDeadLetters deadLetters;
-    private final Flux<Tuple2<Group, Event>> groupsWithEvents;
+    private final Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents;
     private final AtomicLong successfulRedeliveriesCount;
     private final AtomicLong failedRedeliveriesCount;
 
     @Inject
-    EventDeadLettersRedeliverTask(EventBus eventBus, EventDeadLetters deadLetters, Flux<Tuple2<Group, Event>> groupsWithEvents) {
+    EventDeadLettersRedeliverTask(EventBus eventBus, EventDeadLetters deadLetters, Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) {
         this.eventBus = eventBus;
         this.deadLetters = deadLetters;
         this.groupsWithEvents = groupsWithEvents;
@@ -76,7 +77,7 @@ public class EventDeadLettersRedeliverTask implements Task {
 
     @Override
     public Result run() {
-        return groupsWithEvents.flatMap(entry -> redeliverGroupEvent(entry.getT1(), entry.getT2()))
+        return groupsWithEvents.get().flatMap(entry -> redeliverGroupEvent(entry.getT1(), entry.getT2()))
             .reduce(Result.COMPLETED, Task::combine)
             .onErrorResume(e -> {
                 LOGGER.error("Error while redelivering events", e);
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
index ce2c915..e9ea848 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
@@ -21,6 +21,7 @@ package org.apache.james.webadmin.service;
 
 import java.util.List;
 import java.util.UUID;
+import java.util.function.Supplier;
 
 import javax.inject.Inject;
 
@@ -97,22 +98,22 @@ public class EventDeadLettersService {
     }
 
     public Task createActionOnEventsTask() {
-        Flux<Tuple2<Group, Event>> groupsWithEvents = listGroups().flatMap(this::getGroupWithEvents);
+        Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents = () -> listGroups().flatMap(this::getGroupWithEvents);
 
         return redeliverEvents(groupsWithEvents);
     }
 
     public Task createActionOnEventsTask(Group group) {
-        return redeliverEvents(getGroupWithEvents(group));
+        return redeliverEvents(() -> getGroupWithEvents(group));
     }
 
     public Task createActionOnEventsTask(Group group, Event.EventId eventId) {
-        Flux<Tuple2<Group, Event>> groupWithEvent = Flux.just(group).zipWith(getEvent(group, eventId));
+        Supplier<Flux<Tuple2<Group, Event>>> groupWithEvent = () -> Flux.just(group).zipWith(getEvent(group, eventId));
 
         return redeliverEvents(groupWithEvent);
     }
 
-    private Task redeliverEvents(Flux<Tuple2<Group, Event>> groupsWithEvents) {
+    private Task redeliverEvents(Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) {
         return new EventDeadLettersRedeliverTask(eventBus, deadLetters, groupsWithEvents);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org