You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/04 05:02:39 UTC
[james-project] 13/18: JAMES-3498 Restore static imports
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit cfce689fcd2e55e59e689f78dbd10218ef82a2b3
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jan 25 18:58:43 2021 +0700
JAMES-3498 Restore static imports
---
.../events/EventBusConcurrentTestContract.java | 131 +++++++------
.../org/apache/james/events/EventBusContract.java | 16 +-
.../james/events/EventDeadLettersContract.java | 51 ++---
.../EventDeadLettersHealthCheckContract.java | 7 +-
.../org/apache/james/events/GroupContract.java | 184 +++++++++---------
.../java/org/apache/james/events/KeyContract.java | 206 +++++++++++----------
.../james/events/CassandraEventDeadLettersDAO.java | 57 +++---
.../events/CassandraEventDeadLettersGroupDAO.java | 15 +-
.../events/CassandraEventDeadLettersDAOTest.java | 57 +++---
.../CassandraEventDeadLettersGroupDAOTest.java | 8 +-
.../apache/james/events/GroupConsumerRetry.java | 3 +-
.../org/apache/james/events/GroupRegistration.java | 3 +-
.../james/events/KeyReconnectionHandler.java | 4 +-
.../events/delivery/InVmEventDeliveryTest.java | 90 ++++-----
14 files changed, 441 insertions(+), 391 deletions(-)
diff --git a/event-bus/api/src/test/java/org/apache/james/events/EventBusConcurrentTestContract.java b/event-bus/api/src/test/java/org/apache/james/events/EventBusConcurrentTestContract.java
index 6648458..8905df3 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/EventBusConcurrentTestContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/EventBusConcurrentTestContract.java
@@ -19,6 +19,11 @@
package org.apache.james.events;
+import static org.apache.james.events.EventBusTestFixture.EVENT;
+import static org.apache.james.events.EventBusTestFixture.KEY_1;
+import static org.apache.james.events.EventBusTestFixture.KEY_2;
+import static org.apache.james.events.EventBusTestFixture.KEY_3;
+import static org.apache.james.events.EventBusTestFixture.NO_KEYS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
@@ -26,6 +31,10 @@ import java.time.Duration;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.james.events.EventBusTestFixture.EventListenerCountingSuccessfulExecution;
+import org.apache.james.events.EventBusTestFixture.GroupA;
+import org.apache.james.events.EventBusTestFixture.GroupB;
+import org.apache.james.events.EventBusTestFixture.GroupC;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.Test;
@@ -44,15 +53,15 @@ public interface EventBusConcurrentTestContract {
int OPERATION_COUNT = 30;
int TOTAL_DISPATCH_OPERATIONS = THREAD_COUNT * OPERATION_COUNT;
- Set<RegistrationKey> ALL_KEYS = ImmutableSet.of(EventBusTestFixture.KEY_1, EventBusTestFixture.KEY_2, EventBusTestFixture.KEY_3);
+ Set<RegistrationKey> ALL_KEYS = ImmutableSet.of(KEY_1, KEY_2, KEY_3);
- static EventBusTestFixture.EventListenerCountingSuccessfulExecution newCountingListener() {
- return new EventBusTestFixture.EventListenerCountingSuccessfulExecution();
+ static EventListenerCountingSuccessfulExecution newCountingListener() {
+ return new EventListenerCountingSuccessfulExecution();
}
- static int totalEventsReceived(ImmutableList<EventBusTestFixture.EventListenerCountingSuccessfulExecution> allListeners) {
+ static int totalEventsReceived(ImmutableList<EventListenerCountingSuccessfulExecution> allListeners) {
return allListeners.stream()
- .mapToInt(EventBusTestFixture.EventListenerCountingSuccessfulExecution::numberOfEventCalls)
+ .mapToInt(EventListenerCountingSuccessfulExecution::numberOfEventCalls)
.sum();
}
@@ -60,17 +69,17 @@ public interface EventBusConcurrentTestContract {
@Test
default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithSingleEventBus() throws Exception {
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
- eventBus().register(countingListener1, new EventBusTestFixture.GroupA());
- eventBus().register(countingListener2, new EventBusTestFixture.GroupB());
- eventBus().register(countingListener3, new EventBusTestFixture.GroupC());
+ eventBus().register(countingListener1, new GroupA());
+ eventBus().register(countingListener2, new GroupB());
+ eventBus().register(countingListener3, new GroupC());
int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) -> eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block())
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS).block())
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);
@@ -82,18 +91,18 @@ public interface EventBusConcurrentTestContract {
@Test
default void concurrentDispatchKeyShouldDeliverAllEventsToListenersWithSingleEventBus() throws Exception {
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
- Mono.from(eventBus().register(countingListener1, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus().register(countingListener2, EventBusTestFixture.KEY_2)).block();
- Mono.from(eventBus().register(countingListener3, EventBusTestFixture.KEY_3)).block();
+ EventListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
+ Mono.from(eventBus().register(countingListener1, KEY_1)).block();
+ Mono.from(eventBus().register(countingListener2, KEY_2)).block();
+ Mono.from(eventBus().register(countingListener3, KEY_3)).block();
int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3
int totalEventBus = 1;
ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) -> eventBus().dispatch(EventBusTestFixture.EVENT, ALL_KEYS).block())
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);
@@ -105,25 +114,25 @@ public interface EventBusConcurrentTestContract {
@Test
default void concurrentDispatchShouldDeliverAllEventsToListenersWithSingleEventBus() throws Exception {
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
- eventBus().register(countingListener1, new EventBusTestFixture.GroupA());
- eventBus().register(countingListener2, new EventBusTestFixture.GroupB());
- eventBus().register(countingListener3, new EventBusTestFixture.GroupC());
+ eventBus().register(countingListener1, new GroupA());
+ eventBus().register(countingListener2, new GroupB());
+ eventBus().register(countingListener3, new GroupC());
int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
int totalEventDeliveredGlobally = totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS;
- Mono.from(eventBus().register(countingListener1, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus().register(countingListener2, EventBusTestFixture.KEY_2)).block();
- Mono.from(eventBus().register(countingListener3, EventBusTestFixture.KEY_3)).block();
+ Mono.from(eventBus().register(countingListener1, KEY_1)).block();
+ Mono.from(eventBus().register(countingListener2, KEY_2)).block();
+ Mono.from(eventBus().register(countingListener3, KEY_3)).block();
int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3
int totalEventDeliveredByKeys = totalKeyListenerRegistrations * TOTAL_DISPATCH_OPERATIONS;
ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) -> eventBus().dispatch(EventBusTestFixture.EVENT, ALL_KEYS).block())
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);
@@ -140,22 +149,22 @@ public interface EventBusConcurrentTestContract {
@Test
default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception {
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
- eventBus().register(countingListener1, new EventBusTestFixture.GroupA());
- eventBus().register(countingListener2, new EventBusTestFixture.GroupB());
- eventBus().register(countingListener3, new EventBusTestFixture.GroupC());
+ eventBus().register(countingListener1, new GroupA());
+ eventBus().register(countingListener2, new GroupB());
+ eventBus().register(countingListener3, new GroupC());
- eventBus2().register(countingListener1, new EventBusTestFixture.GroupA());
- eventBus2().register(countingListener2, new EventBusTestFixture.GroupB());
- eventBus2().register(countingListener3, new EventBusTestFixture.GroupC());
+ eventBus2().register(countingListener1, new GroupA());
+ eventBus2().register(countingListener2, new GroupB());
+ eventBus2().register(countingListener3, new GroupC());
int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) -> eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block())
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS).block())
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);
@@ -167,23 +176,23 @@ public interface EventBusConcurrentTestContract {
@Test
default void concurrentDispatchKeyShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception {
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
- Mono.from(eventBus().register(countingListener1, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus().register(countingListener2, EventBusTestFixture.KEY_2)).block();
- Mono.from(eventBus().register(countingListener3, EventBusTestFixture.KEY_3)).block();
+ Mono.from(eventBus().register(countingListener1, KEY_1)).block();
+ Mono.from(eventBus().register(countingListener2, KEY_2)).block();
+ Mono.from(eventBus().register(countingListener3, KEY_3)).block();
- Mono.from(eventBus2().register(countingListener1, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus2().register(countingListener2, EventBusTestFixture.KEY_2)).block();
- Mono.from(eventBus2().register(countingListener3, EventBusTestFixture.KEY_3)).block();
+ Mono.from(eventBus2().register(countingListener1, KEY_1)).block();
+ Mono.from(eventBus2().register(countingListener2, KEY_2)).block();
+ Mono.from(eventBus2().register(countingListener3, KEY_3)).block();
int totalKeyListenerRegistrations = 3; // KEY1 + KEY2 + KEY3
int totalEventBus = 2; // eventBus1 + eventBus2
ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) -> eventBus().dispatch(EventBusTestFixture.EVENT, ALL_KEYS).block())
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);
@@ -195,31 +204,31 @@ public interface EventBusConcurrentTestContract {
@Test
default void concurrentDispatchShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception {
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
- EventBusTestFixture.EventListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
+ EventListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
eventBus2().register(countingListener1, EventDeadLettersContract.GROUP_A);
- eventBus2().register(countingListener2, new EventBusTestFixture.GroupB());
- eventBus2().register(countingListener3, new EventBusTestFixture.GroupC());
+ eventBus2().register(countingListener2, new GroupB());
+ eventBus2().register(countingListener3, new GroupC());
int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
int totalEventDeliveredGlobally = totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS;
- Mono.from(eventBus().register(countingListener1, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus().register(countingListener2, EventBusTestFixture.KEY_2)).block();
+ Mono.from(eventBus().register(countingListener1, KEY_1)).block();
+ Mono.from(eventBus().register(countingListener2, KEY_2)).block();
- Mono.from(eventBus2().register(countingListener1, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus2().register(countingListener2, EventBusTestFixture.KEY_2)).block();
+ Mono.from(eventBus2().register(countingListener1, KEY_1)).block();
+ Mono.from(eventBus2().register(countingListener2, KEY_2)).block();
- Mono.from(eventBus3().register(countingListener3, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus3().register(countingListener3, EventBusTestFixture.KEY_2)).block();
+ Mono.from(eventBus3().register(countingListener3, KEY_1)).block();
+ Mono.from(eventBus3().register(countingListener3, KEY_2)).block();
int totalKeyListenerRegistrations = 2; // KEY1 + KEY2
int totalEventBus = 3; // eventBus1 + eventBus2 + eventBus3
int totalEventDeliveredByKeys = totalKeyListenerRegistrations * totalEventBus * TOTAL_DISPATCH_OPERATIONS;
ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) -> eventBus().dispatch(EventBusTestFixture.EVENT, ALL_KEYS).block())
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
.threadCount(THREAD_COUNT)
.operationCount(OPERATION_COUNT)
.runSuccessfullyWithin(FIVE_SECONDS);
diff --git a/event-bus/api/src/test/java/org/apache/james/events/EventBusContract.java b/event-bus/api/src/test/java/org/apache/james/events/EventBusContract.java
index c1b52f9..d439d9c 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/EventBusContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/EventBusContract.java
@@ -19,10 +19,12 @@
package org.apache.james.events;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.awaitility.Awaitility.await;
+import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.ZERO;
import java.time.Duration;
-import java.util.concurrent.TimeUnit;
import org.awaitility.core.ConditionFactory;
@@ -49,15 +51,15 @@ public interface EventBusContract {
}
public ConditionFactory shortWaitCondition() {
- return await().pollDelay(org.awaitility.Duration.ZERO)
- .pollInterval(org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS)
- .timeout(new org.awaitility.Duration(this.getShortWaitTime().toMillis(), TimeUnit.MILLISECONDS));
+ return await().pollDelay(ZERO)
+ .pollInterval(ONE_HUNDRED_MILLISECONDS)
+ .timeout(new org.awaitility.Duration(this.getShortWaitTime().toMillis(), MILLISECONDS));
}
public ConditionFactory longWaitCondition() {
- return await().pollDelay(org.awaitility.Duration.ZERO)
- .pollInterval(org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS)
- .timeout(new org.awaitility.Duration(this.getLongWaitTime().toMillis(), TimeUnit.MILLISECONDS));
+ return await().pollDelay(ZERO)
+ .pollInterval(ONE_HUNDRED_MILLISECONDS)
+ .timeout(new org.awaitility.Duration(this.getLongWaitTime().toMillis(), MILLISECONDS));
}
}
diff --git a/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersContract.java b/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersContract.java
index e351e26..20f0d56 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersContract.java
@@ -32,6 +32,7 @@ import java.util.stream.Stream;
import org.apache.james.core.Username;
import org.apache.james.events.EventBusTestFixture.TestEvent;
+import org.apache.james.events.EventDeadLetters.InsertionId;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.junit.jupiter.api.Test;
@@ -107,19 +108,19 @@ interface EventDeadLettersContract {
Event EVENT_1 = new TestEvent(EVENT_ID_1, USERNAME);
Event EVENT_2 = new TestEvent(EVENT_ID_2, USERNAME);
Event EVENT_3 = new TestEvent(EVENT_ID_3, USERNAME);
- EventDeadLetters.InsertionId INSERTION_ID_1 = EventDeadLetters.InsertionId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b7");
- EventDeadLetters.InsertionId INSERTION_ID_2 = EventDeadLetters.InsertionId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b8");
- EventDeadLetters.InsertionId INSERTION_ID_3 = EventDeadLetters.InsertionId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b9");
+ InsertionId INSERTION_ID_1 = InsertionId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b7");
+ InsertionId INSERTION_ID_2 = InsertionId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b8");
+ InsertionId INSERTION_ID_3 = InsertionId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b9");
Group GROUP_A = new EventBusTestFixture.GroupA();
Group GROUP_B = new EventBusTestFixture.GroupB();
Group NULL_GROUP = null;
Event NULL_EVENT = null;
- EventDeadLetters.InsertionId NULL_INSERTION_ID = null;
+ InsertionId NULL_INSERTION_ID = null;
EventDeadLetters eventDeadLetters();
- default Stream<EventDeadLetters.InsertionId> allInsertionIds() {
+ default Stream<InsertionId> allInsertionIds() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
return eventDeadLetters.groupsWithFailedEvents()
@@ -157,7 +158,7 @@ interface EventDeadLettersContract {
default void storeShouldStoreGroupWithCorrespondingEvent() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
- EventDeadLetters.InsertionId insertionId = eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ InsertionId insertionId = eventDeadLetters.store(GROUP_A, EVENT_1).block();
assertThat(eventDeadLetters.failedEvent(GROUP_A, insertionId).block())
.isEqualTo(EVENT_1);
}
@@ -167,12 +168,12 @@ interface EventDeadLettersContract {
EventDeadLetters eventDeadLetters = eventDeadLetters();
ImmutableMap<Integer, Group> groups = concurrentGroups();
- Multimap<Integer, EventDeadLetters.InsertionId> storedInsertionIds = Multimaps.synchronizedSetMultimap(HashMultimap.create());
+ Multimap<Integer, InsertionId> storedInsertionIds = Multimaps.synchronizedSetMultimap(HashMultimap.create());
ConcurrentTestRunner.builder()
.operation((threadNumber, step) -> {
Event.EventId eventId = Event.EventId.random();
- EventDeadLetters.InsertionId insertionId = eventDeadLetters.store(groups.get(threadNumber), event(eventId)).block();
+ InsertionId insertionId = eventDeadLetters.store(groups.get(threadNumber), event(eventId)).block();
storedInsertionIds.put(threadNumber, insertionId);
})
.threadCount(THREAD_COUNT)
@@ -230,9 +231,9 @@ interface EventDeadLettersContract {
default void removeShouldKeepNonMatched() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
- EventDeadLetters.InsertionId insertionId1 = eventDeadLetters.store(GROUP_A, EVENT_1).block();
- EventDeadLetters.InsertionId insertionId2 = eventDeadLetters.store(GROUP_A, EVENT_2).block();
- EventDeadLetters.InsertionId insertionId3 = eventDeadLetters.store(GROUP_A, EVENT_3).block();
+ InsertionId insertionId1 = eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ InsertionId insertionId2 = eventDeadLetters.store(GROUP_A, EVENT_2).block();
+ InsertionId insertionId3 = eventDeadLetters.store(GROUP_A, EVENT_3).block();
eventDeadLetters.remove(GROUP_A, insertionId1).block();
@@ -265,13 +266,13 @@ interface EventDeadLettersContract {
EventDeadLetters eventDeadLetters = eventDeadLetters();
ImmutableMap<Integer, Group> groups = concurrentGroups();
- ConcurrentHashMap<Integer, EventDeadLetters.InsertionId> storedInsertionIds = new ConcurrentHashMap<>();
+ ConcurrentHashMap<Integer, InsertionId> storedInsertionIds = new ConcurrentHashMap<>();
ConcurrentTestRunner.builder()
.operation((threadNumber, step) -> {
int operationIndex = threadNumber * OPERATION_COUNT + step;
Event.EventId eventId = Event.EventId.random();
- EventDeadLetters.InsertionId insertionId = eventDeadLetters.store(groups.get(threadNumber), event(eventId)).block();
+ InsertionId insertionId = eventDeadLetters.store(groups.get(threadNumber), event(eventId)).block();
storedInsertionIds.put(operationIndex, insertionId);
})
.threadCount(THREAD_COUNT)
@@ -334,7 +335,7 @@ interface EventDeadLettersContract {
default void failedEventShouldReturnEventWhenContains() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
- EventDeadLetters.InsertionId insertionId = eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ InsertionId insertionId = eventDeadLetters.store(GROUP_A, EVENT_1).block();
eventDeadLetters.store(GROUP_A, EVENT_2).block();
assertThat(eventDeadLetters.failedEvent(GROUP_A, insertionId).block())
@@ -345,9 +346,9 @@ interface EventDeadLettersContract {
default void failedEventShouldNotRemoveEvent() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
- EventDeadLetters.InsertionId insertionId1 = eventDeadLetters.store(GROUP_A, EVENT_1).block();
- EventDeadLetters.InsertionId insertionId2 = eventDeadLetters.store(GROUP_A, EVENT_2).block();
- EventDeadLetters.InsertionId insertionId3 = eventDeadLetters.store(GROUP_A, EVENT_3).block();
+ InsertionId insertionId1 = eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ InsertionId insertionId2 = eventDeadLetters.store(GROUP_A, EVENT_2).block();
+ InsertionId insertionId3 = eventDeadLetters.store(GROUP_A, EVENT_3).block();
eventDeadLetters.failedEvent(GROUP_A, insertionId1).block();
@@ -359,7 +360,7 @@ interface EventDeadLettersContract {
default void failedEventShouldNotThrowWhenNoGroupMatched() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
- EventDeadLetters.InsertionId insertionId = eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ InsertionId insertionId = eventDeadLetters.store(GROUP_A, EVENT_1).block();
assertThatCode(() -> eventDeadLetters.failedEvent(GROUP_B, insertionId).block())
.doesNotThrowAnyException();
@@ -392,7 +393,7 @@ interface EventDeadLettersContract {
default void failedEventsByGroupShouldReturnAllEventsCorrespondingToGivenGroup() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
- EventDeadLetters.InsertionId insertionId = eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ InsertionId insertionId = eventDeadLetters.store(GROUP_A, EVENT_1).block();
eventDeadLetters.store(GROUP_B, EVENT_2).block();
eventDeadLetters.store(GROUP_B, EVENT_3).block();
@@ -404,9 +405,9 @@ interface EventDeadLettersContract {
default void failedEventsByGroupShouldNotRemoveEvents() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
- EventDeadLetters.InsertionId insertionId1 = eventDeadLetters.store(GROUP_A, EVENT_1).block();
- EventDeadLetters.InsertionId insertionId2 = eventDeadLetters.store(GROUP_A, EVENT_2).block();
- EventDeadLetters.InsertionId insertionId3 = eventDeadLetters.store(GROUP_B, EVENT_3).block();
+ InsertionId insertionId1 = eventDeadLetters.store(GROUP_A, EVENT_1).block();
+ InsertionId insertionId2 = eventDeadLetters.store(GROUP_A, EVENT_2).block();
+ InsertionId insertionId3 = eventDeadLetters.store(GROUP_B, EVENT_3).block();
eventDeadLetters.failedIds(GROUP_A).toStream();
@@ -453,8 +454,8 @@ interface EventDeadLettersContract {
@Test
default void containEventsShouldReturnFalseWhenRemoveAllStoredEvents() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
- EventDeadLetters.InsertionId insertionId1 = eventDeadLetters().store(GROUP_A, EVENT_1).block();
- EventDeadLetters.InsertionId insertionId2 = eventDeadLetters().store(GROUP_A, EVENT_2).block();
+ InsertionId insertionId1 = eventDeadLetters().store(GROUP_A, EVENT_1).block();
+ InsertionId insertionId2 = eventDeadLetters().store(GROUP_A, EVENT_2).block();
assertThat(eventDeadLetters.containEvents().block()).isTrue();
@@ -467,7 +468,7 @@ interface EventDeadLettersContract {
@Test
default void containEventsShouldReturnTrueWhenRemoveSomeStoredEvents() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
- EventDeadLetters.InsertionId insertionId1 = eventDeadLetters().store(GROUP_A, EVENT_1).block();
+ InsertionId insertionId1 = eventDeadLetters().store(GROUP_A, EVENT_1).block();
eventDeadLetters().store(GROUP_B, EVENT_2).block();
assertThat(eventDeadLetters.containEvents().block()).isTrue();
diff --git a/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersHealthCheckContract.java b/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersHealthCheckContract.java
index 335f1c7..fd1bb22 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersHealthCheckContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/EventDeadLettersHealthCheckContract.java
@@ -25,6 +25,7 @@ import org.apache.james.core.Username;
import org.apache.james.core.healthcheck.ComponentName;
import org.apache.james.core.healthcheck.Result;
import org.apache.james.events.EventBusTestFixture.TestEvent;
+import org.apache.james.events.EventDeadLetters.InsertionId;
import org.junit.jupiter.api.Test;
interface EventDeadLettersHealthCheckContract {
@@ -78,8 +79,8 @@ interface EventDeadLettersHealthCheckContract {
@Test
default void checkShouldReturnHealthyWhenRemovedAllEventDeadLetters() {
- EventDeadLetters.InsertionId insertionId1 = eventDeadLetters().store(GROUP_A, EVENT_1).block();
- EventDeadLetters.InsertionId insertionId2 = eventDeadLetters().store(GROUP_B, EVENT_2).block();
+ InsertionId insertionId1 = eventDeadLetters().store(GROUP_A, EVENT_1).block();
+ InsertionId insertionId2 = eventDeadLetters().store(GROUP_B, EVENT_2).block();
assertThat(testee().check().block().isDegraded()).isTrue();
assertThat(testee().check().block())
@@ -95,7 +96,7 @@ interface EventDeadLettersHealthCheckContract {
@Test
default void checkShouldReturnDegradedWhenRemovedSomeEventDeadLetters() {
- EventDeadLetters.InsertionId insertionId1 = eventDeadLetters().store(GROUP_A, EVENT_1).block();
+ InsertionId insertionId1 = eventDeadLetters().store(GROUP_A, EVENT_1).block();
eventDeadLetters().store(GROUP_B, EVENT_2).block();
assertThat(testee().check().block().isDegraded()).isTrue();
diff --git a/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java b/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
index 246f6b7..db42ff7 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
@@ -19,7 +19,15 @@
package org.apache.james.events;
+import static org.apache.james.events.EventBusTestFixture.EVENT;
+import static org.apache.james.events.EventBusTestFixture.EVENT_2;
import static org.apache.james.events.EventBusTestFixture.EVENT_ID;
+import static org.apache.james.events.EventBusTestFixture.EVENT_UNSUPPORTED_BY_LISTENER;
+import static org.apache.james.events.EventBusTestFixture.FIVE_HUNDRED_MS;
+import static org.apache.james.events.EventBusTestFixture.GROUP_A;
+import static org.apache.james.events.EventBusTestFixture.GROUP_B;
+import static org.apache.james.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.events.EventBusTestFixture.ONE_SECOND;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -79,10 +87,10 @@ public interface GroupContract {
finishedExecutions.incrementAndGet();
}
- }, EventBusTestFixture.GROUP_A);
+ }, GROUP_A);
IntStream.range(0, eventCount)
- .forEach(i -> eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block());
+ .forEach(i -> eventBus().dispatch(EVENT, NO_KEYS).block());
getSpeedProfile().shortWaitCondition().atMost(org.awaitility.Duration.TEN_MINUTES)
.untilAsserted(() -> assertThat(finishedExecutions.get()).isEqualTo(eventCount));
@@ -105,7 +113,7 @@ public interface GroupContract {
threads.add(Thread.currentThread().getName());
countDownLatch.await();
}
- }, EventBusTestFixture.GROUP_A);
+ }, GROUP_A);
eventBus().register(new EventListener.GroupEventListener() {
@Override
public Group getDefaultGroup() {
@@ -117,7 +125,7 @@ public interface GroupContract {
threads.add(Thread.currentThread().getName());
countDownLatch.await();
}
- }, EventBusTestFixture.GROUP_B);
+ }, GROUP_B);
eventBus().register(new EventListener.GroupEventListener() {
@Override
public Group getDefaultGroup() {
@@ -131,7 +139,7 @@ public interface GroupContract {
}
}, EventBusTestFixture.GROUP_C);
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).subscribeOn(Schedulers.elastic()).subscribe();
+ eventBus().dispatch(EVENT, NO_KEYS).subscribeOn(Schedulers.elastic()).subscribe();
getSpeedProfile().shortWaitCondition().atMost(org.awaitility.Duration.TEN_SECONDS)
@@ -147,15 +155,15 @@ public interface GroupContract {
AtomicBoolean successfulRetry = new AtomicBoolean(false);
EventListener listener = event -> {
if (event.getEventId().equals(EVENT_ID)) {
- eventBus().dispatch(EventBusTestFixture.EVENT_2, EventBusTestFixture.NO_KEYS)
+ eventBus().dispatch(EVENT_2, NO_KEYS)
.subscribeOn(Schedulers.elastic())
.block();
successfulRetry.set(true);
}
};
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus().register(listener, GROUP_A);
+ eventBus().dispatch(EVENT, NO_KEYS).block();
getSpeedProfile().shortWaitCondition().until(successfulRetry::get);
}
@@ -164,11 +172,11 @@ public interface GroupContract {
default void registerShouldNotDispatchPastEventsForGroups() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(EVENT, NO_KEYS).block();
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ eventBus().register(listener, GROUP_A);
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@@ -176,23 +184,23 @@ public interface GroupContract {
default void listenerGroupShouldReceiveEvents() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ eventBus().register(listener, GROUP_A);
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(EVENT, NO_KEYS).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void groupListenersShouldNotReceiveNoopEvents() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ eventBus().register(listener, GROUP_A);
Event noopEvent = new TestEvent(EVENT_ID, Username.of("noop"));
- eventBus().dispatch(noopEvent, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(noopEvent, NO_KEYS).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@@ -200,11 +208,11 @@ public interface GroupContract {
default void groupListenersShouldReceiveOnlyHandledEvents() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ eventBus().register(listener, GROUP_A);
- eventBus().dispatch(EventBusTestFixture.EVENT_UNSUPPORTED_BY_LISTENER, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(EVENT_UNSUPPORTED_BY_LISTENER, NO_KEYS).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@@ -213,9 +221,9 @@ public interface GroupContract {
EventListener listener = EventBusTestFixture.newListener();
doThrow(new RuntimeException()).when(listener).event(any());
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ eventBus().register(listener, GROUP_A);
- assertThatCode(() -> eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block())
+ assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
.doesNotThrowAnyException();
}
@@ -223,24 +231,24 @@ public interface GroupContract {
default void eachListenerGroupShouldReceiveEvents() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
EventListener listener2 = EventBusTestFixture.newListener();
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
- eventBus().register(listener2, EventBusTestFixture.GROUP_B);
+ eventBus().register(listener, GROUP_A);
+ eventBus().register(listener2, GROUP_B);
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(EVENT, NO_KEYS).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
- verify(listener2, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener2, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void unregisteredGroupListenerShouldNotReceiveEvents() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Registration registration = eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ Registration registration = eventBus().register(listener, GROUP_A);
registration.unregister();
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@@ -249,9 +257,9 @@ public interface GroupContract {
EventListener listener = EventBusTestFixture.newListener();
EventListener listener2 = EventBusTestFixture.newListener();
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ eventBus().register(listener, GROUP_A);
- assertThatThrownBy(() -> eventBus().register(listener2, EventBusTestFixture.GROUP_A))
+ assertThatThrownBy(() -> eventBus().register(listener2, GROUP_A))
.isInstanceOf(GroupAlreadyRegistered.class);
}
@@ -260,9 +268,9 @@ public interface GroupContract {
EventListener listener = EventBusTestFixture.newListener();
EventListener listener2 = EventBusTestFixture.newListener();
- eventBus().register(listener, EventBusTestFixture.GROUP_A).unregister();
+ eventBus().register(listener, GROUP_A).unregister();
- assertThatCode(() -> eventBus().register(listener2, EventBusTestFixture.GROUP_A))
+ assertThatCode(() -> eventBus().register(listener2, GROUP_A))
.doesNotThrowAnyException();
}
@@ -270,7 +278,7 @@ public interface GroupContract {
default void unregisterShouldBeIdempotentForGroups() {
EventListener listener = EventBusTestFixture.newListener();
- Registration registration = eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ Registration registration = eventBus().register(listener, GROUP_A);
registration.unregister();
assertThatCode(registration::unregister)
@@ -281,32 +289,32 @@ public interface GroupContract {
default void registerShouldAcceptAlreadyUnregisteredGroups() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(listener, EventBusTestFixture.GROUP_A).unregister();
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ eventBus().register(listener, GROUP_A).unregister();
+ eventBus().register(listener, GROUP_A);
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(EVENT, NO_KEYS).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void dispatchShouldCallSynchronousListener() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ eventBus().register(listener, GROUP_A);
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(EVENT, NO_KEYS).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void failingGroupListenersShouldNotAbortGroupDelivery() {
- EventBusTestFixture.EventListenerCountingSuccessfulExecution listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EventBusTestFixture.EVENT));
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ EventBusTestFixture.EventListenerCountingSuccessfulExecution listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EVENT));
+ eventBus().register(listener, GROUP_A);
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
- eventBus().dispatch(EventBusTestFixture.EVENT_2, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+ eventBus().dispatch(EVENT_2, NO_KEYS).block();
getSpeedProfile().shortWaitCondition()
.untilAsserted(() -> assertThat(listener.numberOfEventCalls()).isEqualTo(1));
@@ -320,12 +328,12 @@ public interface GroupContract {
when(failingListener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.SYNCHRONOUS);
doThrow(new RuntimeException()).when(failingListener).event(any());
- eventBus().register(failingListener, EventBusTestFixture.GROUP_A);
- eventBus().register(listener, EventBusTestFixture.GROUP_B);
+ eventBus().register(failingListener, GROUP_A);
+ eventBus().register(listener, GROUP_B);
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(EVENT, NO_KEYS).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
@@ -336,21 +344,21 @@ public interface GroupContract {
eventBus().register(listener1, new GenericGroup("a"));
eventBus().register(listener2, new GenericGroup("b"));
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(EVENT, NO_KEYS).block();
- verify(listener1, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
- verify(listener2, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener1, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener2, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void groupListenerShouldReceiveEventWhenRedeliver() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ eventBus().register(listener, GROUP_A);
- eventBus().reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block();
+ eventBus().reDeliver(GROUP_A, EVENT).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
@@ -358,15 +366,15 @@ public interface GroupContract {
EventListener listener = EventBusTestFixture.newListener();
doThrow(new RuntimeException()).when(listener).event(any());
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ eventBus().register(listener, GROUP_A);
- assertThatCode(() -> eventBus().reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block())
+ assertThatCode(() -> eventBus().reDeliver(GROUP_A, EVENT).block())
.doesNotThrowAnyException();
}
@Test
default void redeliverShouldThrowWhenGroupNotRegistered() {
- assertThatThrownBy(() -> eventBus().reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block())
+ assertThatThrownBy(() -> eventBus().reDeliver(GROUP_A, EVENT).block())
.isInstanceOf(GroupRegistrationNotFound.class);
}
@@ -374,9 +382,9 @@ public interface GroupContract {
default void redeliverShouldThrowAfterGroupIsUnregistered() {
EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(listener, EventBusTestFixture.GROUP_A).unregister();
+ eventBus().register(listener, GROUP_A).unregister();
- assertThatThrownBy(() -> eventBus().reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block())
+ assertThatThrownBy(() -> eventBus().reDeliver(GROUP_A, EVENT).block())
.isInstanceOf(GroupRegistrationNotFound.class);
}
@@ -384,25 +392,25 @@ public interface GroupContract {
default void redeliverShouldOnlySendEventToDefinedGroup() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
EventListener listener2 = EventBusTestFixture.newListener();
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
- eventBus().register(listener2, EventBusTestFixture.GROUP_B);
+ eventBus().register(listener, GROUP_A);
+ eventBus().register(listener2, GROUP_B);
- eventBus().reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block();
+ eventBus().reDeliver(GROUP_A, EVENT).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
- verify(listener2, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never()).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener2, after(FIVE_HUNDRED_MS.toMillis()).never()).event(any());
}
@Test
default void groupListenersShouldNotReceiveNoopRedeliveredEvents() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- eventBus().register(listener, EventBusTestFixture.GROUP_A);
+ eventBus().register(listener, GROUP_A);
Event noopEvent = new TestEvent(EVENT_ID, Username.of("noop"));
- eventBus().reDeliver(EventBusTestFixture.GROUP_A, noopEvent).block();
+ eventBus().reDeliver(GROUP_A, noopEvent).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never()).event(any());
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never()).event(any());
}
}
@@ -412,20 +420,20 @@ public interface GroupContract {
default void groupsDefinedOnlyOnSomeNodesShouldBeNotifiedWhenDispatch() throws Exception {
EventListener mailboxListener = EventBusTestFixture.newListener();
- eventBus().register(mailboxListener, EventBusTestFixture.GROUP_A);
+ eventBus().register(mailboxListener, GROUP_A);
- eventBus2().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus2().dispatch(EVENT, NO_KEYS).block();
- verify(mailboxListener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(mailboxListener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void groupsDefinedOnlyOnSomeNodesShouldNotBeNotifiedWhenRedeliver() {
EventListener mailboxListener = EventBusTestFixture.newListener();
- eventBus().register(mailboxListener, EventBusTestFixture.GROUP_A);
+ eventBus().register(mailboxListener, GROUP_A);
- assertThatThrownBy(() -> eventBus2().reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block())
+ assertThatThrownBy(() -> eventBus2().reDeliver(GROUP_A, EVENT).block())
.isInstanceOf(GroupRegistrationNotFound.class);
}
@@ -433,36 +441,36 @@ public interface GroupContract {
default void groupListenersShouldBeExecutedOnceWhenRedeliverInADistributedEnvironment() throws Exception {
EventListener mailboxListener = EventBusTestFixture.newListener();
- eventBus().register(mailboxListener, EventBusTestFixture.GROUP_A);
- eventBus2().register(mailboxListener, EventBusTestFixture.GROUP_A);
+ eventBus().register(mailboxListener, GROUP_A);
+ eventBus2().register(mailboxListener, GROUP_A);
- eventBus2().reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block();
+ eventBus2().reDeliver(GROUP_A, EVENT).block();
- verify(mailboxListener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(mailboxListener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void groupListenersShouldBeExecutedOnceInAControlledEnvironment() throws Exception {
EventListener mailboxListener = EventBusTestFixture.newListener();
- eventBus().register(mailboxListener, EventBusTestFixture.GROUP_A);
- eventBus2().register(mailboxListener, EventBusTestFixture.GROUP_A);
+ eventBus().register(mailboxListener, GROUP_A);
+ eventBus2().register(mailboxListener, GROUP_A);
- eventBus2().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus2().dispatch(EVENT, NO_KEYS).block();
- verify(mailboxListener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(mailboxListener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void unregisterShouldStopNotificationForDistantGroups() throws Exception {
EventListener mailboxListener = EventBusTestFixture.newListener();
- eventBus().register(mailboxListener, EventBusTestFixture.GROUP_A).unregister();
+ eventBus().register(mailboxListener, GROUP_A).unregister();
- eventBus2().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus2().dispatch(EVENT, NO_KEYS).block();
- verify(mailboxListener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(mailboxListener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@@ -470,11 +478,11 @@ public interface GroupContract {
default void registerShouldNotDispatchPastEventsForGroupsInADistributedContext() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(EVENT, NO_KEYS).block();
- eventBus2().register(listener, EventBusTestFixture.GROUP_A);
+ eventBus2().register(listener, GROUP_A);
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
}
diff --git a/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java b/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java
index 190b517..52e5a53 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java
@@ -19,7 +19,14 @@
package org.apache.james.events;
+import static org.apache.james.events.EventBusTestFixture.EVENT;
import static org.apache.james.events.EventBusTestFixture.EVENT_ID;
+import static org.apache.james.events.EventBusTestFixture.EVENT_UNSUPPORTED_BY_LISTENER;
+import static org.apache.james.events.EventBusTestFixture.FIVE_HUNDRED_MS;
+import static org.apache.james.events.EventBusTestFixture.KEY_1;
+import static org.apache.james.events.EventBusTestFixture.KEY_2;
+import static org.apache.james.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.events.EventBusTestFixture.ONE_SECOND;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.jupiter.api.Assertions.assertTimeout;
@@ -41,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.james.core.Username;
+import org.apache.james.events.EventListener.ExecutionMode;
import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableSet;
@@ -66,10 +74,10 @@ public interface KeyContract extends EventBusContract {
Thread.sleep(Duration.ofMillis(20).toMillis());
finishedExecutions.incrementAndGet();
- }, EventBusTestFixture.KEY_1)).block();
+ }, KEY_1)).block();
IntStream.range(0, eventCount)
- .forEach(i -> eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block());
+ .forEach(i -> eventBus().dispatch(EVENT, KEY_1).block());
getSpeedProfile().shortWaitCondition().atMost(org.awaitility.Duration.TEN_MINUTES)
.untilAsserted(() -> assertThat(finishedExecutions.get()).isEqualTo(eventCount));
@@ -84,17 +92,17 @@ public interface KeyContract extends EventBusContract {
Mono.from(eventBus().register(event -> {
threads.add(Thread.currentThread().getName());
countDownLatch.await();
- }, EventBusTestFixture.KEY_1)).block();
+ }, KEY_1)).block();
Mono.from(eventBus().register(event -> {
threads.add(Thread.currentThread().getName());
countDownLatch.await();
- }, EventBusTestFixture.KEY_1)).block();
+ }, KEY_1)).block();
Mono.from(eventBus().register(event -> {
threads.add(Thread.currentThread().getName());
countDownLatch.await();
- }, EventBusTestFixture.KEY_1)).block();
+ }, KEY_1)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).subscribeOn(Schedulers.elastic()).subscribe();
+ eventBus().dispatch(EVENT, KEY_1).subscribeOn(Schedulers.elastic()).subscribe();
getSpeedProfile().shortWaitCondition().atMost(org.awaitility.Duration.TEN_SECONDS)
@@ -110,12 +118,12 @@ public interface KeyContract extends EventBusContract {
default void registeredListenersShouldNotReceiveNoopEvents() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
Event noopEvent = new EventBusTestFixture.TestEvent(EVENT_ID, Username.of("noop"));
- eventBus().dispatch(noopEvent, EventBusTestFixture.KEY_1).block();
+ eventBus().dispatch(noopEvent, KEY_1).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@@ -123,11 +131,11 @@ public interface KeyContract extends EventBusContract {
default void registeredListenersShouldReceiveOnlyHandledEvents() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT_UNSUPPORTED_BY_LISTENER, EventBusTestFixture.KEY_1).block();
+ eventBus().dispatch(EVENT_UNSUPPORTED_BY_LISTENER, KEY_1).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@@ -136,50 +144,50 @@ public interface KeyContract extends EventBusContract {
EventListener listener = EventBusTestFixture.newListener();
doThrow(new RuntimeException()).when(listener).event(any());
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- assertThatCode(() -> eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block())
+ assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
.doesNotThrowAnyException();
}
@Test
default void dispatchShouldNotNotifyRegisteredListenerWhenEmptyKeySet() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(EVENT, NO_KEYS).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@Test
default void dispatchShouldNotNotifyListenerRegisteredOnOtherKeys() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_2)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_2)).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@Test
default void dispatchShouldNotifyRegisteredListeners() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void dispatchShouldNotifyLocalRegisteredListenerWithoutDelay() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
verify(listener, times(1)).event(any());
}
@@ -188,13 +196,13 @@ public interface KeyContract extends EventBusContract {
default void dispatchShouldNotifyOnlyRegisteredListener() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
EventListener listener2 = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus().register(listener2, EventBusTestFixture.KEY_2)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener2, KEY_2)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
- verify(listener2, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener2, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@@ -202,35 +210,35 @@ public interface KeyContract extends EventBusContract {
default void dispatchShouldNotifyAllListenersRegisteredOnAKey() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
EventListener listener2 = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus().register(listener2, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener2, KEY_1)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
- verify(listener2, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener2, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void registerShouldAllowDuplicatedRegistration() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void unregisterShouldRemoveDoubleRegisteredListener() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block().unregister();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block().unregister();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@@ -238,43 +246,43 @@ public interface KeyContract extends EventBusContract {
default void registerShouldNotDispatchPastEvents() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@Test
default void callingAllUnregisterMethodShouldUnregisterTheListener() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Registration registration = Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block().unregister();
+ Registration registration = Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block().unregister();
registration.unregister();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@Test
default void unregisterShouldHaveNotNotifyWhenCalledOnDifferentKeys() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_2)).block().unregister();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_2)).block().unregister();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void unregisterShouldBeIdempotentForKeyRegistrations() {
EventListener listener = EventBusTestFixture.newListener();
- Registration registration = Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Registration registration = Mono.from(eventBus().register(listener, KEY_1)).block();
registration.unregister();
assertThatCode(registration::unregister)
@@ -284,32 +292,32 @@ public interface KeyContract extends EventBusContract {
@Test
default void dispatchShouldAcceptSeveralKeys() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1, EventBusTestFixture.KEY_2)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void dispatchShouldCallListenerOnceWhenSeveralKeysMatching() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_2)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_2)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1, EventBusTestFixture.KEY_2)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void dispatchShouldNotNotifyUnregisteredListener() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block().unregister();
+ Mono.from(eventBus().register(listener, KEY_1)).block().unregister();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@@ -317,38 +325,38 @@ public interface KeyContract extends EventBusContract {
@Test
default void dispatchShouldNotifyAsynchronousListener() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- when(listener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.ASYNCHRONOUS);
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ when(listener.getExecutionMode()).thenReturn(ExecutionMode.ASYNCHRONOUS);
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
+ eventBus().dispatch(EVENT, KEY_1).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis())).event(EventBusTestFixture.EVENT);
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis())).event(EVENT);
}
@Test
default void dispatchShouldNotBlockAsynchronousListener() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- when(listener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.ASYNCHRONOUS);
+ when(listener.getExecutionMode()).thenReturn(ExecutionMode.ASYNCHRONOUS);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(invocation -> {
latch.await();
return null;
- }).when(listener).event(EventBusTestFixture.EVENT);
+ }).when(listener).event(EVENT);
assertTimeout(Duration.ofSeconds(2),
() -> {
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
+ eventBus().dispatch(EVENT, NO_KEYS).block();
latch.countDown();
});
}
@Test
default void failingRegisteredListenersShouldNotAbortRegisteredDelivery() {
- EventBusTestFixture.EventListenerCountingSuccessfulExecution listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EventBusTestFixture.EVENT));
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ EventBusTestFixture.EventListenerCountingSuccessfulExecution listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EVENT));
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
- eventBus().dispatch(EventBusTestFixture.EVENT_2, EventBusTestFixture.KEY_1).block();
+ eventBus().dispatch(EVENT, KEY_1).block();
+ eventBus().dispatch(EventBusTestFixture.EVENT_2, KEY_1).block();
getSpeedProfile().shortWaitCondition()
.untilAsserted(() -> assertThat(listener.numberOfEventCalls()).isEqualTo(1));
@@ -359,15 +367,15 @@ public interface KeyContract extends EventBusContract {
EventListener listener = EventBusTestFixture.newListener();
EventListener failingListener = mock(EventListener.class);
- when(failingListener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.SYNCHRONOUS);
+ when(failingListener.getExecutionMode()).thenReturn(ExecutionMode.SYNCHRONOUS);
doThrow(new RuntimeException()).when(failingListener).event(any());
- Mono.from(eventBus().register(failingListener, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(failingListener, KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- eventBus().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- verify(listener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
}
@@ -377,22 +385,22 @@ public interface KeyContract extends EventBusContract {
default void crossEventBusRegistrationShouldBeAllowed() throws Exception {
EventListener mailboxListener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(mailboxListener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(mailboxListener, KEY_1)).block();
- eventBus2().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
+ eventBus2().dispatch(EVENT, KEY_1).block();
- verify(mailboxListener, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(mailboxListener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void unregisteredDistantListenersShouldNotBeNotified() throws Exception {
EventListener eventListener = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(eventListener, EventBusTestFixture.KEY_1)).block().unregister();
+ Mono.from(eventBus().register(eventListener, KEY_1)).block().unregister();
- eventBus2().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus2().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- verify(eventListener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(eventListener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@@ -401,24 +409,24 @@ public interface KeyContract extends EventBusContract {
EventListener mailboxListener1 = EventBusTestFixture.newListener();
EventListener mailboxListener2 = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(mailboxListener1, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus2().register(mailboxListener2, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(mailboxListener1, KEY_1)).block();
+ Mono.from(eventBus2().register(mailboxListener2, KEY_1)).block();
- eventBus2().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
+ eventBus2().dispatch(EVENT, KEY_1).block();
- verify(mailboxListener1, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
- verify(mailboxListener2, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(mailboxListener1, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(mailboxListener2, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
@Test
default void registerShouldNotDispatchPastEventsInDistributedContext() throws Exception {
EventListener listener = EventBusTestFixture.newListener();
- eventBus2().dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
+ eventBus2().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
- Mono.from(eventBus().register(listener, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(listener, KEY_1)).block();
- verify(listener, after(EventBusTestFixture.FIVE_HUNDRED_MS.toMillis()).never())
+ verify(listener, after(FIVE_HUNDRED_MS.toMillis()).never())
.event(any());
}
@@ -427,13 +435,13 @@ public interface KeyContract extends EventBusContract {
EventListener mailboxListener1 = EventBusTestFixture.newListener();
EventListener mailboxListener2 = EventBusTestFixture.newListener();
- Mono.from(eventBus().register(mailboxListener1, EventBusTestFixture.KEY_1)).block();
- Mono.from(eventBus2().register(mailboxListener2, EventBusTestFixture.KEY_1)).block();
+ Mono.from(eventBus().register(mailboxListener1, KEY_1)).block();
+ Mono.from(eventBus2().register(mailboxListener2, KEY_1)).block();
- eventBus2().dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
+ eventBus2().dispatch(EVENT, KEY_1).block();
verify(mailboxListener2, times(1)).event(any());
- verify(mailboxListener1, timeout(EventBusTestFixture.ONE_SECOND.toMillis()).times(1)).event(any());
+ verify(mailboxListener1, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
}
}
diff --git a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java
index df1f339..096cce6 100644
--- a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java
+++ b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java
@@ -24,11 +24,14 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.events.tables.CassandraEventDeadLettersTable.EVENT;
+import static org.apache.james.events.tables.CassandraEventDeadLettersTable.GROUP;
+import static org.apache.james.events.tables.CassandraEventDeadLettersTable.INSERTION_ID;
+import static org.apache.james.events.tables.CassandraEventDeadLettersTable.TABLE_NAME;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.events.tables.CassandraEventDeadLettersTable;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
@@ -57,62 +60,62 @@ public class CassandraEventDeadLettersDAO {
}
private PreparedStatement prepareInsertStatement(Session session) {
- return session.prepare(insertInto(CassandraEventDeadLettersTable.TABLE_NAME)
- .value(CassandraEventDeadLettersTable.GROUP, bindMarker(CassandraEventDeadLettersTable.GROUP))
- .value(CassandraEventDeadLettersTable.INSERTION_ID, bindMarker(CassandraEventDeadLettersTable.INSERTION_ID))
- .value(CassandraEventDeadLettersTable.EVENT, bindMarker(CassandraEventDeadLettersTable.EVENT)));
+ return session.prepare(insertInto(TABLE_NAME)
+ .value(GROUP, bindMarker(GROUP))
+ .value(INSERTION_ID, bindMarker(INSERTION_ID))
+ .value(EVENT, bindMarker(EVENT)));
}
private PreparedStatement prepareDeleteStatement(Session session) {
return session.prepare(delete()
- .from(CassandraEventDeadLettersTable.TABLE_NAME)
- .where(eq(CassandraEventDeadLettersTable.GROUP, bindMarker(CassandraEventDeadLettersTable.GROUP)))
- .and(eq(CassandraEventDeadLettersTable.INSERTION_ID, bindMarker(CassandraEventDeadLettersTable.INSERTION_ID))));
+ .from(TABLE_NAME)
+ .where(eq(GROUP, bindMarker(GROUP)))
+ .and(eq(INSERTION_ID, bindMarker(INSERTION_ID))));
}
private PreparedStatement prepareSelectEventStatement(Session session) {
- return session.prepare(select(CassandraEventDeadLettersTable.EVENT)
- .from(CassandraEventDeadLettersTable.TABLE_NAME)
- .where(eq(CassandraEventDeadLettersTable.GROUP, bindMarker(CassandraEventDeadLettersTable.GROUP)))
- .and(eq(CassandraEventDeadLettersTable.INSERTION_ID, bindMarker(CassandraEventDeadLettersTable.INSERTION_ID))));
+ return session.prepare(select(EVENT)
+ .from(TABLE_NAME)
+ .where(eq(GROUP, bindMarker(GROUP)))
+ .and(eq(INSERTION_ID, bindMarker(INSERTION_ID))));
}
private PreparedStatement prepareSelectInsertionIdsWithGroupStatement(Session session) {
- return session.prepare(select(CassandraEventDeadLettersTable.INSERTION_ID)
- .from(CassandraEventDeadLettersTable.TABLE_NAME)
- .where(eq(CassandraEventDeadLettersTable.GROUP, bindMarker(CassandraEventDeadLettersTable.GROUP))));
+ return session.prepare(select(INSERTION_ID)
+ .from(TABLE_NAME)
+ .where(eq(GROUP, bindMarker(GROUP))));
}
private PreparedStatement prepareContainEventStatement(Session session) {
- return session.prepare(select(CassandraEventDeadLettersTable.EVENT)
- .from(CassandraEventDeadLettersTable.TABLE_NAME)
+ return session.prepare(select(EVENT)
+ .from(TABLE_NAME)
.limit(1));
}
Mono<Void> store(Group group, Event failedEvent, EventDeadLetters.InsertionId insertionId) {
return executor.executeVoid(insertStatement.bind()
- .setString(CassandraEventDeadLettersTable.GROUP, group.asString())
- .setUUID(CassandraEventDeadLettersTable.INSERTION_ID, insertionId.getId())
- .setString(CassandraEventDeadLettersTable.EVENT, eventSerializer.toJson(failedEvent)));
+ .setString(GROUP, group.asString())
+ .setUUID(INSERTION_ID, insertionId.getId())
+ .setString(EVENT, eventSerializer.toJson(failedEvent)));
}
Mono<Void> removeEvent(Group group, EventDeadLetters.InsertionId failedInsertionId) {
return executor.executeVoid(deleteStatement.bind()
- .setString(CassandraEventDeadLettersTable.GROUP, group.asString())
- .setUUID(CassandraEventDeadLettersTable.INSERTION_ID, failedInsertionId.getId()));
+ .setString(GROUP, group.asString())
+ .setUUID(INSERTION_ID, failedInsertionId.getId()));
}
Mono<Event> retrieveFailedEvent(Group group, EventDeadLetters.InsertionId insertionId) {
return executor.executeSingleRow(selectEventStatement.bind()
- .setString(CassandraEventDeadLettersTable.GROUP, group.asString())
- .setUUID(CassandraEventDeadLettersTable.INSERTION_ID, insertionId.getId()))
- .map(row -> deserializeEvent(row.getString(CassandraEventDeadLettersTable.EVENT)));
+ .setString(GROUP, group.asString())
+ .setUUID(INSERTION_ID, insertionId.getId()))
+ .map(row -> deserializeEvent(row.getString(EVENT)));
}
Flux<EventDeadLetters.InsertionId> retrieveInsertionIdsWithGroup(Group group) {
return executor.executeRows(selectEventIdsWithGroupStatement.bind()
- .setString(CassandraEventDeadLettersTable.GROUP, group.asString()))
- .map(row -> EventDeadLetters.InsertionId.of(row.getUUID(CassandraEventDeadLettersTable.INSERTION_ID)));
+ .setString(GROUP, group.asString()))
+ .map(row -> EventDeadLetters.InsertionId.of(row.getUUID(INSERTION_ID)));
}
Mono<Boolean> containEvents() {
diff --git a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java
index 2fefd3c..bd0c27e 100644
--- a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java
+++ b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java
@@ -20,11 +20,12 @@
package org.apache.james.events;
import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static org.apache.james.events.tables.CassandraEventDeadLettersGroupTable.GROUP;
+import static org.apache.james.events.tables.CassandraEventDeadLettersGroupTable.TABLE_NAME;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.events.tables.CassandraEventDeadLettersGroupTable;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
@@ -47,22 +48,22 @@ public class CassandraEventDeadLettersGroupDAO {
}
private PreparedStatement prepareInsertStatement(Session session) {
- return session.prepare(QueryBuilder.insertInto(CassandraEventDeadLettersGroupTable.TABLE_NAME)
- .value(CassandraEventDeadLettersGroupTable.GROUP, bindMarker(CassandraEventDeadLettersGroupTable.GROUP)));
+ return session.prepare(QueryBuilder.insertInto(TABLE_NAME)
+ .value(GROUP, bindMarker(GROUP)));
}
private PreparedStatement prepareSelectStatement(Session session) {
- return session.prepare(QueryBuilder.select(CassandraEventDeadLettersGroupTable.GROUP)
- .from(CassandraEventDeadLettersGroupTable.TABLE_NAME));
+ return session.prepare(QueryBuilder.select(GROUP)
+ .from(TABLE_NAME));
}
Mono<Void> storeGroup(Group group) {
return executor.executeVoid(insertStatement.bind()
- .setString(CassandraEventDeadLettersGroupTable.GROUP, group.asString()));
+ .setString(GROUP, group.asString()));
}
Flux<Group> retrieveAllGroups() {
return executor.executeRows(selectAllStatement.bind())
- .map(Throwing.function(row -> Group.deserialize(row.getString(CassandraEventDeadLettersGroupTable.GROUP))));
+ .map(Throwing.function(row -> Group.deserialize(row.getString(GROUP))));
}
}
diff --git a/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersDAOTest.java b/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersDAOTest.java
index 114022d..ffa7cd3 100644
--- a/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersDAOTest.java
+++ b/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersDAOTest.java
@@ -19,6 +19,13 @@
package org.apache.james.events;
+import static org.apache.james.events.EventDeadLettersContract.EVENT_1;
+import static org.apache.james.events.EventDeadLettersContract.EVENT_2;
+import static org.apache.james.events.EventDeadLettersContract.GROUP_A;
+import static org.apache.james.events.EventDeadLettersContract.GROUP_B;
+import static org.apache.james.events.EventDeadLettersContract.INSERTION_ID_1;
+import static org.apache.james.events.EventDeadLettersContract.INSERTION_ID_2;
+import static org.apache.james.events.EventDeadLettersContract.INSERTION_ID_3;
import static org.assertj.core.api.Assertions.assertThat;
import org.apache.james.backends.cassandra.CassandraCluster;
@@ -42,12 +49,12 @@ class CassandraEventDeadLettersDAOTest {
@Test
void removeEventShouldSucceededWhenRemoveStoredEvent() {
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_A, EventDeadLettersContract.EVENT_1, EventDeadLettersContract.INSERTION_ID_1).block();
+ cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
- cassandraEventDeadLettersDAO.removeEvent(EventDeadLettersContract.GROUP_A, EventDeadLettersContract.INSERTION_ID_1).block();
+ cassandraEventDeadLettersDAO.removeEvent(GROUP_A, INSERTION_ID_1).block();
assertThat(cassandraEventDeadLettersDAO
- .retrieveInsertionIdsWithGroup(EventDeadLettersContract.GROUP_A)
+ .retrieveInsertionIdsWithGroup(GROUP_A)
.collectList().block())
.isEmpty();
}
@@ -55,45 +62,45 @@ class CassandraEventDeadLettersDAOTest {
@Test
void retrieveFailedEventShouldReturnEmptyWhenDefault() {
assertThat(cassandraEventDeadLettersDAO
- .retrieveFailedEvent(EventDeadLettersContract.GROUP_A, EventDeadLettersContract.INSERTION_ID_1)
+ .retrieveFailedEvent(GROUP_A, INSERTION_ID_1)
.blockOptional().isPresent())
.isFalse();
}
@Test
void retrieveFailedEventShouldReturnStoredEvent() {
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_A, EventDeadLettersContract.EVENT_1, EventDeadLettersContract.INSERTION_ID_1).block();
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.EVENT_2, EventDeadLettersContract.INSERTION_ID_2).block();
+ cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2, INSERTION_ID_2).block();
assertThat(cassandraEventDeadLettersDAO
- .retrieveFailedEvent(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.INSERTION_ID_2)
+ .retrieveFailedEvent(GROUP_B, INSERTION_ID_2)
.blockOptional().get())
- .isEqualTo(EventDeadLettersContract.EVENT_2);
+ .isEqualTo(EVENT_2);
}
@Test
void retrieveInsertionIdsWithGroupShouldReturnEmptyWhenDefault() {
assertThat(cassandraEventDeadLettersDAO
- .retrieveInsertionIdsWithGroup(EventDeadLettersContract.GROUP_A)
+ .retrieveInsertionIdsWithGroup(GROUP_A)
.collectList().block())
.isEmpty();
}
@Test
void retrieveInsertionIdsWithGroupShouldReturnStoredInsertionId() {
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.EVENT_1, EventDeadLettersContract.INSERTION_ID_1).block();
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.EVENT_2, EventDeadLettersContract.INSERTION_ID_2).block();
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.EVENT_3, EventDeadLettersContract.INSERTION_ID_3).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1, INSERTION_ID_1).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2, INSERTION_ID_2).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EventDeadLettersContract.EVENT_3, INSERTION_ID_3).block();
assertThat(cassandraEventDeadLettersDAO
- .retrieveInsertionIdsWithGroup(EventDeadLettersContract.GROUP_B)
+ .retrieveInsertionIdsWithGroup(GROUP_B)
.collectList().block())
- .containsOnly(EventDeadLettersContract.INSERTION_ID_1, EventDeadLettersContract.INSERTION_ID_2, EventDeadLettersContract.INSERTION_ID_3);
+ .containsOnly(INSERTION_ID_1, INSERTION_ID_2, INSERTION_ID_3);
}
@Test
void shouldReturnTrueWhenEventStored() {
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.EVENT_1, EventDeadLettersContract.INSERTION_ID_1).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1, INSERTION_ID_1).block();
assertThat(cassandraEventDeadLettersDAO.containEvents().block()).isTrue();
}
@@ -104,28 +111,28 @@ class CassandraEventDeadLettersDAOTest {
@Test
void shouldReturnTrueWhenEventsStoredAndRemovedSome() {
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.EVENT_1, EventDeadLettersContract.INSERTION_ID_1).block();
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.EVENT_1, EventDeadLettersContract.INSERTION_ID_2).block();
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.EVENT_1, EventDeadLettersContract.INSERTION_ID_3).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1, INSERTION_ID_1).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1, INSERTION_ID_2).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1, INSERTION_ID_3).block();
assertThat(cassandraEventDeadLettersDAO.containEvents().block()).isTrue();
- cassandraEventDeadLettersDAO.removeEvent(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.INSERTION_ID_3).block();
+ cassandraEventDeadLettersDAO.removeEvent(GROUP_B, INSERTION_ID_3).block();
assertThat(cassandraEventDeadLettersDAO.containEvents().block()).isTrue();
}
@Test
void shouldReturnFalseWhenRemovedAllEventsStored() {
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.EVENT_1, EventDeadLettersContract.INSERTION_ID_1).block();
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.EVENT_1, EventDeadLettersContract.INSERTION_ID_2).block();
- cassandraEventDeadLettersDAO.store(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.EVENT_1, EventDeadLettersContract.INSERTION_ID_3).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1, INSERTION_ID_1).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1, INSERTION_ID_2).block();
+ cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1, INSERTION_ID_3).block();
assertThat(cassandraEventDeadLettersDAO.containEvents().block()).isTrue();
- cassandraEventDeadLettersDAO.removeEvent(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.INSERTION_ID_3).block();
- cassandraEventDeadLettersDAO.removeEvent(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.INSERTION_ID_2).block();
- cassandraEventDeadLettersDAO.removeEvent(EventDeadLettersContract.GROUP_B, EventDeadLettersContract.INSERTION_ID_1).block();
+ cassandraEventDeadLettersDAO.removeEvent(GROUP_B, INSERTION_ID_3).block();
+ cassandraEventDeadLettersDAO.removeEvent(GROUP_B, INSERTION_ID_2).block();
+ cassandraEventDeadLettersDAO.removeEvent(GROUP_B, INSERTION_ID_1).block();
assertThat(cassandraEventDeadLettersDAO.containEvents().block()).isFalse();
}
diff --git a/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersGroupDAOTest.java b/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersGroupDAOTest.java
index f5eef74..891070e 100644
--- a/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersGroupDAOTest.java
+++ b/event-bus/cassandra/src/test/java/org/apache/james/events/CassandraEventDeadLettersGroupDAOTest.java
@@ -19,6 +19,8 @@
package org.apache.james.events;
+import static org.apache.james.events.EventDeadLettersContract.GROUP_A;
+import static org.apache.james.events.EventDeadLettersContract.GROUP_B;
import static org.assertj.core.api.Assertions.assertThat;
import org.apache.james.backends.cassandra.CassandraCluster;
@@ -48,11 +50,11 @@ public class CassandraEventDeadLettersGroupDAOTest {
@Test
void retrieveAllGroupsShouldReturnStoredGroups() {
- GROUP_DAO.storeGroup(EventDeadLettersContract.GROUP_A).block();
- GROUP_DAO.storeGroup(EventDeadLettersContract.GROUP_B).block();
+ GROUP_DAO.storeGroup(GROUP_A).block();
+ GROUP_DAO.storeGroup(GROUP_B).block();
assertThat(GROUP_DAO.retrieveAllGroups()
.collectList().block())
- .containsOnly(EventDeadLettersContract.GROUP_A, EventDeadLettersContract.GROUP_B);
+ .containsOnly(GROUP_A, GROUP_B);
}
}
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupConsumerRetry.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupConsumerRetry.java
index 10d922a..5c6fd2e 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupConsumerRetry.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupConsumerRetry.java
@@ -24,6 +24,7 @@ import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.events.GroupRegistration.RETRY_COUNT;
+import static org.apache.james.events.RabbitMQEventBus.MAILBOX_EVENT;
import java.nio.charset.StandardCharsets;
@@ -51,7 +52,7 @@ class GroupConsumerRetry {
return new RetryExchangeName(group.asString());
}
- static final String MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX = RabbitMQEventBus.MAILBOX_EVENT + "-retryExchange-";
+ static final String MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX = MAILBOX_EVENT + "-retryExchange-";
private final String name;
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
index ad2b7ad..07739ac 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
@@ -25,6 +25,7 @@ import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
import static org.apache.james.backends.rabbitmq.Constants.deadLetterQueue;
+import static org.apache.james.events.RabbitMQEventBus.MAILBOX_EVENT;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@@ -56,7 +57,7 @@ class GroupRegistration implements Registration {
return new WorkQueueName(group);
}
- static final String MAILBOX_EVENT_WORK_QUEUE_PREFIX = RabbitMQEventBus.MAILBOX_EVENT + "-workQueue-";
+ static final String MAILBOX_EVENT_WORK_QUEUE_PREFIX = MAILBOX_EVENT + "-workQueue-";
private final Group group;
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
index 24b5dd1..8b15173 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
@@ -22,6 +22,8 @@ package org.apache.james.events;
import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.events.KeyRegistrationHandler.EVENTBUS_QUEUE_NAME_PREFIX;
+import static org.apache.james.events.KeyRegistrationHandler.QUEUE_ARGUMENTS;
import javax.inject.Inject;
@@ -49,7 +51,7 @@ public class KeyReconnectionHandler implements SimpleConnectionPool.Reconnection
public Publisher<Void> handleReconnection(Connection connection) {
return Mono.fromRunnable(() -> {
try (Channel channel = connection.createChannel()) {
- channel.queueDeclare(KeyRegistrationHandler.EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString(), DURABLE, !EXCLUSIVE, AUTO_DELETE, KeyRegistrationHandler.QUEUE_ARGUMENTS);
+ channel.queueDeclare(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString(), DURABLE, !EXCLUSIVE, AUTO_DELETE, QUEUE_ARGUMENTS);
} catch (Exception e) {
LOGGER.error("Error recovering connection", e);
}
diff --git a/event-bus/in-vm/src/test/java/org/apache/james/events/delivery/InVmEventDeliveryTest.java b/event-bus/in-vm/src/test/java/org/apache/james/events/delivery/InVmEventDeliveryTest.java
index a0536a0..ab2a92b 100644
--- a/event-bus/in-vm/src/test/java/org/apache/james/events/delivery/InVmEventDeliveryTest.java
+++ b/event-bus/in-vm/src/test/java/org/apache/james/events/delivery/InVmEventDeliveryTest.java
@@ -19,19 +19,23 @@
package org.apache.james.events.delivery;
+import static org.apache.james.events.EventBusTestFixture.EVENT;
+import static org.apache.james.events.EventBusTestFixture.GROUP_A;
+import static org.apache.james.events.delivery.EventDelivery.PermanentFailureHandler.NO_HANDLER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.time.Duration;
import org.apache.james.events.EventBusTestFixture;
-import org.apache.james.events.EventListener;
+import org.apache.james.events.EventListener.ExecutionMode;
import org.apache.james.events.MemoryEventDeadLetters;
import org.apache.james.events.RetryBackoffConfiguration;
+import org.apache.james.events.delivery.EventDelivery.DeliveryOption;
+import org.apache.james.events.delivery.EventDelivery.Retryer.BackoffRetryer;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.BeforeEach;
@@ -58,8 +62,8 @@ class InVmEventDeliveryTest {
@Test
void deliverShouldDeliverEvent() {
- when(listener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.SYNCHRONOUS);
- inVmEventDelivery.deliver(listener, EventBusTestFixture.EVENT, EventDelivery.DeliveryOption.none())
+ when(listener.getExecutionMode()).thenReturn(ExecutionMode.SYNCHRONOUS);
+ inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
.block();
assertThat(listener.numberOfEventCalls())
@@ -68,19 +72,19 @@ class InVmEventDeliveryTest {
@Test
void deliverShouldReturnSuccessSynchronousMono() {
- when(listener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.SYNCHRONOUS);
- assertThatCode(() -> inVmEventDelivery.deliver(listener, EventBusTestFixture.EVENT, EventDelivery.DeliveryOption.none())
+ when(listener.getExecutionMode()).thenReturn(ExecutionMode.SYNCHRONOUS);
+ assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
.block())
.doesNotThrowAnyException();
}
@Test
void deliverShouldNotDeliverWhenListenerGetException() {
- when(listener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.SYNCHRONOUS);
+ when(listener.getExecutionMode()).thenReturn(ExecutionMode.SYNCHRONOUS);
doThrow(new RuntimeException())
- .when(listener).event(EventBusTestFixture.EVENT);
+ .when(listener).event(EVENT);
- assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EventBusTestFixture.EVENT, EventDelivery.DeliveryOption.none())
+ assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
.block())
.isInstanceOf(RuntimeException.class);
@@ -90,11 +94,11 @@ class InVmEventDeliveryTest {
@Test
void deliverShouldReturnAnErrorMonoWhenListenerGetException() {
- when(listener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.SYNCHRONOUS);
+ when(listener.getExecutionMode()).thenReturn(ExecutionMode.SYNCHRONOUS);
doThrow(new RuntimeException())
- .when(listener).event(EventBusTestFixture.EVENT);
+ .when(listener).event(EVENT);
- assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EventBusTestFixture.EVENT, EventDelivery.DeliveryOption.none())
+ assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
.block())
.isInstanceOf(RuntimeException.class);
}
@@ -105,8 +109,8 @@ class InVmEventDeliveryTest {
@Test
void deliverShouldDeliverEvent() {
- when(listener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.ASYNCHRONOUS);
- inVmEventDelivery.deliver(listener, EventBusTestFixture.EVENT, EventDelivery.DeliveryOption.none())
+ when(listener.getExecutionMode()).thenReturn(ExecutionMode.ASYNCHRONOUS);
+ inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
.block();
assertThat(listener.numberOfEventCalls())
@@ -115,30 +119,30 @@ class InVmEventDeliveryTest {
@Test
void deliverShouldReturnSuccessSynchronousMono() {
- when(listener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.ASYNCHRONOUS);
- assertThatCode(() -> inVmEventDelivery.deliver(listener, EventBusTestFixture.EVENT, EventDelivery.DeliveryOption.none())
+ when(listener.getExecutionMode()).thenReturn(ExecutionMode.ASYNCHRONOUS);
+ assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
.block())
.doesNotThrowAnyException();
}
@Test
void deliverShouldNotFailWhenListenerGetException() {
- when(listener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.ASYNCHRONOUS);
+ when(listener.getExecutionMode()).thenReturn(ExecutionMode.ASYNCHRONOUS);
doThrow(new RuntimeException())
- .when(listener).event(EventBusTestFixture.EVENT);
+ .when(listener).event(EVENT);
- assertThatCode(() -> inVmEventDelivery.deliver(listener, EventBusTestFixture.EVENT, EventDelivery.DeliveryOption.none())
+ assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
.block())
.doesNotThrowAnyException();
}
@Test
void deliverShouldReturnAnSuccessSyncMonoWhenListenerGetException() {
- when(listener.getExecutionMode()).thenReturn(EventListener.ExecutionMode.ASYNCHRONOUS);
+ when(listener.getExecutionMode()).thenReturn(ExecutionMode.ASYNCHRONOUS);
doThrow(new RuntimeException())
- .when(listener).event(EventBusTestFixture.EVENT);
+ .when(listener).event(EVENT);
- assertThatCode(() -> inVmEventDelivery.deliver(listener, EventBusTestFixture.EVENT, EventDelivery.DeliveryOption.none())
+ assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
.block())
.doesNotThrowAnyException();
}
@@ -154,12 +158,12 @@ class InVmEventDeliveryTest {
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doCallRealMethod()
- .when(listener).event(EventBusTestFixture.EVENT);
+ .when(listener).event(EVENT);
- inVmEventDelivery.deliver(listener, EventBusTestFixture.EVENT,
- EventDelivery.DeliveryOption.of(
- EventDelivery.Retryer.BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
- EventDelivery.PermanentFailureHandler.NO_HANDLER))
+ inVmEventDelivery.deliver(listener, EVENT,
+ DeliveryOption.of(
+ BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
+ NO_HANDLER))
.block();
assertThat(listener.numberOfEventCalls())
@@ -170,18 +174,18 @@ class InVmEventDeliveryTest {
void failureHandlerShouldWorkWhenDeliverWithFailureHandler() {
EventBusTestFixture.EventListenerCountingSuccessfulExecution listener = newListener();
doThrow(new RuntimeException())
- .when(listener).event(EventBusTestFixture.EVENT);
+ .when(listener).event(EVENT);
MemoryEventDeadLetters deadLetter = new MemoryEventDeadLetters();
- inVmEventDelivery.deliver(listener, EventBusTestFixture.EVENT,
- EventDelivery.DeliveryOption.of(
+ inVmEventDelivery.deliver(listener, EVENT,
+ DeliveryOption.of(
EventDelivery.Retryer.NO_RETRYER,
- EventDelivery.PermanentFailureHandler.StoreToDeadLetters.of(EventBusTestFixture.GROUP_A, deadLetter)))
+ EventDelivery.PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
.block();
assertThat(deadLetter.groupsWithFailedEvents().toStream())
- .containsOnly(EventBusTestFixture.GROUP_A);
+ .containsOnly(GROUP_A);
}
@Test
@@ -190,14 +194,14 @@ class InVmEventDeliveryTest {
doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doCallRealMethod()
- .when(listener).event(EventBusTestFixture.EVENT);
+ .when(listener).event(EVENT);
MemoryEventDeadLetters deadLetter = new MemoryEventDeadLetters();
- inVmEventDelivery.deliver(listener, EventBusTestFixture.EVENT,
- EventDelivery.DeliveryOption.of(
- EventDelivery.Retryer.BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
- EventDelivery.PermanentFailureHandler.StoreToDeadLetters.of(EventBusTestFixture.GROUP_A, deadLetter)))
+ inVmEventDelivery.deliver(listener, EVENT,
+ DeliveryOption.of(
+ BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
+ EventDelivery.PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
.block();
SoftAssertions.assertSoftly(softy -> {
@@ -223,25 +227,25 @@ class InVmEventDeliveryTest {
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doCallRealMethod()
- .when(listener).event(EventBusTestFixture.EVENT);
+ .when(listener).event(EVENT);
MemoryEventDeadLetters deadLetter = new MemoryEventDeadLetters();
- inVmEventDelivery.deliver(listener, EventBusTestFixture.EVENT,
- EventDelivery.DeliveryOption.of(
- EventDelivery.Retryer.BackoffRetryer.of(RetryBackoffConfiguration.builder()
+ inVmEventDelivery.deliver(listener, EVENT,
+ DeliveryOption.of(
+ BackoffRetryer.of(RetryBackoffConfiguration.builder()
.maxRetries(8)
.firstBackoff(Duration.ofMillis(1))
.jitterFactor(0.2)
.build(), listener),
- EventDelivery.PermanentFailureHandler.StoreToDeadLetters.of(EventBusTestFixture.GROUP_A, deadLetter)))
+ EventDelivery.PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
.block();
SoftAssertions.assertSoftly(softy -> {
softy.assertThat(listener.numberOfEventCalls())
.isEqualTo(0);
assertThat(deadLetter.groupsWithFailedEvents().toStream())
- .containsOnly(EventBusTestFixture.GROUP_A);
+ .containsOnly(GROUP_A);
});
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org