You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/22 02:20:43 UTC

[4/8] james-project git commit: MAILBOX-369 Using EventBusId to skip synchronous locally registered listeners

MAILBOX-369 Using EventBusId to skip synchronous locally registered listeners


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/61488efd
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/61488efd
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/61488efd

Branch: refs/heads/master
Commit: 61488efd3f99b24629901899aa235ae3932dad07
Parents: 91502b4
Author: datph <dp...@linagora.com>
Authored: Tue Jan 15 18:22:22 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 22 09:20:12 2019 +0700

----------------------------------------------------------------------
 .../james/mailbox/events/GroupContract.java     |  2 +-
 .../james/mailbox/events/KeyContract.java       | 27 +++++++++++++++-
 .../apache/james/mailbox/events/EventBusId.java |  8 ++---
 .../james/mailbox/events/EventDispatcher.java   | 34 +++++++++++++++++---
 .../mailbox/events/KeyRegistrationHandler.java  | 21 +++++++++---
 .../mailbox/events/MailboxListenerRegistry.java |  4 ---
 .../james/mailbox/events/RabbitMQEventBus.java  |  9 ++++--
 .../mailbox/events/RabbitMQEventBusTest.java    |  2 +-
 8 files changed, 86 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index ac15639..c7f5009 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -217,7 +217,7 @@ public interface GroupContract {
             MailboxListener listener = newListener();
 
             MailboxListener failingListener = mock(MailboxListener.class);
-            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+            when(failingListener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
             doThrow(new RuntimeException()).when(failingListener).event(any());
 
             eventBus().register(failingListener, GROUP_A);

http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
index 6bf091b..47090d0 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -114,6 +115,16 @@ public interface KeyContract extends EventBusContract {
         }
 
         @Test
+        default void dispatchShouldNotifyLocalRegisteredListenerWithoutDelay() throws Exception {
+            MailboxListener listener = newListener();
+            eventBus().register(listener, KEY_1);
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+            verify(listener, times(1)).event(any());
+        }
+
+        @Test
         default void dispatchShouldNotifyOnlyRegisteredListener() throws Exception {
             MailboxListener listener = newListener();
             MailboxListener listener2 = newListener();
@@ -276,7 +287,7 @@ public interface KeyContract extends EventBusContract {
             MailboxListener listener = newListener();
 
             MailboxListener failingListener = mock(MailboxListener.class);
-            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+            when(failingListener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
             doThrow(new RuntimeException()).when(failingListener).event(any());
 
             eventBus().register(failingListener, KEY_1);
@@ -339,5 +350,19 @@ public interface KeyContract extends EventBusContract {
                 .event(any());
         }
 
+        @Test
+        default void localDispatchedListenersShouldBeDispatchedWithoutDelay() throws Exception {
+            MailboxListener mailboxListener1 = newListener();
+            MailboxListener mailboxListener2 = newListener();
+
+            eventBus().register(mailboxListener1, KEY_1);
+            eventBus2().register(mailboxListener2, KEY_1);
+
+            eventBus2().dispatch(EVENT, KEY_1).block();
+
+            verify(mailboxListener2, times(1)).event(any());
+            verify(mailboxListener1, timeout(ONE_SECOND).times(1)).event(any());
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventBusId.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventBusId.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventBusId.java
index 65ff8a7..4e6dc52 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventBusId.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventBusId.java
@@ -46,10 +46,6 @@ public class EventBusId {
         this.id = id;
     }
 
-    public UUID getId() {
-        return id;
-    }
-
     @Override
     public final boolean equals(Object o) {
         if (o instanceof EventBusId) {
@@ -59,6 +55,10 @@ public class EventBusId {
         return false;
     }
 
+    public String asString() {
+        return id.toString();
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(id);

http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 77892b6..16e979e 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.events;
 
 import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
 import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
 
 import java.nio.charset.StandardCharsets;
@@ -28,6 +29,13 @@ import java.util.Set;
 
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.AMQP;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -38,12 +46,20 @@ import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.Sender;
 
 public class EventDispatcher {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
+
     private final EventSerializer eventSerializer;
     private final Sender sender;
+    private final MailboxListenerRegistry mailboxListenerRegistry;
+    private final AMQP.BasicProperties basicProperties;
 
-    EventDispatcher(EventSerializer eventSerializer, Sender sender) {
+    EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, MailboxListenerRegistry mailboxListenerRegistry) {
         this.eventSerializer = eventSerializer;
         this.sender = sender;
+        this.mailboxListenerRegistry = mailboxListenerRegistry;
+        this.basicProperties = new AMQP.BasicProperties.Builder()
+            .headers(ImmutableMap.of(EVENT_BUS_ID, eventBusId.asString()))
+            .build();
     }
 
     void start() {
@@ -54,12 +70,23 @@ public class EventDispatcher {
     }
 
     Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
+        Mono<Void> localListenerDelivery = Flux.fromIterable(keys)
+            .subscribeOn(Schedulers.elastic())
+            .flatMap(mailboxListenerRegistry::getLocalMailboxListeners)
+            .filter(mailboxListener -> mailboxListener.getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
+            .flatMap(mailboxListener -> Mono.fromRunnable(Throwing.runnable(() -> mailboxListener.event(event)))
+                .doOnError(e -> LOGGER.error("Exception happens when handling event of user {}", event.getUser().asString(), e))
+                .onErrorResume(e -> Mono.empty()))
+            .then();
+
         Mono<byte[]> serializedEvent = Mono.just(event)
             .publishOn(Schedulers.parallel())
             .map(this::serializeEvent)
             .cache();
 
-        return doDispatch(serializedEvent, keys)
+        Mono<Void> distantDispatchMono = doDispatch(serializedEvent, keys).cache();
+
+        return Flux.concat(localListenerDelivery, distantDispatchMono)
             .subscribeWith(MonoProcessor.create());
     }
 
@@ -71,7 +98,7 @@ public class EventDispatcher {
 
         Flux<OutboundMessage> outboundMessages = routingKeys
             .flatMap(routingKey -> serializedEvent
-                .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), payload)));
+                .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), basicProperties, payload)));
 
         return sender.send(outboundMessages);
     }
@@ -79,5 +106,4 @@ public class EventDispatcher {
     private byte[] serializeEvent(Event event) {
         return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 3610b4b..864a234 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -23,6 +23,7 @@ import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
@@ -50,6 +51,7 @@ import reactor.rabbitmq.Sender;
 public class KeyRegistrationHandler {
     private static final Logger LOGGER = LoggerFactory.getLogger(KeyRegistrationHandler.class);
 
+    private final EventBusId eventBusId;
     private final MailboxListenerRegistry mailboxListenerRegistry;
     private final EventSerializer eventSerializer;
     private final Sender sender;
@@ -59,11 +61,12 @@ public class KeyRegistrationHandler {
     private final RegistrationBinder registrationBinder;
     private Optional<Disposable> receiverSubscriber;
 
-    public KeyRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RoutingKeyConverter routingKeyConverter) {
+    public KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RoutingKeyConverter routingKeyConverter, MailboxListenerRegistry mailboxListenerRegistry) {
+        this.eventBusId = eventBusId;
         this.eventSerializer = eventSerializer;
         this.sender = sender;
         this.routingKeyConverter = routingKeyConverter;
-        this.mailboxListenerRegistry = new MailboxListenerRegistry();
+        this.mailboxListenerRegistry = mailboxListenerRegistry;
         this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
         this.registrationQueue = new RegistrationQueueName();
         this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
@@ -89,7 +92,6 @@ public class KeyRegistrationHandler {
         receiverSubscriber.filter(subscriber -> !subscriber.isDisposed())
             .ifPresent(subscriber -> subscriber.dispose());
         receiver.close();
-        mailboxListenerRegistry.clear();
         sender.delete(QueueSpecification.queue(registrationQueue.asString())).block();
     }
 
@@ -107,18 +109,29 @@ public class KeyRegistrationHandler {
         if (delivery.getBody() == null) {
             return Mono.empty();
         }
+
+        String serializedEventBusId = delivery.getProperties().getHeaders().get(EVENT_BUS_ID).toString();
+        EventBusId eventBusId = EventBusId.of(serializedEventBusId);
+
         String routingKey = delivery.getEnvelope().getRoutingKey();
         RegistrationKey registrationKey = routingKeyConverter.toRegistrationKey(routingKey);
         Event event = toEvent(delivery);
 
         return mailboxListenerRegistry.getLocalMailboxListeners(registrationKey)
+            .filter(listener -> !isLocalSynchronousListeners(eventBusId, listener))
             .flatMap(listener -> Mono.fromRunnable(Throwing.runnable(() -> listener.event(event)))
                 .doOnError(e -> LOGGER.error("Exception happens when handling event of user {}", event.getUser().asString(), e))
-                .onErrorResume(e -> Mono.empty()))
+                .onErrorResume(e -> Mono.empty())
+                .then())
             .subscribeOn(Schedulers.elastic())
             .then();
     }
 
+    private boolean isLocalSynchronousListeners(EventBusId eventBusId, MailboxListener listener) {
+        return eventBusId.equals(this.eventBusId) &&
+            listener.getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS);
+    }
+
     private Event toEvent(Delivery delivery) {
         return eventSerializer.fromJson(new String(delivery.getBody(), StandardCharsets.UTF_8)).get();
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
index 0d8f732..5fc89bd 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
@@ -48,10 +48,6 @@ class MailboxListenerRegistry {
         }
     }
 
-    synchronized void clear() {
-        listeners.clear();
-    }
-
     Flux<MailboxListener> getLocalMailboxListeners(RegistrationKey registrationKey) {
         return Flux.fromIterable(listeners.get(registrationKey));
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index e9e594e..b2aaa8d 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -39,13 +39,16 @@ import reactor.rabbitmq.SenderOptions;
 class RabbitMQEventBus implements EventBus {
     static final String MAILBOX_EVENT = "mailboxEvent";
     static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
+    static final String EVENT_BUS_ID = "eventBusId";
 
     private final Mono<Connection> connectionMono;
     private final EventSerializer eventSerializer;
     private final AtomicBoolean isRunning;
     private final RoutingKeyConverter routingKeyConverter;
     private final RetryBackoffConfiguration retryBackoff;
+    private final EventBusId eventBusId;
 
+    private MailboxListenerRegistry mailboxListenerRegistry;
     private GroupRegistrationHandler groupRegistrationHandler;
     private KeyRegistrationHandler keyRegistrationHandler;
     private EventDispatcher eventDispatcher;
@@ -54,6 +57,7 @@ class RabbitMQEventBus implements EventBus {
     RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer,
                      RetryBackoffConfiguration retryBackoff,
                      RoutingKeyConverter routingKeyConverter) {
+        this.eventBusId = EventBusId.random();
         this.connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
         this.eventSerializer = eventSerializer;
         this.routingKeyConverter = routingKeyConverter;
@@ -64,9 +68,10 @@ class RabbitMQEventBus implements EventBus {
     public void start() {
         if (!isRunning.get()) {
             sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
-            keyRegistrationHandler = new KeyRegistrationHandler(eventSerializer, sender, connectionMono, routingKeyConverter);
             groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff);
-            eventDispatcher = new EventDispatcher(eventSerializer, sender);
+            mailboxListenerRegistry = new MailboxListenerRegistry();
+            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, mailboxListenerRegistry);
+            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, mailboxListenerRegistry);
 
             eventDispatcher.start();
             keyRegistrationHandler.start();

http://git-wip-us.apache.org/repos/asf/james-project/blob/61488efd/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index feaf712..6718093 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -82,7 +82,7 @@ import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
 
 class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract,
-    EventBusConcurrentTestContract.SingleEventBusConcurrentContract, EventBusConcurrentTestContract.MultiEventBusConcurrentContract,
+    EventBusConcurrentTestContract.MultiEventBusConcurrentContract, EventBusConcurrentTestContract.SingleEventBusConcurrentContract,
     KeyContract.SingleEventBusKeyContract, KeyContract.MultipleEventBusKeyContract,
     ErrorHandlingContract {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org