You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/15 03:21:53 UTC
[18/30] james-project git commit: MAILBOX-367 RabbitMQEventBus exchange, queue life cycle tests
MAILBOX-367 RabbitMQEventBus exchange, queue life cycle tests
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9fc3e750
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9fc3e750
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9fc3e750
Branch: refs/heads/master
Commit: 9fc3e75044ffc11677c6b9c11dc07efc5cf075b4
Parents: 9142624
Author: datph <dp...@linagora.com>
Authored: Thu Jan 10 10:56:27 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 15 10:10:04 2019 +0700
----------------------------------------------------------------------
.../mailbox/events/EventBusTestFixture.java | 2 +
.../james/mailbox/events/GroupContract.java | 42 ++--
.../james/mailbox/events/RabbitMQEventBus.java | 37 +++-
.../mailbox/events/RabbitMQEventBusTest.java | 210 +++++++++++++++++++
4 files changed, 259 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/9fc3e750/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
index 530bbe5..90f5ff5 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -97,6 +97,8 @@ public interface EventBusTestFixture {
MailboxIdRegistrationKey KEY_2 = new MailboxIdRegistrationKey(ID_2);
List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class, GroupC.class);
+ GroupA GROUP_A = new GroupA();
+
ConditionFactory WAIT_CONDITION = await().timeout(com.jayway.awaitility.Duration.ONE_SECOND);
static MailboxListener newListener() {
http://git-wip-us.apache.org/repos/asf/james-project/blob/9fc3e750/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index c6bc1b2..beeeb0b 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.events;
import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_2;
import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS;
-import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
import static org.apache.james.mailbox.events.EventBusTestFixture.GroupB;
import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND;
@@ -58,7 +58,7 @@ public interface GroupContract {
default void listenerGroupShouldReceiveEvents() {
MailboxListener listener = newListener();
- eventBus().register(listener, new GroupA());
+ eventBus().register(listener, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
@@ -69,7 +69,7 @@ public interface GroupContract {
default void groupListenersShouldNotReceiveNoopEvents() {
MailboxListener listener = newListener();
- eventBus().register(listener, new GroupA());
+ eventBus().register(listener, GROUP_A);
MailboxListener.Added noopEvent = new MailboxListener.Added(MailboxSession.SessionId.of(18), User.fromUsername("bob"), MailboxPath.forUser("bob", "mailbox"), TestId.of(58), ImmutableSortedMap.of(), Event.EventId.random());
eventBus().dispatch(noopEvent, NO_KEYS).block();
@@ -83,7 +83,7 @@ public interface GroupContract {
MailboxListener listener = newListener();
doThrow(new RuntimeException()).when(listener).event(any());
- eventBus().register(listener, new GroupA());
+ eventBus().register(listener, GROUP_A);
assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
.doesNotThrowAnyException();
@@ -93,7 +93,7 @@ public interface GroupContract {
default void eachListenerGroupShouldReceiveEvents() {
MailboxListener listener = newListener();
MailboxListener listener2 = newListener();
- eventBus().register(listener, new GroupA());
+ eventBus().register(listener, GROUP_A);
eventBus().register(listener2, new GroupB());
eventBus().dispatch(EVENT, NO_KEYS).block();
@@ -105,7 +105,7 @@ public interface GroupContract {
@Test
default void unregisteredGroupListenerShouldNotReceiveEvents() {
MailboxListener listener = newListener();
- Registration registration = eventBus().register(listener, new GroupA());
+ Registration registration = eventBus().register(listener, GROUP_A);
registration.unregister();
@@ -119,9 +119,9 @@ public interface GroupContract {
MailboxListener listener = newListener();
MailboxListener listener2 = newListener();
- eventBus().register(listener, new GroupA());
+ eventBus().register(listener, GROUP_A);
- assertThatThrownBy(() -> eventBus().register(listener2, new GroupA()))
+ assertThatThrownBy(() -> eventBus().register(listener2, GROUP_A))
.isInstanceOf(GroupAlreadyRegistered.class);
}
@@ -130,9 +130,9 @@ public interface GroupContract {
MailboxListener listener = newListener();
MailboxListener listener2 = newListener();
- eventBus().register(listener, new GroupA()).unregister();
+ eventBus().register(listener, GROUP_A).unregister();
- assertThatCode(() -> eventBus().register(listener2, new GroupA()))
+ assertThatCode(() -> eventBus().register(listener2, GROUP_A))
.doesNotThrowAnyException();
}
@@ -140,7 +140,7 @@ public interface GroupContract {
default void unregisterShouldBeIdempotentForGroups() {
MailboxListener listener = newListener();
- Registration registration = eventBus().register(listener, new GroupA());
+ Registration registration = eventBus().register(listener, GROUP_A);
registration.unregister();
assertThatCode(registration::unregister)
@@ -151,8 +151,8 @@ public interface GroupContract {
default void registerShouldAcceptAlreadyUnregisteredGroups() {
MailboxListener listener = newListener();
- eventBus().register(listener, new GroupA()).unregister();
- eventBus().register(listener, new GroupA());
+ eventBus().register(listener, GROUP_A).unregister();
+ eventBus().register(listener, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
@@ -163,7 +163,7 @@ public interface GroupContract {
default void dispatchShouldCallSynchronousListener() {
MailboxListener listener = newListener();
- eventBus().register(listener, new GroupA());
+ eventBus().register(listener, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
@@ -172,8 +172,8 @@ public interface GroupContract {
@Test
default void failingGroupListenersShouldNotAbortGroupDelivery() {
- EventBusTestFixture.EventMatcherThrowingListener listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EVENT));
- eventBus().register(listener, new GroupA());
+ EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EVENT));
+ eventBus().register(listener, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
eventBus().dispatch(EVENT_2, NO_KEYS).block();
@@ -190,7 +190,7 @@ public interface GroupContract {
when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
doThrow(new RuntimeException()).when(failingListener).event(any());
- eventBus().register(failingListener, new GroupA());
+ eventBus().register(failingListener, GROUP_A);
eventBus().register(listener, new GroupB());
eventBus().dispatch(EVENT, NO_KEYS).block();
@@ -205,7 +205,7 @@ public interface GroupContract {
default void groupsDefinedOnlyOnSomeNodesShouldBeNotified() {
MailboxListener mailboxListener = newListener();
- eventBus().register(mailboxListener, new GroupA());
+ eventBus().register(mailboxListener, GROUP_A);
eventBus2().dispatch(EVENT, NO_KEYS).block();
@@ -216,8 +216,8 @@ public interface GroupContract {
default void groupListenersShouldBeExecutedOnceInAControlledEnvironment() {
MailboxListener mailboxListener = newListener();
- eventBus().register(mailboxListener, new GroupA());
- eventBus2().register(mailboxListener, new GroupA());
+ eventBus().register(mailboxListener, GROUP_A);
+ eventBus2().register(mailboxListener, GROUP_A);
eventBus2().dispatch(EVENT, NO_KEYS).block();
@@ -228,7 +228,7 @@ public interface GroupContract {
default void unregisterShouldStopNotificationForDistantGroups() {
MailboxListener mailboxListener = newListener();
- eventBus().register(mailboxListener, new GroupA()).unregister();
+ eventBus().register(mailboxListener, GROUP_A).unregister();
eventBus2().dispatch(EVENT, NO_KEYS).block();
http://git-wip-us.apache.org/repos/asf/james-project/blob/9fc3e750/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index accd6cc..87e8ab1 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -20,6 +20,7 @@
package org.apache.james.mailbox.events;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PreDestroy;
@@ -36,29 +37,43 @@ import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
-public class RabbitMQEventBus implements EventBus {
+class RabbitMQEventBus implements EventBus {
static final String MAILBOX_EVENT = "mailboxEvent";
static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
- private final Sender sender;
- private final GroupRegistrationHandler groupRegistrationHandler;
- private final EventDispatcher eventDispatcher;
+ private final Mono<Connection> connectionMono;
+ private final EventSerializer eventSerializer;
+ private final AtomicBoolean isRunning;
+
+ private GroupRegistrationHandler groupRegistrationHandler;
+ private EventDispatcher eventDispatcher;
+ private Sender sender;
RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer) { // Only stores collaborators; sender, group handler and dispatcher are created later by start().
- Mono<Connection> connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
- this.sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
- this.groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono);
- this.eventDispatcher = new EventDispatcher(eventSerializer, sender);
+ this.connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache(); // kept as a field so start() can hand the same Mono to the sender and the group handler
+ this.eventSerializer = eventSerializer;
+ isRunning = new AtomicBoolean(false); // the bus begins in the stopped state
}
public void start() { // Builds the sender, group registration handler and event dispatcher when the bus is not running; does nothing otherwise.
- eventDispatcher.start();
+ if (!isRunning.get()) { // NOTE(review): get() here and set(true) below are two separate steps, not one atomic check-and-set; confirm lifecycle calls are never made concurrently
+ sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
+ groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono);
+ eventDispatcher = new EventDispatcher(eventSerializer, sender);
+
+ eventDispatcher.start();
+
+ isRunning.set(true); // flag is flipped only after every collaborator has been assigned and the dispatcher started
+ }
}
@PreDestroy
public void stop() { // Stops the group registration handler and closes the sender, but only while the bus is running.
- groupRegistrationHandler.stop();
- sender.close();
+ if (isRunning.get()) { // guard turns stop() before start(), or a repeated stop(), into a no-op instead of dereferencing fields that start() has not assigned
+ groupRegistrationHandler.stop();
+ sender.close();
+ isRunning.set(false); // NOTE(review): separate get()/set() steps; two concurrent stop() calls could both pass the guard. Confirm callers serialize lifecycle calls
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/9fc3e750/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index b8d5cee..8e5503a 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -20,25 +20,37 @@
package org.apache.james.mailbox.events;
import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
import static org.apache.james.mailbox.events.EventBusTestFixture.ALL_GROUPS;
import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
+import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
+import static org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution;
import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.mockito.Mockito.mock;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
import org.apache.james.backend.rabbitmq.RabbitMQExtension;
+import org.apache.james.backend.rabbitmq.RabbitMQManagementAPI;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
import org.apache.james.mailbox.model.TestId;
import org.apache.james.mailbox.model.TestMessageId;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
@@ -158,4 +170,202 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
.get();
}
}
+
+ @Nested
+ class LifeCycleTest {
+
+ private RabbitMQManagementAPI rabbitManagementAPI;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ rabbitManagementAPI = rabbitMQExtension.managementAPI();
+ }
+
+ @Nested
+ class SingleEventBus {
+ @Test
+ void startShouldCreateEventExchange() {
+ eventBus.start();
+ assertThat(rabbitManagementAPI.listExchanges())
+ .filteredOn(exchange -> exchange.getName().equals(MAILBOX_EVENT_EXCHANGE_NAME))
+ .hasOnlyOneElementSatisfying(exchange -> {
+ assertThat(exchange.isDurable()).isTrue();
+ assertThat(exchange.getType()).isEqualTo(DIRECT_EXCHANGE);
+ });
+ }
+
+ // The mailbox event exchange must still be listed by the management API after stop().
+ @Test
+ void stopShouldNotDeleteEventBusExchange() {
+ eventBus.start();
+ eventBus.stop();
+
+ // anySatisfy expects a Consumer that asserts; a bare boolean equals(...) never fails,
+ // so the name check is wrapped in assertThat to make the test able to fail.
+ assertThat(rabbitManagementAPI.listExchanges())
+ .anySatisfy(exchange -> assertThat(exchange.getName()).isEqualTo(MAILBOX_EVENT_EXCHANGE_NAME));
+ }
+
+ // A queue whose name contains the GroupA class name must still exist after stop().
+ @Test
+ void stopShouldNotDeleteGroupRegistrationWorkQueue() {
+ eventBus.start();
+ eventBus.register(mock(MailboxListener.class), GROUP_A);
+ eventBus.stop();
+
+ // anySatisfy expects a Consumer that asserts; a bare boolean contains(...) never fails,
+ // so the name check is wrapped in assertThat to make the test able to fail.
+ assertThat(rabbitManagementAPI.listQueues())
+ .anySatisfy(queue -> assertThat(queue.getName()).contains(GroupA.class.getName()));
+ }
+
+ @Test
+ void stopShouldDeleteKeyRegistrationWorkQueue() {
+ eventBus.start();
+ eventBus.stop();
+
+ assertThat(rabbitManagementAPI.listQueues())
+ .isEmpty();
+ }
+
+ @Test
+ void eventBusShouldNotThrowWhenContinuouslyStartAndStop() {
+ assertThatCode(() -> {
+ eventBus.start();
+ eventBus.stop();
+ eventBus.stop();
+ eventBus.start();
+ eventBus.start();
+ eventBus.start();
+ eventBus.stop();
+ eventBus.stop();
+ }).doesNotThrowAnyException();
+ }
+
+ // After stop(), the listener's call count must stay frozen and below the number of dispatch attempts.
+ @Test
+ void registrationsShouldNotHandleEventsAfterStop() throws Exception {
+ eventBus.start();
+
+ MailboxListenerCountingSuccessfulExecution listener = new MailboxListenerCountingSuccessfulExecution();
+ eventBus.register(listener, GROUP_A);
+
+ int threadCount = 10;
+ int operationCount = 1000;
+ int maxEventsDispatched = threadCount * operationCount;
+ // The runner is configured from the same variables as maxEventsDispatched so the bound cannot drift from the load.
+ // NOTE(review): the value returned by dispatch(...) is discarded here, while other tests call .block() on it;
+ // confirm events are really published without subscribing, otherwise this test passes with zero calls.
+ ConcurrentTestRunner.builder()
+ .operation((threadNumber, step) -> eventBus.dispatch(EVENT, KEY_1))
+ .threadCount(threadCount)
+ .operationCount(operationCount)
+ .runSuccessfullyWithin(Duration.ofSeconds(5));
+
+ eventBus.stop();
+ int callsAfterStop = listener.numberOfEventCalls();
+
+ // Wait, then verify the count has not moved since stop() returned.
+ TimeUnit.SECONDS.sleep(1);
+ assertThat(listener.numberOfEventCalls())
+ .isEqualTo(callsAfterStop)
+ .isLessThan(maxEventsDispatched);
+ }
+ }
+
+ @Nested
+ class MultiEventBus {
+
+ private RabbitMQEventBus eventBus3;
+
+ @BeforeEach
+ void setUp() {
+ eventBus3 = new RabbitMQEventBus(connectionFactory, eventSerializer);
+ eventBus3.start();
+ }
+
+ @AfterEach
+ void tearDown() {
+ eventBus3.stop();
+ }
+
+ @Test
+ void multipleEventBusStartShouldCreateOnlyOneEventExchange() {
+ assertThat(rabbitManagementAPI.listExchanges())
+ .filteredOn(exchange -> exchange.getName().equals(MAILBOX_EVENT_EXCHANGE_NAME))
+ .hasSize(1);
+ }
+
+ @Test
+ void multipleEventBusShouldNotThrowWhenStartAndStopContinuously() {
+ assertThatCode(() -> {
+ eventBus.start();
+ eventBus.start();
+ eventBus2.start();
+ eventBus2.start();
+ eventBus.stop();
+ eventBus.stop();
+ eventBus.stop();
+ eventBus3.start();
+ eventBus3.start();
+ eventBus3.start();
+ eventBus3.stop();
+ eventBus.start();
+ eventBus2.start();
+ eventBus.stop();
+ eventBus2.stop();
+ }).doesNotThrowAnyException();
+ }
+
+ // Stopping every event bus must leave the shared mailbox event exchange in place.
+ @Test
+ void multipleEventBusStopShouldNotDeleteEventBusExchange() {
+ eventBus.stop();
+ eventBus2.stop();
+ eventBus3.stop();
+
+ // anySatisfy expects a Consumer that asserts; a bare boolean equals(...) never fails,
+ // so the name check is wrapped in assertThat to make the test able to fail.
+ assertThat(rabbitManagementAPI.listExchanges())
+ .anySatisfy(exchange -> assertThat(exchange.getName()).isEqualTo(MAILBOX_EVENT_EXCHANGE_NAME));
+ }
+
+ // Stopping every event bus must leave the GroupA registration queue in place.
+ @Test
+ void multipleEventBusStopShouldNotDeleteGroupRegistrationWorkQueue() {
+ eventBus.register(mock(MailboxListener.class), GROUP_A);
+
+ eventBus.stop();
+ eventBus2.stop();
+ eventBus3.stop();
+
+ // anySatisfy expects a Consumer that asserts; a bare boolean contains(...) never fails,
+ // so the name check is wrapped in assertThat to make the test able to fail.
+ assertThat(rabbitManagementAPI.listQueues())
+ .anySatisfy(queue -> assertThat(queue.getName()).contains(GroupA.class.getName()));
+ }
+
+ @Test
+ void multipleEventBusStopShouldDeleteAllKeyRegistrationsWorkQueue() {
+ eventBus.stop();
+ eventBus2.stop();
+ eventBus3.stop();
+
+ assertThat(rabbitManagementAPI.listQueues())
+ .isEmpty();
+ }
+
+ // With the same listener registered on two buses, its call count must stay frozen once both are stopped.
+ @Test
+ void registrationsShouldNotHandleEventsAfterStop() throws Exception {
+ eventBus.start();
+ eventBus2.start();
+
+ MailboxListenerCountingSuccessfulExecution listener = new MailboxListenerCountingSuccessfulExecution();
+ eventBus.register(listener, GROUP_A);
+ eventBus2.register(listener, GROUP_A);
+
+ int threadCount = 10;
+ int operationCount = 1000;
+ int maxEventsDispatched = threadCount * operationCount;
+ // The runner is configured from the same variables as maxEventsDispatched so the bound cannot drift from the load.
+ // NOTE(review): the value returned by dispatch(...) is discarded here, while other tests call .block() on it;
+ // confirm events are really published without subscribing, otherwise this test passes with zero calls.
+ ConcurrentTestRunner.builder()
+ .operation((threadNumber, step) -> eventBus.dispatch(EVENT, KEY_1))
+ .threadCount(threadCount)
+ .operationCount(operationCount)
+ .runSuccessfullyWithin(Duration.ofSeconds(5));
+
+ eventBus.stop();
+ eventBus2.stop();
+ int callsAfterStop = listener.numberOfEventCalls();
+
+ // Wait, then verify the count has not moved since both buses were stopped.
+ TimeUnit.SECONDS.sleep(1);
+ assertThat(listener.numberOfEventCalls())
+ .isEqualTo(callsAfterStop)
+ .isLessThan(maxEventsDispatched);
+ }
+ }
+
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org