You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/24 03:03:56 UTC
[5/8] james-project git commit: MAILBOX-374 Fix a bug between group
registration and key registration
MAILBOX-374 Fix a bug between group registration and key registration
In some cases groups were not dispatched when a key was registered/unregistered.
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ac3d5209
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ac3d5209
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ac3d5209
Branch: refs/heads/master
Commit: ac3d5209e11b5573ed74ea9a55014245f2ad6830
Parents: 6a9554c
Author: Benoit Tellier <bt...@linagora.com>
Authored: Tue Jan 22 11:47:14 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Jan 24 09:46:03 2019 +0700
----------------------------------------------------------------------
.../james/mailbox/events/EventDispatcher.java | 21 +++++++++++---------
.../james/mailbox/events/GroupRegistration.java | 2 +-
2 files changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/ac3d5209/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 628bab3..f1cc7d3 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -76,6 +76,14 @@ class EventDispatcher {
}
Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
+ Mono<byte[]> serializedEvent = Mono.just(event)
+ .publishOn(Schedulers.parallel())
+ .map(this::serializeEvent)
+ .cache();
+
+ Mono<Void> distantDispatchMono = doDispatch(serializedEvent, keys).cache()
+ .subscribeWith(MonoProcessor.create());
+
Mono<Void> localListenerDelivery = Flux.fromIterable(keys)
.subscribeOn(Schedulers.elastic())
.flatMap(key -> mailboxListenerRegistry.getLocalMailboxListeners(key)
@@ -85,17 +93,12 @@ class EventDispatcher {
.doOnError(e -> structuredLogger(event, keys)
.log(logger -> logger.error("Exception happens when dispatching event", e)))
.onErrorResume(e -> Mono.empty()))
- .then();
-
- Mono<byte[]> serializedEvent = Mono.just(event)
- .publishOn(Schedulers.parallel())
- .map(this::serializeEvent)
- .cache();
-
- Mono<Void> distantDispatchMono = doDispatch(serializedEvent, keys).cache();
+ .cache()
+ .then()
+ .subscribeWith(MonoProcessor.create());
return Flux.concat(localListenerDelivery, distantDispatchMono)
- .subscribeWith(MonoProcessor.create());
+ .then();
}
private void executeListener(Event event, MailboxListener mailboxListener, RegistrationKey registrationKey) throws Exception {
http://git-wip-us.apache.org/repos/asf/james-project/blob/ac3d5209/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index ee3013d..5a9bdf0 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -142,7 +142,7 @@ class GroupRegistration implements Registration {
int currentRetryCount = getRetryCount(acknowledgableDelivery);
return delayGenerator.delayIfHaveTo(currentRetryCount)
- .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event))))
+ .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event))).publishOn(Schedulers.elastic()))
.onErrorResume(throwable -> retryHandler.handleRetry(eventAsBytes, event, currentRetryCount, throwable))
.then(Mono.fromRunnable(acknowledgableDelivery::ack))
.subscribeWith(MonoProcessor.create())
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org