You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/04 05:02:44 UTC
[james-project] 18/18: JAMES-3498 NamingStrategy should allow RabbitMQ eventBus isolation
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3e3c83d7919037b90225d9152e2456d926b798c6
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Feb 1 17:10:51 2021 +0700
JAMES-3498 NamingStrategy should allow RabbitMQ eventBus isolation
---
.../apache/james/events/RabbitMQEventBusTest.java | 59 ++++++++++++++++++++--
1 file changed, 54 insertions(+), 5 deletions(-)
diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
index efc3db4..2554609 100644
--- a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
+++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
@@ -67,6 +67,7 @@ import org.apache.james.events.EventDispatcher.DispatchingFailureGroup;
import org.apache.james.events.RoutingKeyConverter.RoutingKey;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.assertj.core.api.SoftAssertions;
import org.assertj.core.data.Percentage;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
@@ -146,11 +147,11 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
private RabbitMQEventBus newEventBus() {
- return newEventBus(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider());
+ return newEventBus(TEST_NAMING_STRATEGY, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider());
}
- private RabbitMQEventBus newEventBus(Sender sender, ReceiverProvider receiverProvider) {
- return new RabbitMQEventBus(TEST_NAMING_STRATEGY, sender, receiverProvider, eventSerializer,
+ private RabbitMQEventBus newEventBus(NamingStrategy namingStrategy, Sender sender, ReceiverProvider receiverProvider) {
+ return new RabbitMQEventBus(namingStrategy, sender, receiverProvider, eventSerializer,
EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION, routingKeyConverter,
memoryEventDeadLetters, new RecordingMetricFactory(),
rabbitMQExtension.getRabbitChannelPool(), EventBusId.random());
@@ -451,7 +452,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
@BeforeEach
void beforeEach() {
- rabbitMQEventBusWithNetWorkIssue = newEventBus(rabbitMQNetWorkIssueExtension.getSender(), rabbitMQNetWorkIssueExtension.getReceiverProvider());
+ rabbitMQEventBusWithNetWorkIssue = newEventBus(TEST_NAMING_STRATEGY, rabbitMQNetWorkIssueExtension.getSender(), rabbitMQNetWorkIssueExtension.getReceiverProvider());
}
@Test
@@ -774,7 +775,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
assertThat(rabbitManagementAPI.listQueues())
.filteredOn(queue -> !queue.getName().startsWith("test-")
- && !queue.getName().startsWith("test-dead-letter-queue"))
+ && !queue.getName().startsWith("other-"))
.isEmpty();
}
@@ -810,6 +811,54 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
@Nested
+ class IsolationTest {
+ private RabbitMQEventBus otherEventBus;
+
+ @BeforeEach
+ void beforeEach() {
+ otherEventBus = newEventBus(new NamingStrategy("other"), rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider());
+ otherEventBus.start();
+ }
+
+ @AfterEach
+ void tearDown() {
+ otherEventBus.stop();
+ }
+
+ @Test
+ void eventBusGroupsWithDistinctNamingStrategiesShouldBeIsolated() throws Exception {
+ EventCollector listener = new EventCollector();
+ EventCollector otherListener = new EventCollector();
+ eventBus.register(listener, GROUP_A);
+ otherEventBus.register(otherListener, GROUP_B);
+
+ eventBus.dispatch(EVENT, ImmutableSet.of()).block();
+
+ TimeUnit.SECONDS.sleep(1);
+ SoftAssertions.assertSoftly(softly -> {
+ softly.assertThat(listener.getEvents()).hasSize(1);
+ softly.assertThat(otherListener.getEvents()).isEmpty();
+ });
+ }
+
+ @Test
+ void eventBusPubSubWithDistinctNamingStrategiesShouldBeIsolated() throws Exception {
+ EventCollector listener = new EventCollector();
+ EventCollector otherListener = new EventCollector();
+ eventBus.register(listener, KEY_1);
+ otherEventBus.register(otherListener, KEY_1);
+
+ eventBus.dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+ TimeUnit.SECONDS.sleep(1);
+ SoftAssertions.assertSoftly(softly -> {
+ softly.assertThat(listener.getEvents()).hasSize(1);
+ softly.assertThat(otherListener.getEvents()).isEmpty();
+ });
+ }
+ }
+
+ @Nested
class ErrorDispatchingTest {
@AfterEach
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org