You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/16 06:59:11 UTC

[03/17] james-project git commit: MAILBOX-371 RabbitMQEventBus impl for key registrations

MAILBOX-371 RabbitMQEventBus impl for key registrations


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/68735a22
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/68735a22
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/68735a22

Branch: refs/heads/master
Commit: 68735a2219435677d9bf2e9a85abf9dd6b20407e
Parents: 74fa711
Author: datph <dp...@linagora.com>
Authored: Fri Jan 11 15:36:01 2019 +0700
Committer: datph <dp...@linagora.com>
Committed: Wed Jan 16 05:38:45 2019 +0700

----------------------------------------------------------------------
 .../james/mailbox/events/KeyContract.java       |  76 ++++++
 .../james/mailbox/events/EventDispatcher.java   |  24 +-
 .../james/mailbox/events/KeyRegistration.java   |  33 +++
 .../mailbox/events/KeyRegistrationHandler.java  | 124 +++++++++
 .../mailbox/events/MailboxListenerRegistry.java |  58 +++++
 .../james/mailbox/events/RabbitMQEventBus.java  |  16 +-
 .../mailbox/events/RegistrationBinder.java      |  54 ++++
 .../mailbox/events/RegistrationQueueName.java   |  42 +++
 .../events/MailboxListenerRegistryTest.java     | 256 +++++++++++++++++++
 .../mailbox/events/RabbitMQEventBusTest.java    |  21 +-
 10 files changed, 689 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/68735a22/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
index bd92d82..f50a66d 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
@@ -25,13 +25,17 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
 import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_2;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND;
+import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
 import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.junit.jupiter.api.Assertions.assertTimeout;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.after;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -242,5 +246,77 @@ public interface KeyContract extends EventBusContract {
                     latch.countDown();
                 });
         }
+
+        @Test
+        default void failingRegisteredListenersShouldNotAbortRegisteredDelivery() {
+            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
+            doThrow(new RuntimeException())
+                .doCallRealMethod()
+                .when(listener).event(any());
+            eventBus().register(listener, KEY_1);
+
+            eventBus().dispatch(EVENT, KEY_1).block();
+            eventBus().dispatch(EVENT, KEY_1).block();
+
+            WAIT_CONDITION
+                .until(() -> assertThat(listener.numberOfEventCalls()).isEqualTo(1));
+        }
+
+        @Test
+        default void allRegisteredListenersShouldBeExecutedWhenARegisteredListenerFails() {
+            MailboxListener listener = newListener();
+
+            MailboxListener failingListener = mock(MailboxListener.class);
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+            doThrow(new RuntimeException()).when(failingListener).event(any());
+
+            eventBus().register(failingListener, KEY_1);
+            eventBus().register(listener, KEY_1);
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+            verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+        }
+    }
+
+    interface MultipleEventBusKeyContract extends MultipleEventBusContract {
+
+        @Test
+        default void crossEventBusRegistrationShouldBeAllowed() {
+            MailboxListener mailboxListener = newListener();
+
+            eventBus().register(mailboxListener, KEY_1);
+
+            eventBus2().dispatch(EVENT, KEY_1).block();
+
+            verify(mailboxListener, timeout(ONE_SECOND).times(1)).event(any());
+        }
+
+        @Test
+        default void unregisteredDistantListenersShouldNotBeNotified() {
+            MailboxListener mailboxListener = newListener();
+
+            eventBus().register(mailboxListener, KEY_1).unregister();
+
+            eventBus2().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+            verify(mailboxListener, after(FIVE_HUNDRED_MS).never())
+                .event(any());
+        }
+
+        @Test
+        default void allRegisteredListenersShouldBeDispatched() {
+            MailboxListener mailboxListener1 = newListener();
+            MailboxListener mailboxListener2 = newListener();
+
+            eventBus().register(mailboxListener1, KEY_1);
+            eventBus2().register(mailboxListener2, KEY_1);
+
+            eventBus2().dispatch(EVENT, KEY_1).block();
+
+            verify(mailboxListener1, timeout(ONE_SECOND).times(1)).event(any());
+            verify(mailboxListener2, timeout(ONE_SECOND).times(1)).event(any());
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/68735a22/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index b01fc53..77892b6 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -21,14 +21,15 @@ package org.apache.james.mailbox.events;
 
 import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
 import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Set;
 
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.mailbox.Event;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.MonoProcessor;
 import reactor.core.scheduler.Schedulers;
@@ -52,16 +53,29 @@ public class EventDispatcher {
             .block();
     }
 
-    Mono<Void> dispatch(Event event) {
-        Mono<OutboundMessage> outboundMessage = Mono.just(event)
+    Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
+        Mono<byte[]> serializedEvent = Mono.just(event)
             .publishOn(Schedulers.parallel())
             .map(this::serializeEvent)
-            .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, EMPTY_ROUTING_KEY, payload));
+            .cache();
 
-        return sender.send(outboundMessage)
+        return doDispatch(serializedEvent, keys)
             .subscribeWith(MonoProcessor.create());
     }
 
