You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/22 02:20:43 UTC
[4/8] james-project git commit: MAILBOX-369 Using EventBusId to skip
synchronous local registered listener
MAILBOX-369 Using EventBusId to skip synchronous local registered listener
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/61488efd
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/61488efd
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/61488efd
Branch: refs/heads/master
Commit: 61488efd3f99b24629901899aa235ae3932dad07
Parents: 91502b4
Author: datph <dp...@linagora.com>
Authored: Tue Jan 15 18:22:22 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 22 09:20:12 2019 +0700
----------------------------------------------------------------------
.../james/mailbox/events/GroupContract.java | 2 +-
.../james/mailbox/events/KeyContract.java | 27 +++++++++++++++-
.../apache/james/mailbox/events/EventBusId.java | 8 ++---
.../james/mailbox/events/EventDispatcher.java | 34 +++++++++++++++++---
.../mailbox/events/KeyRegistrationHandler.java | 21 +++++++++---
.../mailbox/events/MailboxListenerRegistry.java | 4 ---
.../james/mailbox/events/RabbitMQEventBus.java | 9 ++++--
.../mailbox/events/RabbitMQEventBusTest.java | 2 +-
8 files changed, 86 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index ac15639..c7f5009 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -217,7 +217,7 @@ public interface GroupContract {
MailboxListener listener = newListener();
MailboxListener failingListener = mock(MailboxListener.class);
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+ when(failingListener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
doThrow(new RuntimeException()).when(failingListener).event(any());
eventBus().register(failingListener, GROUP_A);
http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
index 6bf091b..47090d0 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -114,6 +115,16 @@ public interface KeyContract extends EventBusContract {
}
@Test
+ default void dispatchShouldNotifyLocalRegisteredListenerWithoutDelay() throws Exception {
+ MailboxListener listener = newListener();
+ eventBus().register(listener, KEY_1);
+
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+ verify(listener, times(1)).event(any());
+ }
+
+ @Test
default void dispatchShouldNotifyOnlyRegisteredListener() throws Exception {
MailboxListener listener = newListener();
MailboxListener listener2 = newListener();
@@ -276,7 +287,7 @@ public interface KeyContract extends EventBusContract {
MailboxListener listener = newListener();
MailboxListener failingListener = mock(MailboxListener.class);
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+ when(failingListener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
doThrow(new RuntimeException()).when(failingListener).event(any());
eventBus().register(failingListener, KEY_1);
@@ -339,5 +350,19 @@ public interface KeyContract extends EventBusContract {
.event(any());
}
+ @Test
+ default void localDispatchedListenersShouldBeDispatchedWithoutDelay() throws Exception {
+ MailboxListener mailboxListener1 = newListener();
+ MailboxListener mailboxListener2 = newListener();
+
+ eventBus().register(mailboxListener1, KEY_1);
+ eventBus2().register(mailboxListener2, KEY_1);
+
+ eventBus2().dispatch(EVENT, KEY_1).block();
+
+ verify(mailboxListener2, times(1)).event(any());
+ verify(mailboxListener1, timeout(ONE_SECOND).times(1)).event(any());
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventBusId.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventBusId.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventBusId.java
index 65ff8a7..4e6dc52 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventBusId.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventBusId.java
@@ -46,10 +46,6 @@ public class EventBusId {
this.id = id;
}
- public UUID getId() {
- return id;
- }
-
@Override
public final boolean equals(Object o) {
if (o instanceof EventBusId) {
@@ -59,6 +55,10 @@ public class EventBusId {
return false;
}
+ public String asString() {
+ return id.toString();
+ }
+
@Override
public int hashCode() {
return Objects.hash(id);
http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 77892b6..16e979e 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.events;
import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
import java.nio.charset.StandardCharsets;
@@ -28,6 +29,13 @@ import java.util.Set;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.AMQP;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -38,12 +46,20 @@ import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;
public class EventDispatcher {
+ private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
+
private final EventSerializer eventSerializer;
private final Sender sender;
+ private final MailboxListenerRegistry mailboxListenerRegistry;
+ private final AMQP.BasicProperties basicProperties;
- EventDispatcher(EventSerializer eventSerializer, Sender sender) {
+ EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, MailboxListenerRegistry mailboxListenerRegistry) {
this.eventSerializer = eventSerializer;
this.sender = sender;
+ this.mailboxListenerRegistry = mailboxListenerRegistry;
+ this.basicProperties = new AMQP.BasicProperties.Builder()
+ .headers(ImmutableMap.of(EVENT_BUS_ID, eventBusId.asString()))
+ .build();
}
void start() {
@@ -54,12 +70,23 @@ public class EventDispatcher {
}
Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
+ Mono<Void> localListenerDelivery = Flux.fromIterable(keys)
+ .subscribeOn(Schedulers.elastic())
+ .flatMap(mailboxListenerRegistry::getLocalMailboxListeners)
+ .filter(mailboxListener -> mailboxListener.getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
+ .flatMap(mailboxListener -> Mono.fromRunnable(Throwing.runnable(() -> mailboxListener.event(event)))
+ .doOnError(e -> LOGGER.error("Exception happens when handling event of user {}", event.getUser().asString(), e))
+ .onErrorResume(e -> Mono.empty()))
+ .then();
+
Mono<byte[]> serializedEvent = Mono.just(event)
.publishOn(Schedulers.parallel())
.map(this::serializeEvent)
.cache();
- return doDispatch(serializedEvent, keys)
+ Mono<Void> distantDispatchMono = doDispatch(serializedEvent, keys).cache();
+
+ return Flux.concat(localListenerDelivery, distantDispatchMono)
.subscribeWith(MonoProcessor.create());
}
@@ -71,7 +98,7 @@ public class EventDispatcher {
Flux<OutboundMessage> outboundMessages = routingKeys
.flatMap(routingKey -> serializedEvent
- .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), payload)));
+ .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), basicProperties, payload)));
return sender.send(outboundMessages);
}
@@ -79,5 +106,4 @@ public class EventDispatcher {
private byte[] serializeEvent(Event event) {
return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
}
-
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 3610b4b..864a234 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -23,6 +23,7 @@ import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
@@ -50,6 +51,7 @@ import reactor.rabbitmq.Sender;
public class KeyRegistrationHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(KeyRegistrationHandler.class);
+ private final EventBusId eventBusId;
private final MailboxListenerRegistry mailboxListenerRegistry;
private final EventSerializer eventSerializer;
private final Sender sender;
@@ -59,11 +61,12 @@ public class KeyRegistrationHandler {
private final RegistrationBinder registrationBinder;
private Optional<Disposable> receiverSubscriber;
- public KeyRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RoutingKeyConverter routingKeyConverter) {
+ public KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RoutingKeyConverter routingKeyConverter, MailboxListenerRegistry mailboxListenerRegistry) {
+ this.eventBusId = eventBusId;
this.eventSerializer = eventSerializer;
this.sender = sender;
this.routingKeyConverter = routingKeyConverter;
- this.mailboxListenerRegistry = new MailboxListenerRegistry();
+ this.mailboxListenerRegistry = mailboxListenerRegistry;
this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
this.registrationQueue = new RegistrationQueueName();
this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
@@ -89,7 +92,6 @@ public class KeyRegistrationHandler {
receiverSubscriber.filter(subscriber -> !subscriber.isDisposed())
.ifPresent(subscriber -> subscriber.dispose());
receiver.close();
- mailboxListenerRegistry.clear();
sender.delete(QueueSpecification.queue(registrationQueue.asString())).block();
}
@@ -107,18 +109,29 @@ public class KeyRegistrationHandler {
if (delivery.getBody() == null) {
return Mono.empty();
}
+
+ String serializedEventBusId = delivery.getProperties().getHeaders().get(EVENT_BUS_ID).toString();
+ EventBusId eventBusId = EventBusId.of(serializedEventBusId);
+
String routingKey = delivery.getEnvelope().getRoutingKey();
RegistrationKey registrationKey = routingKeyConverter.toRegistrationKey(routingKey);
Event event = toEvent(delivery);
return mailboxListenerRegistry.getLocalMailboxListeners(registrationKey)
+ .filter(listener -> !isLocalSynchronousListeners(eventBusId, listener))
.flatMap(listener -> Mono.fromRunnable(Throwing.runnable(() -> listener.event(event)))
.doOnError(e -> LOGGER.error("Exception happens when handling event of user {}", event.getUser().asString(), e))
- .onErrorResume(e -> Mono.empty()))
+ .onErrorResume(e -> Mono.empty())
+ .then())
.subscribeOn(Schedulers.elastic())
.then();
}
+ private boolean isLocalSynchronousListeners(EventBusId eventBusId, MailboxListener listener) {
+ return eventBusId.equals(this.eventBusId) &&
+ listener.getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS);
+ }
+
private Event toEvent(Delivery delivery) {
return eventSerializer.fromJson(new String(delivery.getBody(), StandardCharsets.UTF_8)).get();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
index 0d8f732..5fc89bd 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
@@ -48,10 +48,6 @@ class MailboxListenerRegistry {
}
}
- synchronized void clear() {
- listeners.clear();
- }
-
Flux<MailboxListener> getLocalMailboxListeners(RegistrationKey registrationKey) {
return Flux.fromIterable(listeners.get(registrationKey));
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index e9e594e..b2aaa8d 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -39,13 +39,16 @@ import reactor.rabbitmq.SenderOptions;
class RabbitMQEventBus implements EventBus {
static final String MAILBOX_EVENT = "mailboxEvent";
static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
+ static final String EVENT_BUS_ID = "eventBusId";
private final Mono<Connection> connectionMono;
private final EventSerializer eventSerializer;
private final AtomicBoolean isRunning;
private final RoutingKeyConverter routingKeyConverter;
private final RetryBackoffConfiguration retryBackoff;
+ private final EventBusId eventBusId;
+ private MailboxListenerRegistry mailboxListenerRegistry;
private GroupRegistrationHandler groupRegistrationHandler;
private KeyRegistrationHandler keyRegistrationHandler;
private EventDispatcher eventDispatcher;
@@ -54,6 +57,7 @@ class RabbitMQEventBus implements EventBus {
RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer,
RetryBackoffConfiguration retryBackoff,
RoutingKeyConverter routingKeyConverter) {
+ this.eventBusId = EventBusId.random();
this.connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
this.eventSerializer = eventSerializer;
this.routingKeyConverter = routingKeyConverter;
@@ -64,9 +68,10 @@ class RabbitMQEventBus implements EventBus {
public void start() {
if (!isRunning.get()) {
sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
- keyRegistrationHandler = new KeyRegistrationHandler(eventSerializer, sender, connectionMono, routingKeyConverter);
groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff);
- eventDispatcher = new EventDispatcher(eventSerializer, sender);
+ mailboxListenerRegistry = new MailboxListenerRegistry();
+ keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, mailboxListenerRegistry);
+ eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, mailboxListenerRegistry);
eventDispatcher.start();
keyRegistrationHandler.start();
http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index feaf712..6718093 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -82,7 +82,7 @@ import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract,
- EventBusConcurrentTestContract.SingleEventBusConcurrentContract, EventBusConcurrentTestContract.MultiEventBusConcurrentContract,
+ EventBusConcurrentTestContract.MultiEventBusConcurrentContract, EventBusConcurrentTestContract.SingleEventBusConcurrentContract,
KeyContract.SingleEventBusKeyContract, KeyContract.MultipleEventBusKeyContract,
ErrorHandlingContract {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org