You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/11 09:58:15 UTC

[2/8] james-project git commit: MAILBOX-368 RabbitMQEventBus group registration

MAILBOX-368 RabbitMQEventBus group registration


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ab2fa642
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ab2fa642
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ab2fa642

Branch: refs/heads/master
Commit: ab2fa64264a67c702575a1c50fddeecfd9080027
Parents: 16ac299
Author: datph <dp...@linagora.com>
Authored: Tue Jan 8 14:21:15 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700

----------------------------------------------------------------------
 mailbox/event/event-rabbitmq/pom.xml            |   5 +
 .../james/mailbox/events/EventDispatcher.java   |  69 +++++++++
 .../james/mailbox/events/GroupRegistration.java | 152 +++++++++++++++++++
 .../events/GroupRegistrationHandler.java        |  70 +++++++++
 .../james/mailbox/events/RabbitMQEventBus.java  |  54 +++----
 .../events/RabbitMQEventBusPublishingTest.java  | 121 ---------------
 .../mailbox/events/RabbitMQEventBusTest.java    | 148 ++++++++++++++++++
 7 files changed, 467 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/pom.xml b/mailbox/event/event-rabbitmq/pom.xml
index c671468..0085059 100644
--- a/mailbox/event/event-rabbitmq/pom.xml
+++ b/mailbox/event/event-rabbitmq/pom.xml
@@ -91,6 +91,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
new file mode 100644
index 0000000..b01fc53
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -0,0 +1,69 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.Sender;
+
+public class EventDispatcher {
+    private final EventSerializer eventSerializer;
+    private final Sender sender;
+
+    EventDispatcher(EventSerializer eventSerializer, Sender sender) {
+        this.eventSerializer = eventSerializer;
+        this.sender = sender;
+    }
+
+    void start() {
+        sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+            .durable(DURABLE)
+            .type(DIRECT_EXCHANGE))
+            .block();
+    }
+
+    Mono<Void> dispatch(Event event) {
+        Mono<OutboundMessage> outboundMessage = Mono.just(event)
+            .publishOn(Schedulers.parallel())
+            .map(this::serializeEvent)
+            .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, EMPTY_ROUTING_KEY, payload));
+
+        return sender.send(outboundMessage)
+            .subscribeWith(MonoProcessor.create());
+    }
+
+    private byte[] serializeEvent(Event event) {
+        return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
new file mode 100644
index 0000000..36db6ae
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -0,0 +1,152 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Delivery;
+
+import play.api.libs.json.JsResult;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+
+class GroupRegistration implements Registration {
+    static class WorkQueueName {
+        @VisibleForTesting
+        static WorkQueueName of(Class<? extends Group> clazz) {
+            return new WorkQueueName(clazz.getName());
+        }
+
+        static WorkQueueName of(Group group) {
+            return of(group.getClass());
+        }
+
+        static final String MAILBOX_EVENT_WORK_QUEUE_PREFIX = MAILBOX_EVENT + "-workQueue-";
+
+        private final String name;
+
+        private WorkQueueName(String name) {
+            Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "Queue name must be specified");
+            this.name = name;
+        }
+
+        String asString() {
+            return MAILBOX_EVENT_WORK_QUEUE_PREFIX + name;
+        }
+    }
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistration.class);
+
+    private final MailboxListener mailboxListener;
+    private final WorkQueueName queueName;
+    private final Receiver receiver;
+    private final Runnable unregisterGroup;
+    private final Sender sender;
+    private final EventSerializer eventSerializer;
+    private Optional<Disposable> receiverSubscriber;
+
+    GroupRegistration(Mono<Connection> connectionSupplier, Sender sender, EventSerializer eventSerializer,
+                              MailboxListener mailboxListener, Group group, Runnable unregisterGroup) {
+        this.eventSerializer = eventSerializer;
+        this.mailboxListener = mailboxListener;
+        this.queueName = WorkQueueName.of(group);
+        this.sender = sender;
+        this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionSupplier));
+        this.receiverSubscriber = Optional.empty();
+        this.unregisterGroup = unregisterGroup;
+    }
+
+    GroupRegistration start() {
+        createGroupWorkQueue()
+            .doOnSuccess(any -> this.subscribeWorkQueue())
+            .block();
+        return this;
+    }
+
+    private Mono<Void> createGroupWorkQueue() {
+        return Flux.concat(
+            sender.declareQueue(QueueSpecification.queue(queueName.asString())
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(!AUTO_DELETE)
+                .arguments(NO_ARGUMENTS)),
+            sender.bind(BindingSpecification.binding()
+                .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+                .queue(queueName.asString())
+                .routingKey(EMPTY_ROUTING_KEY)))
+            .then();
+    }
+
+    private void subscribeWorkQueue() {
+        receiverSubscriber = Optional.of(receiver.consumeAutoAck(queueName.asString())
+            .subscribeOn(Schedulers.parallel())
+            .map(Delivery::getBody)
+            .filter(Objects::nonNull)
+            .map(eventInBytes -> new String(eventInBytes, StandardCharsets.UTF_8))
+            .map(eventSerializer::fromJson)
+            .map(JsResult::get)
+            .subscribeOn(Schedulers.elastic())
+            .subscribe(event -> deliverEvent(mailboxListener, event)));
+    }
+
+    private void deliverEvent(MailboxListener mailboxListener, Event event) {
+        try {
+            mailboxListener.event(event);
+        } catch (Exception e) {
+            LOGGER.error("Exception happens when handling event of user {}", event.getUser().asString(), e);
+        }
+    }
+
+    @Override
+    public void unregister() {
+        receiverSubscriber.filter(subscriber -> !subscriber.isDisposed())
+            .ifPresent(subscriber -> subscriber.dispose());
+        receiver.close();
+        unregisterGroup.run();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
new file mode 100644
index 0000000..c0f4339
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
@@ -0,0 +1,70 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.MailboxListener;
+
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.Sender;
+
+class GroupRegistrationHandler {
+    private final Map<Group, GroupRegistration> groupRegistrations;
+    private final EventSerializer eventSerializer;
+    private final Sender sender;
+    private final Mono<Connection> connectionMono;
+
+    GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono) {
+        this.eventSerializer = eventSerializer;
+        this.sender = sender;
+        this.connectionMono = connectionMono;
+        this.groupRegistrations = new ConcurrentHashMap<>();
+    }
+
+    void stop() {
+        groupRegistrations.values().forEach(GroupRegistration::unregister);
+    }
+
+    Registration register(MailboxListener listener, Group group) {
+        return groupRegistrations
+            .compute(group, (groupToRegister, oldGroupRegistration) -> {
+                if (oldGroupRegistration != null) {
+                    throw new GroupAlreadyRegistered(group);
+                }
+                return newGroupRegistration(listener, groupToRegister);
+            })
+            .start();
+    }
+
+    private GroupRegistration newGroupRegistration(MailboxListener listener, Group group) {
+        return new GroupRegistration(
+            connectionMono,
+            sender,
+            eventSerializer,
+            listener,
+            group,
+            () -> groupRegistrations.remove(group));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 4ce792a..accd6cc 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -19,19 +19,19 @@
 
 package org.apache.james.mailbox.events;
 
-import java.nio.charset.StandardCharsets;
 import java.util.Set;
 
+import javax.annotation.PreDestroy;
+
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
 
+import com.rabbitmq.client.Connection;
+
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.rabbitmq.ExchangeSpecification;
-import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
@@ -39,26 +39,26 @@ import reactor.rabbitmq.SenderOptions;
 public class RabbitMQEventBus implements EventBus {
     static final String MAILBOX_EVENT = "mailboxEvent";
     static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
-    static final String EMPTY_ROUTING_KEY = "";
-
-    private static final boolean DURABLE = true;
-    private static final String DIRECT_EXCHANGE = "direct";
 
-    private final EventSerializer eventSerializer;
     private final Sender sender;
+    private final GroupRegistrationHandler groupRegistrationHandler;
+    private final EventDispatcher eventDispatcher;
 
     RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer) {
-        this.eventSerializer = eventSerializer;
-        SenderOptions senderOption = new SenderOptions().connectionMono(Mono.fromSupplier(rabbitMQConnectionFactory::create));
-        this.sender = RabbitFlux.createSender(senderOption);
+        Mono<Connection> connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
+        this.sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
+        this.groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono);
+        this.eventDispatcher = new EventDispatcher(eventSerializer, sender);
+    }
+
+    public void start() {
+        eventDispatcher.start();
     }
 
-    Mono<Void> start() {
-        return sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
-                .durable(DURABLE)
-                .type(DIRECT_EXCHANGE))
-            .subscribeOn(Schedulers.elastic())
-            .then();
+    @PreDestroy
+    public void stop() {
+        groupRegistrationHandler.stop();
+        sender.close();
     }
 
     @Override
@@ -68,22 +68,14 @@ public class RabbitMQEventBus implements EventBus {
 
     @Override
     public Registration register(MailboxListener listener, Group group) {
-        throw new NotImplementedException("will implement latter");
+        return groupRegistrationHandler.register(listener, group);
     }
 
     @Override
     public Mono<Void> dispatch(Event event, Set<RegistrationKey> key) {
-        Mono<OutboundMessage> outboundMessage = Mono.just(event)
-            .publishOn(Schedulers.parallel())
-            .map(this::serializeEvent)
-            .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, EMPTY_ROUTING_KEY, payload));
-
-        Mono<Void> publishMono = sender.send(outboundMessage).cache();
-        publishMono.subscribe();
-        return publishMono;
-    }
-
-    private byte[] serializeEvent(Event event) {
-        return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
+        if (!event.isNoop()) {
+            return eventDispatcher.dispatch(event);
+        }
+        return Mono.empty();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
deleted file mode 100644
index 50a37ca..0000000
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.events;
-
-import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
-import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
-import static org.apache.james.mailbox.events.EventBusContract.EVENT;
-import static org.apache.james.mailbox.events.EventBusContract.NO_KEYS;
-import static org.apache.james.mailbox.events.RabbitMQEventBus.EMPTY_ROUTING_KEY;
-import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
-import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.nio.charset.StandardCharsets;
-
-import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
-import org.apache.james.backend.rabbitmq.RabbitMQExtension;
-import org.apache.james.event.json.EventSerializer;
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.model.TestId;
-import org.apache.james.mailbox.model.TestMessageId;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import com.google.common.collect.ImmutableSet;
-
-import reactor.core.publisher.Mono;
-import reactor.rabbitmq.BindingSpecification;
-import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
-import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
-import reactor.rabbitmq.Sender;
-import reactor.rabbitmq.SenderOptions;
-
-class RabbitMQEventBusPublishingTest {
-    private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
-
-    @RegisterExtension
-    static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
-
-    private RabbitMQEventBus eventBus;
-    private EventSerializer eventSerializer;
-    private RabbitMQConnectionFactory connectionFactory;
-
-    @BeforeEach
-    void setUp() {
-        connectionFactory = rabbitMQExtension.getConnectionFactory();
-
-        eventSerializer = new EventSerializer(new TestId.Factory(), new TestMessageId.Factory());
-        eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
-        eventBus.start().block();
-
-        createQueue();
-    }
-
-    private void createQueue() {
-        SenderOptions senderOption = new SenderOptions()
-            .connectionMono(Mono.fromSupplier(connectionFactory::create));
-        Sender sender = RabbitFlux.createSender(senderOption);
-
-        sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
-            .durable(DURABLE)
-            .exclusive(!EXCLUSIVE)
-            .autoDelete(!AUTO_DELETE)
-            .arguments(NO_ARGUMENTS))
-            .block();
-        sender.bind(BindingSpecification.binding()
-            .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
-            .queue(MAILBOX_WORK_QUEUE_NAME)
-            .routingKey(EMPTY_ROUTING_KEY))
-            .block();
-    }
-
-    @Test
-    void dispatchShouldPublishSerializedEventToRabbitMQ() {
-        eventBus.dispatch(EVENT, NO_KEYS).block();
-
-        assertThat(dequeueEvent()).isEqualTo(EVENT);
-    }
-
-    @Test
-    void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
-        eventBus.dispatch(EVENT, NO_KEYS);
-
-        assertThat(dequeueEvent()).isEqualTo(EVENT);
-    }
-
-
-    private Event dequeueEvent() {
-        RabbitMQConnectionFactory connectionFactory = rabbitMQExtension.getConnectionFactory();
-        Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(Mono.just(connectionFactory.create())));
-
-        byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
-            .blockFirst()
-            .getBody();
-
-        return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8))
-            .get();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
new file mode 100644
index 0000000..b39fb42
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -0,0 +1,148 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.ALL_GROUPS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.backend.rabbitmq.RabbitMQExtension;
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+import reactor.rabbitmq.SenderOptions;
+
+class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract {
+
+    @RegisterExtension
+    static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
+
+    private RabbitMQEventBus eventBus;
+    private Sender sender;
+    private RabbitMQConnectionFactory connectionFactory;
+    private EventSerializer eventSerializer;
+
+    @BeforeEach
+    void setUp() {
+        connectionFactory = rabbitMQExtension.getConnectionFactory();
+        Mono<Connection> connectionMono = Mono.fromSupplier(connectionFactory::create).cache();
+
+        TestId.Factory mailboxIdFactory = new TestId.Factory();
+        eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory());
+
+        eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
+        eventBus.start();
+        sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
+    }
+
+    @AfterEach
+    void tearDown() {
+        eventBus.stop();
+        ALL_GROUPS.stream()
+            .map(groupClass -> GroupRegistration.WorkQueueName.of(groupClass).asString())
+            .forEach(queueName -> sender.delete(QueueSpecification.queue(queueName)).block());
+    }
+
+    @Override
+    public EventBus eventBus() {
+        return eventBus;
+    }
+
+    @Nested
+    class PublishingTest {
+        private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
+
+        @BeforeEach
+        void setUp() {
+            createQueue();
+        }
+
+        private void createQueue() {
+            SenderOptions senderOption = new SenderOptions()
+                .connectionMono(Mono.fromSupplier(connectionFactory::create));
+            Sender sender = RabbitFlux.createSender(senderOption);
+
+            sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(!AUTO_DELETE)
+                .arguments(NO_ARGUMENTS))
+                .block();
+            sender.bind(BindingSpecification.binding()
+                .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+                .queue(MAILBOX_WORK_QUEUE_NAME)
+                .routingKey(EMPTY_ROUTING_KEY))
+                .block();
+        }
+
+        @Test
+        void dispatchShouldPublishSerializedEventToRabbitMQ() {
+            eventBus.dispatch(EVENT, NO_KEYS).block();
+
+            assertThat(dequeueEvent()).isEqualTo(EVENT);
+        }
+
+        @Test
+        void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
+            eventBus.dispatch(EVENT, NO_KEYS);
+
+            assertThat(dequeueEvent()).isEqualTo(EVENT);
+        }
+
+        private Event dequeueEvent() {
+            RabbitMQConnectionFactory connectionFactory = rabbitMQExtension.getConnectionFactory();
+            Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(Mono.just(connectionFactory.create())));
+
+            byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
+                .blockFirst()
+                .getBody();
+
+            return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8))
+                .get();
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org