+    private Mono<Void> doDispatch(Mono<byte[]> serializedEvent, Set<RegistrationKey> keys) {
+        Flux<RoutingKeyConverter.RoutingKey> routingKeys = Flux.concat(
+            Mono.just(RoutingKeyConverter.RoutingKey.empty()),
+            Flux.fromIterable(keys)
+                .map(RoutingKeyConverter.RoutingKey::of));
+
+        Flux<OutboundMessage> outboundMessages = routingKeys
+            .flatMap(routingKey -> serializedEvent
+                .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), payload)));
+
+        return sender.send(outboundMessages);
+    }
+
     private byte[] serializeEvent(Event event) {
         return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/68735a22/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistration.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistration.java
new file mode 100644
index 0000000..8dd002d
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistration.java
@@ -0,0 +1,33 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+class KeyRegistration implements Registration {
+    private final Runnable unregister;
+
+    KeyRegistration(Runnable unregister) {
+        this.unregister = unregister;
+    }
+
+    @Override
+    public void unregister() {
+        unregister.run();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/68735a22/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
new file mode 100644
index 0000000..ff388df
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -0,0 +1,124 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Delivery;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+
+public class KeyRegistrationHandler {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KeyRegistrationHandler.class);
+
+    private final MailboxListenerRegistry mailboxListenerRegistry;
+    private final EventSerializer eventSerializer;
+    private final Sender sender;
+    private final RoutingKeyConverter routingKeyConverter;
+    private final Receiver receiver;
+    private final RegistrationQueueName registrationQueue;
+    private final RegistrationBinder registrationBinder;
+    private Optional<Disposable> receiverSubscriber;
+
+    public KeyRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RoutingKeyConverter routingKeyConverter) {
+        this.eventSerializer = eventSerializer;
+        this.sender = sender;
+        this.routingKeyConverter = routingKeyConverter;
+        this.mailboxListenerRegistry = new MailboxListenerRegistry();
+        this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+        this.registrationQueue = new RegistrationQueueName();
+        this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
+    }
+
+    void start() {
+        sender.declareQueue(QueueSpecification.queue()
+            .durable(DURABLE)
+            .exclusive(EXCLUSIVE)
+            .autoDelete(AUTO_DELETE)
+            .arguments(NO_ARGUMENTS))
+            .map(AMQP.Queue.DeclareOk::getQueue)
+            .doOnSuccess(registrationQueue::initialize)
+            .block();
+
+        receiverSubscriber = Optional.of(receiver.consumeAutoAck(registrationQueue.asString())
+            .subscribeOn(Schedulers.parallel())
+            .flatMap(this::handleDelivery)
+            .subscribe());
+    }
+
+    void stop() {
+        receiverSubscriber.filter(subscriber -> !subscriber.isDisposed())
+            .ifPresent(subscriber -> subscriber.dispose());
+        receiver.close();
+        mailboxListenerRegistry.clear();
+        sender.delete(QueueSpecification.queue(registrationQueue.asString())).block();
+    }
+
+    Registration register(MailboxListener listener, RegistrationKey key) {
+        Runnable bindIfEmpty = () -> registrationBinder.bind(key).block();
+        Runnable unbindIfEmpty = () -> registrationBinder.unbind(key).block();
+        Runnable unregister = () -> mailboxListenerRegistry.removeListener(key, listener, unbindIfEmpty);
+
+        KeyRegistration keyRegistration = new KeyRegistration(unregister);
+        mailboxListenerRegistry.addListener(key, listener, bindIfEmpty);
+        return keyRegistration;
+    }
+
+    private Mono<Void> handleDelivery(Delivery delivery) {
+        if (delivery.getBody() == null) {
+            return Mono.empty();
+        }
+        String routingKey = delivery.getEnvelope().getRoutingKey();
+        RegistrationKey registrationKey = routingKeyConverter.toRegistrationKey(routingKey);
+        Event event = toEvent(delivery);
+
+        return mailboxListenerRegistry.getLocalMailboxListeners(registrationKey)
+            .flatMap(listener -> Mono.fromRunnable(() -> listener.event(event))
+                .doOnError(e -> LOGGER.error("Exception happens when handling event of user {}", event.getUser().asString(), e))
+                .onErrorResume(e -> Mono.empty()))
+            .subscribeOn(Schedulers.elastic())
+            .then();
+    }
+
+    private Event toEvent(Delivery delivery) {
+        return eventSerializer.fromJson(new String(delivery.getBody(), StandardCharsets.UTF_8)).get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/68735a22/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
new file mode 100644
index 0000000..0d8f732
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/MailboxListenerRegistry.java
@@ -0,0 +1,58 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import org.apache.james.mailbox.MailboxListener;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+import reactor.core.publisher.Flux;
+
+class MailboxListenerRegistry {
+    private final Multimap<RegistrationKey, MailboxListener> listeners;
+
+    MailboxListenerRegistry() {
+        this.listeners = Multimaps.synchronizedMultimap(HashMultimap.create());
+    }
+
+    synchronized void addListener(RegistrationKey registrationKey, MailboxListener listener, Runnable runIfEmpty) {
+        if (listeners.get(registrationKey).isEmpty()) {
+            runIfEmpty.run();
+        }
+        listeners.put(registrationKey, listener);
+    }
+
+    synchronized void removeListener(RegistrationKey registrationKey, MailboxListener listener, Runnable runIfEmpty) {
+        boolean wasRemoved = listeners.remove(registrationKey, listener);
+        if (wasRemoved && listeners.get(registrationKey).isEmpty()) {
+            runIfEmpty.run();
+        }
+    }
+
+    synchronized void clear() {
+        listeners.clear();
+    }
+
+    Flux<MailboxListener> getLocalMailboxListeners(RegistrationKey registrationKey) {
+        return Flux.fromIterable(listeners.get(registrationKey));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/68735a22/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 87e8ab1..447b345 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.PreDestroy;
 
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.mailbox.Event;
@@ -44,25 +43,29 @@ class RabbitMQEventBus implements EventBus {
     private final Mono<Connection> connectionMono;
     private final EventSerializer eventSerializer;
     private final AtomicBoolean isRunning;
+    private final RoutingKeyConverter routingKeyConverter;
 
     private GroupRegistrationHandler groupRegistrationHandler;
+    private KeyRegistrationHandler keyRegistrationHandler;
     private EventDispatcher eventDispatcher;
     private Sender sender;
 
-    RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer) {
+    RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer, RoutingKeyConverter routingKeyConverter) {
         this.connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
         this.eventSerializer = eventSerializer;
-        isRunning = new AtomicBoolean(false);
+        this.routingKeyConverter = routingKeyConverter;
+        this.isRunning = new AtomicBoolean(false);
     }
 
     public void start() {
         if (!isRunning.get()) {
             sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
             groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono);
+            keyRegistrationHandler = new KeyRegistrationHandler(eventSerializer, sender, connectionMono, routingKeyConverter);
             eventDispatcher = new EventDispatcher(eventSerializer, sender);
 
             eventDispatcher.start();
-
+            keyRegistrationHandler.start();
             isRunning.set(true);
         }
     }
@@ -71,6 +74,7 @@ class RabbitMQEventBus implements EventBus {
     public void stop() {
         if (isRunning.get()) {
             groupRegistrationHandler.stop();
+            keyRegistrationHandler.stop();
             sender.close();
             isRunning.set(false);
         }
@@ -78,7 +82,7 @@ class RabbitMQEventBus implements EventBus {
 
     @Override
     public Registration register(MailboxListener listener, RegistrationKey key) {
-        throw new NotImplementedException("will implement latter");
+        return keyRegistrationHandler.register(listener, key);
     }
 
     @Override
@@ -89,7 +93,7 @@ class RabbitMQEventBus implements EventBus {
     @Override
     public Mono<Void> dispatch(Event event, Set<RegistrationKey> key) {
         if (!event.isNoop()) {
-            return eventDispatcher.dispatch(event);
+            return eventDispatcher.dispatch(event, key);
         }
         return Mono.empty();
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/68735a22/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RegistrationBinder.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RegistrationBinder.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RegistrationBinder.java
new file mode 100644
index 0000000..f4405a6
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RegistrationBinder.java
@@ -0,0 +1,54 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.Sender;
+
+class RegistrationBinder {
+    private final Sender sender;
+    private final RegistrationQueueName registrationQueue;
+
+    RegistrationBinder(Sender sender, RegistrationQueueName registrationQueue) {
+        this.sender = sender;
+        this.registrationQueue = registrationQueue;
+    }
+
+    Mono<Void> bind(RegistrationKey key) {
+        return sender.bind(bindingSpecification(key))
+            .then();
+    }
+
+    Mono<Void> unbind(RegistrationKey key) {
+        return sender.unbind(bindingSpecification(key))
+            .then();
+    }
+
+    private BindingSpecification bindingSpecification(RegistrationKey key) {
+        RoutingKeyConverter.RoutingKey routingKey = RoutingKeyConverter.RoutingKey.of(key);
+        return BindingSpecification.binding()
+            .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+            .queue(registrationQueue.asString())
+            .routingKey(routingKey.asString());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/68735a22/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RegistrationQueueName.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RegistrationQueueName.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RegistrationQueueName.java
new file mode 100644
index 0000000..06fad8c
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RegistrationQueueName.java
@@ -0,0 +1,42 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import java.util.Optional;
+
+import com.google.common.base.Preconditions;
+
+public class RegistrationQueueName {
+    private Optional<String> queueName;
+
+    RegistrationQueueName() {
+        this.queueName = Optional.empty();
+    }
+
+    void initialize(String queueName) {
+        Preconditions.checkNotNull(queueName);
+        Preconditions.checkState(!this.queueName.isPresent(), "'queueName' must be empty for initializing");
+        this.queueName = Optional.of(queueName);
+    }
+
+    String asString() {
+        return queueName.orElseThrow(() -> new IllegalStateException("'queueName' is not yet initialized"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/68735a22/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/MailboxListenerRegistryTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/MailboxListenerRegistryTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/MailboxListenerRegistryTest.java
new file mode 100644
index 0000000..e86dd6b
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/MailboxListenerRegistryTest.java
@@ -0,0 +1,256 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+class MailboxListenerRegistryTest {
+    private static final MailboxIdRegistrationKey KEY_1 = new MailboxIdRegistrationKey(TestId.of(42));
+
+    private MailboxListenerRegistry testee;
+
+    @BeforeEach
+    void setUp() {
+        testee = new MailboxListenerRegistry();
+    }
+
+    @Test
+    void getLocalMailboxListenersShouldReturnEmptyWhenNone() {
+        assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    void getLocalMailboxListenersShouldReturnPreviouslyAddedListener() {
+        MailboxListener listener = mock(MailboxListener.class);
+        testee.addListener(KEY_1, listener, () -> {
+        });
+
+        assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+            .containsOnly(listener);
+    }
+
+    @Test
+    void getLocalMailboxListenersShouldReturnPreviouslyAddedListeners() {
+        MailboxListener listener1 = mock(MailboxListener.class);
+        MailboxListener listener2 = mock(MailboxListener.class);
+        testee.addListener(KEY_1, listener1, () -> {
+        });
+        testee.addListener(KEY_1, listener2, () -> {
+        });
+
+        assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+            .containsOnly(listener1, listener2);
+    }
+
+    @Test
+    void getLocalMailboxListenersShouldNotReturnRemovedListeners() {
+        MailboxListener listener1 = mock(MailboxListener.class);
+        MailboxListener listener2 = mock(MailboxListener.class);
+        testee.addListener(KEY_1, listener1, () -> {
+        });
+        testee.addListener(KEY_1, listener2, () -> {
+        });
+
+        testee.removeListener(KEY_1, listener2, () -> {
+        });
+
+        assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+            .containsOnly(listener1);
+    }
+
+    @Test
+    void addListenerShouldRunTaskWhenNoPreviouslyRegisteredListeners() {
+        MailboxListener listener = mock(MailboxListener.class);
+
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        testee.addListener(KEY_1, listener, () -> atomicBoolean.set(true));
+
+        assertThat(atomicBoolean).isTrue();
+    }
+
+    @Test
+    void addListenerShouldNotRunTaskWhenPreviouslyRegisteredListeners() {
+        MailboxListener listener = mock(MailboxListener.class);
+
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        testee.addListener(KEY_1, listener, () -> {
+        });
+        testee.addListener(KEY_1, listener, () -> atomicBoolean.set(true));
+
+        assertThat(atomicBoolean).isFalse();
+    }
+
+    @Test
+    void removeListenerShouldNotRunTaskWhenNoListener() {
+        MailboxListener listener = mock(MailboxListener.class);
+
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        testee.removeListener(KEY_1, listener, () -> atomicBoolean.set(true));
+
+        assertThat(atomicBoolean).isFalse();
+    }
+
+    @Test
+    void removeListenerShouldNotRunTaskWhenSeveralListener() {
+        MailboxListener listener = mock(MailboxListener.class);
+        MailboxListener listener2 = mock(MailboxListener.class);
+
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        testee.addListener(KEY_1, listener, () -> {
+        });
+        testee.addListener(KEY_1, listener2, () -> {
+        });
+        testee.removeListener(KEY_1, listener, () -> atomicBoolean.set(true));
+
+        assertThat(atomicBoolean).isFalse();
+    }
+
+    @Test
+    void removeListenerShouldRunTaskWhenOneListener() {
+        MailboxListener listener = mock(MailboxListener.class);
+
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        testee.addListener(KEY_1, listener, () -> {
+        });
+        testee.removeListener(KEY_1, listener, () -> atomicBoolean.set(true));
+
+        assertThat(atomicBoolean).isTrue();
+    }
+
+    @Nested
+    class ConcurrentTest {
+        private final Duration ONE_SECOND = Duration.ofSeconds(1);
+
+        @Test
+        void getLocalMailboxListenersShouldReturnPreviousAddedListener() throws Exception {
+            MailboxListener listener = mock(MailboxListener.class);
+
+            ConcurrentTestRunner.builder()
+                .operation((threadNumber, operationNumber) -> testee.addListener(KEY_1, listener, () -> {
+                }))
+                .threadCount(10)
+                .operationCount(10)
+                .runSuccessfullyWithin(ONE_SECOND);
+
+            assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+                .containsOnly(listener);
+        }
+
+        @Test
+        void getLocalMailboxListenersShouldReturnAllPreviousAddedListeners() throws Exception {
+            MailboxListener listener1 = mock(MailboxListener.class);
+            MailboxListener listener2 = mock(MailboxListener.class);
+            MailboxListener listener3 = mock(MailboxListener.class);
+
+            ConcurrentTestRunner.builder()
+                .operation((threadNumber, operationNumber) -> {
+                    if (threadNumber % 3 == 0) {
+                        testee.addListener(KEY_1, listener1, () -> {
+                        });
+                    } else if (threadNumber % 3 == 1) {
+                        testee.addListener(KEY_1, listener2, () -> {
+                        });
+                    } else if (threadNumber % 3 == 2) {
+                        testee.addListener(KEY_1, listener3, () -> {
+                        });
+                    }
+                })
+                .threadCount(6)
+                .operationCount(10)
+                .runSuccessfullyWithin(ONE_SECOND);
+
+            assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+                .containsOnly(listener1, listener2, listener3);
+        }
+
+        @Test
+        void getLocalMailboxListenersShouldReturnEmptyWhenRemoveAddedListener() throws Exception {
+            MailboxListener listener1 = mock(MailboxListener.class);
+
+            testee.addListener(KEY_1, listener1, () -> {
+            });
+
+            ConcurrentTestRunner.builder()
+                .operation(((threadNumber, operationNumber) ->
+                    testee.removeListener(KEY_1, listener1, () -> {
+                    })))
+                .threadCount(10)
+                .operationCount(10)
+                .runSuccessfullyWithin(ONE_SECOND);
+
+            assertThat(testee.getLocalMailboxListeners(KEY_1).collectList().block())
+                .isEmpty();
+        }
+
+        @Test
+        void addListenerOnlyRunTaskOnceForEmptyRegistry() throws Exception {
+            MailboxListener listener1 = mock(MailboxListener.class);
+            MailboxListener listener2 = mock(MailboxListener.class);
+            MailboxListener listener3 = mock(MailboxListener.class);
+
+            AtomicInteger runIfEmptyCount = new AtomicInteger(0);
+
+            ConcurrentTestRunner.builder()
+                .operation((threadNumber, operationNumber) -> {
+                    if (threadNumber % 3 == 0) {
+                        testee.addListener(KEY_1, listener1, () -> runIfEmptyCount.incrementAndGet());
+                    } else if (threadNumber % 3 == 1) {
+                        testee.addListener(KEY_1, listener2, () -> runIfEmptyCount.incrementAndGet());
+                    } else if (threadNumber % 3 == 2) {
+                        testee.addListener(KEY_1, listener3, () -> runIfEmptyCount.incrementAndGet());
+                    }
+                })
+                .threadCount(6)
+                .operationCount(10)
+                .runSuccessfullyWithin(ONE_SECOND);
+
+            assertThat(runIfEmptyCount.get()).isEqualTo(1);
+        }
+
+        @Test
+        void removeListenerOnlyRunTaskOnceForEmptyRegistry() throws Exception {
+            MailboxListener listener1 = mock(MailboxListener.class);
+            AtomicInteger runIfEmptyCount = new AtomicInteger(0);
+
+            testee.addListener(KEY_1, listener1, () -> {});
+            ConcurrentTestRunner.builder()
+                .operation(((threadNumber, operationNumber) -> testee.removeListener(KEY_1, listener1, () -> runIfEmptyCount.incrementAndGet())))
+                .threadCount(10)
+                .operationCount(10)
+                .runSuccessfullyWithin(ONE_SECOND);
+
+            assertThat(runIfEmptyCount.get()).isEqualTo(1);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/68735a22/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 8524e38..0daf7f0 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -53,6 +53,7 @@ import org.apache.james.mailbox.model.TestMessageId;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -70,7 +71,8 @@ import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
 
 class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract,
-    EventBusConcurrentTestContract.SingleEventBusConcurrentContract, EventBusConcurrentTestContract.MultiEventBusConcurrentContract {
+    EventBusConcurrentTestContract.SingleEventBusConcurrentContract, EventBusConcurrentTestContract.MultiEventBusConcurrentContract,
+    KeyContract.SingleEventBusKeyContract, KeyContract.MultipleEventBusKeyContract {
 
     @RegisterExtension
     static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
@@ -80,6 +82,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     private Sender sender;
     private RabbitMQConnectionFactory connectionFactory;
     private EventSerializer eventSerializer;
+    private RoutingKeyConverter routingKeyConverter;
 
     @BeforeEach
     void setUp() {
@@ -88,9 +91,10 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
 
         TestId.Factory mailboxIdFactory = new TestId.Factory();
         eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory());
+        routingKeyConverter = RoutingKeyConverter.forFactories(new MailboxIdRegistrationKey.Factory(mailboxIdFactory));
 
-        eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
-        eventBus2 = new RabbitMQEventBus(connectionFactory, eventSerializer);
+        eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
+        eventBus2 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
         eventBus.start();
         eventBus2.start();
         sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
@@ -117,6 +121,13 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         return eventBus2;
     }
 
+    @Override
+    @Test
+    @Disabled("This test is failing by design as the different registration keys are handled by distinct messages")
+    public void dispatchShouldCallListenerOnceWhenSeveralKeysMatching() {
+
+    }
+
     @Nested
     class PublishingTest {
         private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
@@ -269,7 +280,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
 
             @BeforeEach
             void setUp() {
-                eventBus3 = new RabbitMQEventBus(connectionFactory, eventSerializer);
+                eventBus3 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter);
                 eventBus3.start();
             }
 
@@ -368,4 +379,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         }
 
     }
+
+
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org