You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/15 03:21:53 UTC

[18/30] james-project git commit: MAILBOX-367 RabbitMQEventBus exchange, queue life cycle tests

MAILBOX-367 RabbitMQEventBus exchange, queue life cycle tests


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9fc3e750
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9fc3e750
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9fc3e750

Branch: refs/heads/master
Commit: 9fc3e75044ffc11677c6b9c11dc07efc5cf075b4
Parents: 9142624
Author: datph <dp...@linagora.com>
Authored: Thu Jan 10 10:56:27 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 15 10:10:04 2019 +0700

----------------------------------------------------------------------
 .../mailbox/events/EventBusTestFixture.java     |   2 +
 .../james/mailbox/events/GroupContract.java     |  42 ++--
 .../james/mailbox/events/RabbitMQEventBus.java  |  37 +++-
 .../mailbox/events/RabbitMQEventBusTest.java    | 210 +++++++++++++++++++
 4 files changed, 259 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/9fc3e750/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
index 530bbe5..90f5ff5 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -97,6 +97,8 @@ public interface EventBusTestFixture {
     MailboxIdRegistrationKey KEY_2 = new MailboxIdRegistrationKey(ID_2);
     List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class, GroupC.class);
 
+    GroupA GROUP_A = new GroupA();
+
     ConditionFactory WAIT_CONDITION = await().timeout(com.jayway.awaitility.Duration.ONE_SECOND);
 
     static MailboxListener newListener() {

http://git-wip-us.apache.org/repos/asf/james-project/blob/9fc3e750/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index c6bc1b2..beeeb0b 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.events;
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_2;
 import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS;
-import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
 import static org.apache.james.mailbox.events.EventBusTestFixture.GroupB;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND;
@@ -58,7 +58,7 @@ public interface GroupContract {
         default void listenerGroupShouldReceiveEvents() {
             MailboxListener listener = newListener();
 
-            eventBus().register(listener, new GroupA());
+            eventBus().register(listener, GROUP_A);
 
             eventBus().dispatch(EVENT, NO_KEYS).block();
 
@@ -69,7 +69,7 @@ public interface GroupContract {
         default void groupListenersShouldNotReceiveNoopEvents() {
             MailboxListener listener = newListener();
 
-            eventBus().register(listener, new GroupA());
+            eventBus().register(listener, GROUP_A);
 
             MailboxListener.Added noopEvent = new MailboxListener.Added(MailboxSession.SessionId.of(18), User.fromUsername("bob"), MailboxPath.forUser("bob", "mailbox"), TestId.of(58), ImmutableSortedMap.of(), Event.EventId.random());
             eventBus().dispatch(noopEvent, NO_KEYS).block();
@@ -83,7 +83,7 @@ public interface GroupContract {
             MailboxListener listener = newListener();
             doThrow(new RuntimeException()).when(listener).event(any());
 
-            eventBus().register(listener, new GroupA());
+            eventBus().register(listener, GROUP_A);
 
             assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
                 .doesNotThrowAnyException();
@@ -93,7 +93,7 @@ public interface GroupContract {
         default void eachListenerGroupShouldReceiveEvents() {
             MailboxListener listener = newListener();
             MailboxListener listener2 = newListener();
-            eventBus().register(listener, new GroupA());
+            eventBus().register(listener, GROUP_A);
             eventBus().register(listener2, new GroupB());
 
             eventBus().dispatch(EVENT, NO_KEYS).block();
@@ -105,7 +105,7 @@ public interface GroupContract {
         @Test
         default void unregisteredGroupListenerShouldNotReceiveEvents() {
             MailboxListener listener = newListener();
-            Registration registration = eventBus().register(listener, new GroupA());
+            Registration registration = eventBus().register(listener, GROUP_A);
 
             registration.unregister();
 
@@ -119,9 +119,9 @@ public interface GroupContract {
             MailboxListener listener = newListener();
             MailboxListener listener2 = newListener();
 
-            eventBus().register(listener, new GroupA());
+            eventBus().register(listener, GROUP_A);
 
-            assertThatThrownBy(() -> eventBus().register(listener2, new GroupA()))
+            assertThatThrownBy(() -> eventBus().register(listener2, GROUP_A))
                 .isInstanceOf(GroupAlreadyRegistered.class);
         }
 
@@ -130,9 +130,9 @@ public interface GroupContract {
             MailboxListener listener = newListener();
             MailboxListener listener2 = newListener();
 
-            eventBus().register(listener, new GroupA()).unregister();
+            eventBus().register(listener, GROUP_A).unregister();
 
-            assertThatCode(() -> eventBus().register(listener2, new GroupA()))
+            assertThatCode(() -> eventBus().register(listener2, GROUP_A))
                 .doesNotThrowAnyException();
         }
 
@@ -140,7 +140,7 @@ public interface GroupContract {
         default void unregisterShouldBeIdempotentForGroups() {
             MailboxListener listener = newListener();
 
-            Registration registration = eventBus().register(listener, new GroupA());
+            Registration registration = eventBus().register(listener, GROUP_A);
             registration.unregister();
 
             assertThatCode(registration::unregister)
@@ -151,8 +151,8 @@ public interface GroupContract {
         default void registerShouldAcceptAlreadyUnregisteredGroups() {
             MailboxListener listener = newListener();
 
-            eventBus().register(listener, new GroupA()).unregister();
-            eventBus().register(listener, new GroupA());
+            eventBus().register(listener, GROUP_A).unregister();
+            eventBus().register(listener, GROUP_A);
 
             eventBus().dispatch(EVENT, NO_KEYS).block();
 
@@ -163,7 +163,7 @@ public interface GroupContract {
         default void dispatchShouldCallSynchronousListener() {
             MailboxListener listener = newListener();
 
-            eventBus().register(listener, new GroupA());
+            eventBus().register(listener, GROUP_A);
 
             eventBus().dispatch(EVENT, NO_KEYS).block();
 
@@ -172,8 +172,8 @@ public interface GroupContract {
 
         @Test
         default void failingGroupListenersShouldNotAbortGroupDelivery() {
-            EventBusTestFixture.EventMatcherThrowingListener listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EVENT));
-            eventBus().register(listener, new GroupA());
+            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EVENT));
+            eventBus().register(listener, GROUP_A);
 
             eventBus().dispatch(EVENT, NO_KEYS).block();
             eventBus().dispatch(EVENT_2, NO_KEYS).block();
@@ -190,7 +190,7 @@ public interface GroupContract {
             when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
             doThrow(new RuntimeException()).when(failingListener).event(any());
 
-            eventBus().register(failingListener, new GroupA());
+            eventBus().register(failingListener, GROUP_A);
             eventBus().register(listener, new GroupB());
 
             eventBus().dispatch(EVENT, NO_KEYS).block();
@@ -205,7 +205,7 @@ public interface GroupContract {
         default void groupsDefinedOnlyOnSomeNodesShouldBeNotified() {
             MailboxListener mailboxListener = newListener();
 
-            eventBus().register(mailboxListener, new GroupA());
+            eventBus().register(mailboxListener, GROUP_A);
 
             eventBus2().dispatch(EVENT, NO_KEYS).block();
 
@@ -216,8 +216,8 @@ public interface GroupContract {
         default void groupListenersShouldBeExecutedOnceInAControlledEnvironment() {
             MailboxListener mailboxListener = newListener();
 
-            eventBus().register(mailboxListener, new GroupA());
-            eventBus2().register(mailboxListener, new GroupA());
+            eventBus().register(mailboxListener, GROUP_A);
+            eventBus2().register(mailboxListener, GROUP_A);
 
             eventBus2().dispatch(EVENT, NO_KEYS).block();
 
@@ -228,7 +228,7 @@ public interface GroupContract {
         default void unregisterShouldStopNotificationForDistantGroups() {
             MailboxListener mailboxListener = newListener();
 
-            eventBus().register(mailboxListener, new GroupA()).unregister();
+            eventBus().register(mailboxListener, GROUP_A).unregister();
 
             eventBus2().dispatch(EVENT, NO_KEYS).block();
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/9fc3e750/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index accd6cc..87e8ab1 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.events;
 
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.PreDestroy;
 
@@ -36,29 +37,43 @@ import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
 
-public class RabbitMQEventBus implements EventBus {
+class RabbitMQEventBus implements EventBus {
     static final String MAILBOX_EVENT = "mailboxEvent";
     static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
 
-    private final Sender sender;
-    private final GroupRegistrationHandler groupRegistrationHandler;
-    private final EventDispatcher eventDispatcher;
+    private final Mono<Connection> connectionMono;
+    private final EventSerializer eventSerializer;
+    private final AtomicBoolean isRunning;
+
+    private GroupRegistrationHandler groupRegistrationHandler;
+    private EventDispatcher eventDispatcher;
+    private Sender sender;
 
     RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer) {
-        Mono<Connection> connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
-        this.sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
-        this.groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono);
-        this.eventDispatcher = new EventDispatcher(eventSerializer, sender);
+        this.connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
+        this.eventSerializer = eventSerializer;
+        isRunning = new AtomicBoolean(false);
     }
 
     public void start() {
-        eventDispatcher.start();
+        if (!isRunning.get()) {
+            sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
+            groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono);
+            eventDispatcher = new EventDispatcher(eventSerializer, sender);
+
+            eventDispatcher.start();
+
+            isRunning.set(true);
+        }
     }
 
     @PreDestroy
     public void stop() {
-        groupRegistrationHandler.stop();
-        sender.close();
+        if (isRunning.get()) {
+            groupRegistrationHandler.stop();
+            sender.close();
+            isRunning.set(false);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/9fc3e750/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index b8d5cee..8e5503a 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -20,25 +20,37 @@
 package org.apache.james.mailbox.events;
 
 import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
 import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.ALL_GROUPS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
+import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
+import static org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.mockito.Mockito.mock;
 
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.backend.rabbitmq.RabbitMQExtension;
+import org.apache.james.backend.rabbitmq.RabbitMQManagementAPI;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
 import org.apache.james.mailbox.model.TestId;
 import org.apache.james.mailbox.model.TestMessageId;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
@@ -158,4 +170,202 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
                 .get();
         }
     }
+
+    @Nested
+    class LifeCycleTest {
+
+        private RabbitMQManagementAPI rabbitManagementAPI;
+
+        @BeforeEach
+        void setUp() throws Exception {
+            rabbitManagementAPI = rabbitMQExtension.managementAPI();
+        }
+
+        @Nested
+        class SingleEventBus {
+            @Test
+            void startShouldCreateEventExchange() {
+                eventBus.start();
+                assertThat(rabbitManagementAPI.listExchanges())
+                    .filteredOn(exchange -> exchange.getName().equals(MAILBOX_EVENT_EXCHANGE_NAME))
+                    .hasOnlyOneElementSatisfying(exchange -> {
+                        assertThat(exchange.isDurable()).isTrue();
+                        assertThat(exchange.getType()).isEqualTo(DIRECT_EXCHANGE);
+                    });
+            }
+
+            @Test
+            void stopShouldNotDeleteEventBusExchange() {
+                eventBus.start();
+                eventBus.stop();
+
+                assertThat(rabbitManagementAPI.listExchanges())
+                    .anySatisfy(exchange -> exchange.getName().equals(MAILBOX_EVENT_EXCHANGE_NAME));
+            }
+
+            @Test
+            void stopShouldNotDeleteGroupRegistrationWorkQueue() {
+                eventBus.start();
+                eventBus.register(mock(MailboxListener.class), GROUP_A);
+                eventBus.stop();
+
+                assertThat(rabbitManagementAPI.listQueues())
+                    .anySatisfy(queue -> queue.getName().contains(GroupA.class.getName()));
+            }
+
+            @Test
+            void stopShouldDeleteKeyRegistrationWorkQueue() {
+                eventBus.start();
+                eventBus.stop();
+
+                assertThat(rabbitManagementAPI.listQueues())
+                    .isEmpty();
+            }
+
+            @Test
+            void eventBusShouldNotThrowWhenContinuouslyStartAndStop() {
+                assertThatCode(() -> {
+                    eventBus.start();
+                    eventBus.stop();
+                    eventBus.stop();
+                    eventBus.start();
+                    eventBus.start();
+                    eventBus.start();
+                    eventBus.stop();
+                    eventBus.stop();
+                }).doesNotThrowAnyException();
+            }
+
+            @Test
+            void registrationsShouldNotHandleEventsAfterStop() throws Exception {
+                eventBus.start();
+
+                MailboxListenerCountingSuccessfulExecution listener = new MailboxListenerCountingSuccessfulExecution();
+                eventBus.register(listener, GROUP_A);
+
+                int threadCount = 10;
+                int operationCount = 1000;
+                int maxEventsDispatched = threadCount * operationCount;
+                ConcurrentTestRunner.builder()
+                    .operation((threadNumber, step) -> eventBus.dispatch(EVENT, KEY_1))
+                    .threadCount(10)
+                    .operationCount(1000)
+                    .runSuccessfullyWithin(Duration.ofSeconds(5));
+
+                eventBus.stop();
+                int callsAfterStop = listener.numberOfEventCalls();
+
+                TimeUnit.SECONDS.sleep(1);
+                assertThat(listener.numberOfEventCalls())
+                    .isEqualTo(callsAfterStop)
+                    .isLessThan(maxEventsDispatched);
+            }
+        }
+
+        @Nested
+        class MultiEventBus {
+
+            private RabbitMQEventBus eventBus3;
+
+            @BeforeEach
+            void setUp() {
+                eventBus3 = new RabbitMQEventBus(connectionFactory, eventSerializer);
+                eventBus3.start();
+            }
+
+            @AfterEach
+            void tearDown() {
+                eventBus3.stop();
+            }
+
+            @Test
+            void multipleEventBusStartShouldCreateOnlyOneEventExchange() {
+                assertThat(rabbitManagementAPI.listExchanges())
+                    .filteredOn(exchange -> exchange.getName().equals(MAILBOX_EVENT_EXCHANGE_NAME))
+                    .hasSize(1);
+            }
+
+            @Test
+            void multipleEventBusShouldNotThrowWhenStartAndStopContinuously() {
+                assertThatCode(() -> {
+                    eventBus.start();
+                    eventBus.start();
+                    eventBus2.start();
+                    eventBus2.start();
+                    eventBus.stop();
+                    eventBus.stop();
+                    eventBus.stop();
+                    eventBus3.start();
+                    eventBus3.start();
+                    eventBus3.start();
+                    eventBus3.stop();
+                    eventBus.start();
+                    eventBus2.start();
+                    eventBus.stop();
+                    eventBus2.stop();
+                }).doesNotThrowAnyException();
+            }
+
+            @Test
+            void multipleEventBusStopShouldNotDeleteEventBusExchange() {
+                eventBus.stop();
+                eventBus2.stop();
+                eventBus3.stop();
+
+                assertThat(rabbitManagementAPI.listExchanges())
+                    .anySatisfy(exchange -> exchange.getName().equals(MAILBOX_EVENT_EXCHANGE_NAME));
+            }
+
+            @Test
+            void multipleEventBusStopShouldNotDeleteGroupRegistrationWorkQueue() {
+                eventBus.register(mock(MailboxListener.class), GROUP_A);
+
+                eventBus.stop();
+                eventBus2.stop();
+                eventBus3.stop();
+
+                assertThat(rabbitManagementAPI.listQueues())
+                    .anySatisfy(queue -> queue.getName().contains(GroupA.class.getName()));
+            }
+
+            @Test
+            void multipleEventBusStopShouldDeleteAllKeyRegistrationsWorkQueue() {
+                eventBus.stop();
+                eventBus2.stop();
+                eventBus3.stop();
+
+                assertThat(rabbitManagementAPI.listQueues())
+                    .isEmpty();
+            }
+
+            @Test
+            void registrationsShouldNotHandleEventsAfterStop() throws Exception {
+                eventBus.start();
+                eventBus2.start();
+
+                MailboxListenerCountingSuccessfulExecution listener = new MailboxListenerCountingSuccessfulExecution();
+                eventBus.register(listener, GROUP_A);
+                eventBus2.register(listener, GROUP_A);
+
+                int threadCount = 10;
+                int operationCount = 1000;
+                int maxEventsDispatched = threadCount * operationCount;
+                ConcurrentTestRunner.builder()
+                    .operation((threadNumber, step) -> eventBus.dispatch(EVENT, KEY_1))
+                    .threadCount(10)
+                    .operationCount(1000)
+                    .runSuccessfullyWithin(Duration.ofSeconds(5));
+
+                eventBus.stop();
+                eventBus2.stop();
+                int callsAfterStop = listener.numberOfEventCalls();
+
+                TimeUnit.SECONDS.sleep(1);
+                assertThat(listener.numberOfEventCalls())
+                    .isEqualTo(callsAfterStop)
+                    .isLessThan(maxEventsDispatched);
+            }
+        }
+
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org