You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/03/04 11:43:37 UTC
[james-project] 03/16: MAILBOX-382 fix concurrency test issue for
redelivery of multiple events
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 78da94b679bf327dd0272ba5b7dae7441a520057
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Fri Mar 1 11:04:27 2019 +0700
MAILBOX-382 fix concurrency test issue for redelivery of multiple events
---
.../mailbox/events/MemoryEventDeadLetters.java | 45 ++++++++++++++--------
.../service/EventDeadLettersRedeliverTask.java | 7 ++--
.../webadmin/service/EventDeadLettersService.java | 9 +++--
3 files changed, 37 insertions(+), 24 deletions(-)
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
index fcddc5e..fdeec7c 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
@@ -21,8 +21,9 @@ package org.apache.james.mailbox.events;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -33,7 +34,7 @@ public class MemoryEventDeadLetters implements EventDeadLetters {
private final Multimap<Group, Event> deadLetters;
public MemoryEventDeadLetters() {
- this.deadLetters = Multimaps.synchronizedSetMultimap(HashMultimap.create());
+ this.deadLetters = HashMultimap.create();
}
@Override
@@ -41,9 +42,11 @@ public class MemoryEventDeadLetters implements EventDeadLetters {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL);
- return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, failDeliveredEvent))
- .subscribeWith(MonoProcessor.create())
- .then();
+ synchronized (deadLetters) {
+ return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, failDeliveredEvent))
+ .subscribeWith(MonoProcessor.create())
+ .then();
+ }
}
@Override
@@ -51,12 +54,14 @@ public class MemoryEventDeadLetters implements EventDeadLetters {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
- return Flux.fromIterable(deadLetters.get(registeredGroup))
- .filter(event -> event.getEventId().equals(failDeliveredEventId))
- .next()
- .doOnNext(event -> deadLetters.remove(registeredGroup, event))
- .subscribeWith(MonoProcessor.create())
- .then();
+ synchronized (deadLetters) {
+ return Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup)))
+ .filter(event -> event.getEventId().equals(failDeliveredEventId))
+ .next()
+ .doOnNext(event -> deadLetters.remove(registeredGroup, event))
+ .subscribeWith(MonoProcessor.create())
+ .then();
+ }
}
@Override
@@ -64,21 +69,27 @@ public class MemoryEventDeadLetters implements EventDeadLetters {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
- return Flux.fromIterable(deadLetters.get(registeredGroup))
- .filter(event -> event.getEventId().equals(failDeliveredEventId))
- .next();
+ synchronized (deadLetters) {
+ return Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup)))
+ .filter(event -> event.getEventId().equals(failDeliveredEventId))
+ .next();
+ }
}
@Override
public Flux<Event.EventId> failedEventIds(Group registeredGroup) {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
- return Flux.fromIterable(deadLetters.get(registeredGroup))
- .map(Event::getEventId);
+ synchronized (deadLetters) {
+ return Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup)))
+ .map(Event::getEventId);
+ }
}
@Override
public Flux<Group> groupsWithFailedEvents() {
- return Flux.fromIterable(deadLetters.keySet());
+ synchronized (deadLetters) {
+ return Flux.fromIterable(ImmutableSet.copyOf(deadLetters.keySet()));
+ }
}
}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
index 7e8c456..ea5553c 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
@@ -21,6 +21,7 @@ package org.apache.james.webadmin.service;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import javax.inject.Inject;
@@ -61,12 +62,12 @@ public class EventDeadLettersRedeliverTask implements Task {
private final EventBus eventBus;
private final EventDeadLetters deadLetters;
- private final Flux<Tuple2<Group, Event>> groupsWithEvents;
+ private final Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents;
private final AtomicLong successfulRedeliveriesCount;
private final AtomicLong failedRedeliveriesCount;
@Inject
- EventDeadLettersRedeliverTask(EventBus eventBus, EventDeadLetters deadLetters, Flux<Tuple2<Group, Event>> groupsWithEvents) {
+ EventDeadLettersRedeliverTask(EventBus eventBus, EventDeadLetters deadLetters, Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) {
this.eventBus = eventBus;
this.deadLetters = deadLetters;
this.groupsWithEvents = groupsWithEvents;
@@ -76,7 +77,7 @@ public class EventDeadLettersRedeliverTask implements Task {
@Override
public Result run() {
- return groupsWithEvents.flatMap(entry -> redeliverGroupEvent(entry.getT1(), entry.getT2()))
+ return groupsWithEvents.get().flatMap(entry -> redeliverGroupEvent(entry.getT1(), entry.getT2()))
.reduce(Result.COMPLETED, Task::combine)
.onErrorResume(e -> {
LOGGER.error("Error while redelivering events", e);
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
index ce2c915..e9ea848 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
@@ -21,6 +21,7 @@ package org.apache.james.webadmin.service;
import java.util.List;
import java.util.UUID;
+import java.util.function.Supplier;
import javax.inject.Inject;
@@ -97,22 +98,22 @@ public class EventDeadLettersService {
}
public Task createActionOnEventsTask() {
- Flux<Tuple2<Group, Event>> groupsWithEvents = listGroups().flatMap(this::getGroupWithEvents);
+ Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents = () -> listGroups().flatMap(this::getGroupWithEvents);
return redeliverEvents(groupsWithEvents);
}
public Task createActionOnEventsTask(Group group) {
- return redeliverEvents(getGroupWithEvents(group));
+ return redeliverEvents(() -> getGroupWithEvents(group));
}
public Task createActionOnEventsTask(Group group, Event.EventId eventId) {
- Flux<Tuple2<Group, Event>> groupWithEvent = Flux.just(group).zipWith(getEvent(group, eventId));
+ Supplier<Flux<Tuple2<Group, Event>>> groupWithEvent = () -> Flux.just(group).zipWith(getEvent(group, eventId));
return redeliverEvents(groupWithEvent);
}
- private Task redeliverEvents(Flux<Tuple2<Group, Event>> groupsWithEvents) {
+ private Task redeliverEvents(Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) {
return new EventDeadLettersRedeliverTask(eventBus, deadLetters, groupsWithEvents);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org