You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/11 09:58:14 UTC
[1/8] james-project git commit: JAMES-2630 Upgrade Reactor
Repository: james-project
Updated Branches:
refs/heads/master 2b5c162ef -> 79cf1c26b
JAMES-2630 Upgrade Reactor
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ebab03a6
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ebab03a6
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ebab03a6
Branch: refs/heads/master
Commit: ebab03a667922b7239a33a7d5de9ddc3d1501810
Parents: 2b5c162
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Tue Jan 8 09:45:01 2019 +0100
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:55:59 2019 +0700
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/ebab03a6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 56486ae..0119f10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -668,7 +668,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
- <version>Bismuth-RELEASE</version>
+ <version>Californium-SR3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[5/8] james-project git commit: MAILBOX-368 EventBusContract should
enforce basic error handling checks
Posted by bt...@apache.org.
MAILBOX-368 EventBusContract should enforce basic error handling checks
- Add one test when failed before, consumer is still able to handle subsequent events
- A failing listener should not prevent subsequent listeners execution
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/16ac299a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/16ac299a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/16ac299a
Branch: refs/heads/master
Commit: 16ac299af400aa666aaa4bf52383705a24bee9ca
Parents: bca9ba0
Author: datph <dp...@linagora.com>
Authored: Tue Jan 8 14:13:05 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700
----------------------------------------------------------------------
mailbox/api/pom.xml | 5 +++
.../mailbox/events/EventBusTestFixture.java | 23 ++++++++++++
.../james/mailbox/events/GroupContract.java | 37 ++++++++++++++++++++
mailbox/event/event-memory/pom.xml | 5 +++
4 files changed, 70 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/16ac299a/mailbox/api/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/api/pom.xml b/mailbox/api/pom.xml
index 024675d..04ef4f5 100644
--- a/mailbox/api/pom.xml
+++ b/mailbox/api/pom.xml
@@ -70,6 +70,11 @@
<artifactId>icu4j</artifactId>
</dependency>
<dependency>
+ <groupId>com.jayway.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/james-project/blob/16ac299a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
index ae2116d..ac6a883 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -19,10 +19,12 @@
package org.apache.james.mailbox.events;
+import static com.jayway.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.james.core.User;
import org.apache.james.mailbox.Event;
@@ -35,9 +37,28 @@ import org.apache.james.mailbox.model.TestId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.jayway.awaitility.core.ConditionFactory;
public interface EventBusTestFixture {
+ class MailboxListenerCountingSuccessfulExecution implements MailboxListener {
+ private final AtomicInteger calls = new AtomicInteger(0);
+
+ @Override
+ public ListenerType getType() {
+ return ListenerType.ONCE;
+ }
+
+ @Override
+ public void event(Event event) {
+ calls.incrementAndGet();
+ }
+
+ int numberOfEventCalls() {
+ return calls.get();
+ }
+ }
+
class GroupA extends Group {}
class GroupB extends Group {}
@@ -57,6 +78,8 @@ public interface EventBusTestFixture {
MailboxIdRegistrationKey KEY_2 = new MailboxIdRegistrationKey(ID_2);
List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class);
+ ConditionFactory WAIT_CONDITION = await().timeout(com.jayway.awaitility.Duration.ONE_SECOND);
+
static MailboxListener newListener() {
MailboxListener listener = mock(MailboxListener.class);
when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
http://git-wip-us.apache.org/repos/asf/james-project/blob/16ac299a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index 0975e3b..1820e80 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -25,14 +25,19 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
import static org.apache.james.mailbox.events.EventBusTestFixture.GroupB;
import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND;
+import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import org.apache.james.core.User;
import org.apache.james.mailbox.Event;
@@ -163,5 +168,37 @@ public interface GroupContract {
verify(listener, timeout(ONE_SECOND).times(1)).event(any());
}
+
+ @Test
+ default void failingGroupListenersShouldNotAbortGroupDelivery() {
+ EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
+ eventBus().register(listener, new GroupA());
+
+ doThrow(new RuntimeException())
+ .doCallRealMethod()
+ .when(listener).event(any());
+
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ WAIT_CONDITION
+ .until(() -> assertThat(listener.numberOfEventCalls()).isEqualTo(1));
+ }
+
+ @Test
+ default void allGroupListenersShouldBeExecutedWhenAGroupListenerFails() {
+ MailboxListener listener = newListener();
+
+ MailboxListener failingListener = mock(MailboxListener.class);
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+ doThrow(new RuntimeException()).when(failingListener).event(any());
+
+ eventBus().register(failingListener, new GroupA());
+ eventBus().register(listener, new GroupB());
+
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/16ac299a/mailbox/event/event-memory/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/pom.xml b/mailbox/event/event-memory/pom.xml
index be04bbc..cdc5d8f 100644
--- a/mailbox/event/event-memory/pom.xml
+++ b/mailbox/event/event-memory/pom.xml
@@ -47,6 +47,11 @@
<artifactId>metrics-api</artifactId>
</dependency>
<dependency>
+ <groupId>com.jayway.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[7/8] james-project git commit: MAILBOX-368 Add EventBus Concurrent
test for group registration
Posted by bt...@apache.org.
MAILBOX-368 Add EventBus Concurrent test for group registration
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/79cf1c26
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/79cf1c26
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/79cf1c26
Branch: refs/heads/master
Commit: 79cf1c26bea46d663b294a566488d720cd563d3c
Parents: 9fe9e62
Author: datph <dp...@linagora.com>
Authored: Tue Jan 8 17:14:49 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700
----------------------------------------------------------------------
.../events/EventBusConcurrentTestContract.java | 109 +++++++++++++++++++
.../mailbox/events/EventBusTestFixture.java | 3 +-
.../mailbox/events/RabbitMQEventBusTest.java | 6 +-
3 files changed, 116 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/79cf1c26/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
new file mode 100644
index 0000000..0746612
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static com.jayway.awaitility.Awaitility.await;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.jayway.awaitility.core.ConditionFactory;
+
+public interface EventBusConcurrentTestContract {
+
+ Duration FIVE_SECONDS = Duration.ofSeconds(5);
+ ConditionFactory AWAIT_CONDITION = await().timeout(new com.jayway.awaitility.Duration(5, TimeUnit.SECONDS));
+
+ int THREAD_COUNT = 10;
+ int OPERATION_COUNT = 30;
+ int TOTAL_DISPATCH_OPERATIONS = THREAD_COUNT * OPERATION_COUNT;
+
+ static EventBusTestFixture.MailboxListenerCountingSuccessfulExecution newCountingListener() {
+ return new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution();
+ }
+
+ static int totalEventsReceived(ImmutableList<EventBusTestFixture.MailboxListenerCountingSuccessfulExecution> allListeners) {
+ return allListeners.stream()
+ .mapToInt(listener -> listener.numberOfEventCalls())
+ .sum();
+ }
+
+ interface SingleEventBusConcurrentContract extends EventBusContract {
+
+ @Test
+ default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithSingleEventBus() throws Exception {
+ EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
+ EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
+ EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
+
+ eventBus().register(countingListener1, new EventBusTestFixture.GroupA());
+ eventBus().register(countingListener2, new EventBusTestFixture.GroupB());
+ eventBus().register(countingListener3, new EventBusTestFixture.GroupC());
+ int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
+
+ ConcurrentTestRunner.builder()
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS))
+ .threadCount(THREAD_COUNT)
+ .operationCount(OPERATION_COUNT)
+ .runSuccessfullyWithin(FIVE_SECONDS);
+
+ AWAIT_CONDITION.until(() -> assertThat(totalEventsReceived(ImmutableList
+ .of(countingListener1, countingListener2, countingListener3)))
+ .isEqualTo(totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS));
+ }
+ }
+
+ interface MultiEventBusConcurrentContract extends EventBusContract.MultipleEventBusContract {
+
+ @Test
+ default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception {
+ EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
+ EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
+ EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
+
+ eventBus().register(countingListener1, new EventBusTestFixture.GroupA());
+ eventBus().register(countingListener2, new EventBusTestFixture.GroupB());
+ eventBus().register(countingListener3, new EventBusTestFixture.GroupC());
+
+ eventBus2().register(countingListener1, new EventBusTestFixture.GroupA());
+ eventBus2().register(countingListener2, new EventBusTestFixture.GroupB());
+ eventBus2().register(countingListener3, new EventBusTestFixture.GroupC());
+
+ int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
+
+ ConcurrentTestRunner.builder()
+ .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS))
+ .threadCount(THREAD_COUNT)
+ .operationCount(OPERATION_COUNT)
+ .runSuccessfullyWithin(FIVE_SECONDS);
+
+ AWAIT_CONDITION.until(() -> assertThat(totalEventsReceived(ImmutableList
+ .of(countingListener1, countingListener2, countingListener3)))
+ .isEqualTo(totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/79cf1c26/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
index ac6a883..5d87c89 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -61,6 +61,7 @@ public interface EventBusTestFixture {
class GroupA extends Group {}
class GroupB extends Group {}
+ class GroupC extends Group {}
MailboxListener.MailboxEvent EVENT = new MailboxListener.MailboxAdded(
MailboxSession.SessionId.of(42),
@@ -76,7 +77,7 @@ public interface EventBusTestFixture {
ImmutableSet<RegistrationKey> NO_KEYS = ImmutableSet.of();
MailboxIdRegistrationKey KEY_1 = new MailboxIdRegistrationKey(ID_1);
MailboxIdRegistrationKey KEY_2 = new MailboxIdRegistrationKey(ID_2);
- List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class);
+ List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class, GroupC.class);
ConditionFactory WAIT_CONDITION = await().timeout(com.jayway.awaitility.Duration.ONE_SECOND);
http://git-wip-us.apache.org/repos/asf/james-project/blob/79cf1c26/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 8f3ee9d..b8d5cee 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -49,6 +49,7 @@ import com.rabbitmq.client.Connection;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
@@ -56,7 +57,8 @@ import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
-class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract {
+class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract,
+ EventBusConcurrentTestContract.SingleEventBusConcurrentContract, EventBusConcurrentTestContract.MultiEventBusConcurrentContract {
@RegisterExtension
static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
@@ -89,6 +91,8 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
ALL_GROUPS.stream()
.map(groupClass -> GroupRegistration.WorkQueueName.of(groupClass).asString())
.forEach(queueName -> sender.delete(QueueSpecification.queue(queueName)).block());
+ sender.delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)).block();
+ sender.close();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[8/8] james-project git commit: MAILBOX-368 Add Multiple EventBus
tests for group registration
Posted by bt...@apache.org.
MAILBOX-368 Add Multiple EventBus tests for group registration
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9fe9e627
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9fe9e627
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9fe9e627
Branch: refs/heads/master
Commit: 9fe9e627ac293a14f99a3e5e8901a21f03c70a39
Parents: ab2fa64
Author: datph <dp...@linagora.com>
Authored: Tue Jan 8 14:57:29 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700
----------------------------------------------------------------------
.../james/mailbox/events/EventBusContract.java | 5 +++
.../james/mailbox/events/GroupContract.java | 39 ++++++++++++++++++++
.../mailbox/events/RabbitMQEventBusTest.java | 11 +++++-
3 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/9fe9e627/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
index 50d8d67..4abbdee 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
@@ -21,5 +21,10 @@ package org.apache.james.mailbox.events;
public interface EventBusContract {
+ interface MultipleEventBusContract extends EventBusContract {
+
+ EventBus eventBus2();
+ }
+
EventBus eventBus();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/9fe9e627/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index 1820e80..01b3583 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -201,4 +201,43 @@ public interface GroupContract {
verify(listener, timeout(ONE_SECOND).times(1)).event(any());
}
}
+
+ interface MultipleEventBusGroupContract extends EventBusContract.MultipleEventBusContract {
+
+ @Test
+ default void groupsDefinedOnlyOnSomeNodesShouldBeNotified() {
+ MailboxListener mailboxListener = newListener();
+
+ eventBus().register(mailboxListener, new GroupA());
+
+ eventBus2().dispatch(EVENT, NO_KEYS).block();
+
+ verify(mailboxListener, timeout(ONE_SECOND).times(1)).event(any());
+ }
+
+ @Test
+ default void groupListenersShouldBeExecutedOnceInAControlledEnvironment() {
+ MailboxListener mailboxListener = newListener();
+
+ eventBus().register(mailboxListener, new GroupA());
+ eventBus2().register(mailboxListener, new GroupA());
+
+ eventBus2().dispatch(EVENT, NO_KEYS).block();
+
+ verify(mailboxListener, timeout(ONE_SECOND).times(1)).event(any());
+ }
+
+ @Test
+ default void unregisterShouldStopNotificationForDistantGroups() {
+ MailboxListener mailboxListener = newListener();
+
+ eventBus().register(mailboxListener, new GroupA()).unregister();
+
+ eventBus2().dispatch(EVENT, NO_KEYS).block();
+
+
+ verify(mailboxListener, after(FIVE_HUNDRED_MS).never())
+ .event(any());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/9fe9e627/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index b39fb42..8f3ee9d 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -56,12 +56,13 @@ import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
-class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract {
+class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract {
@RegisterExtension
static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
private RabbitMQEventBus eventBus;
+ private RabbitMQEventBus eventBus2;
private Sender sender;
private RabbitMQConnectionFactory connectionFactory;
private EventSerializer eventSerializer;
@@ -75,13 +76,16 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract
eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory());
eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
+ eventBus2 = new RabbitMQEventBus(connectionFactory, eventSerializer);
eventBus.start();
+ eventBus2.start();
sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
}
@AfterEach
void tearDown() {
eventBus.stop();
+ eventBus2.stop();
ALL_GROUPS.stream()
.map(groupClass -> GroupRegistration.WorkQueueName.of(groupClass).asString())
.forEach(queueName -> sender.delete(QueueSpecification.queue(queueName)).block());
@@ -92,6 +96,11 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract
return eventBus;
}
+ @Override
+ public EventBus eventBus2() {
+ return eventBus2;
+ }
+
@Nested
class PublishingTest {
private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[6/8] james-project git commit: MAILBOX-368 Separating Group and Key
Contract from EventBusContract
Posted by bt...@apache.org.
MAILBOX-368 Separating Group and Key Contract from EventBusContract
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/bca9ba01
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/bca9ba01
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/bca9ba01
Branch: refs/heads/master
Commit: bca9ba01b8d52562844c14653f9196a617cc1ae9
Parents: 92842f4
Author: datph <dp...@linagora.com>
Authored: Tue Jan 8 14:03:35 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700
----------------------------------------------------------------------
.../james/mailbox/events/EventBusContract.java | 359 -------------------
.../mailbox/events/EventBusTestFixture.java | 65 ++++
.../events/EventDeadLettersContract.java | 4 +-
.../james/mailbox/events/GroupContract.java | 167 +++++++++
.../james/mailbox/events/KeyContract.java | 246 +++++++++++++
.../james/mailbox/events/InVMEventBusTest.java | 2 +-
6 files changed, 481 insertions(+), 362 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/bca9ba01/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
index ab54208..50d8d67 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
@@ -19,366 +19,7 @@
package org.apache.james.mailbox.events;
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.jupiter.api.Assertions.assertTimeout;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.after;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.james.core.User;
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.mailbox.MailboxSession;
-import org.apache.james.mailbox.model.MailboxConstants;
-import org.apache.james.mailbox.model.MailboxId;
-import org.apache.james.mailbox.model.MailboxPath;
-import org.apache.james.mailbox.model.TestId;
-import org.junit.jupiter.api.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedMap;
-
public interface EventBusContract {
- MailboxListener.MailboxEvent EVENT = new MailboxListener.MailboxAdded(
- MailboxSession.SessionId.of(42),
- User.fromUsername("user"),
- new MailboxPath(MailboxConstants.USER_NAMESPACE, "user", "mailboxName"),
- TestId.of(18),
- Event.EventId.random());
- MailboxListener.Added NOOP_EVENT = new MailboxListener.Added(MailboxSession.SessionId.of(18), User.fromUsername("bob"), MailboxPath.forUser("bob", "mailbox"), TestId.of(58), ImmutableSortedMap.of(), Event.EventId.random());
-
- class GroupA extends Group {}
-
- class GroupB extends Group {}
-
- int ONE_SECOND = 1000;
- int FIVE_HUNDRED_MS = 500;
- MailboxId ID_1 = TestId.of(18);
- MailboxId ID_2 = TestId.of(24);
- ImmutableSet<RegistrationKey> NO_KEYS = ImmutableSet.of();
- MailboxIdRegistrationKey KEY_1 = new MailboxIdRegistrationKey(ID_1);
- MailboxIdRegistrationKey KEY_2 = new MailboxIdRegistrationKey(ID_2);
- List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class);
EventBus eventBus();
-
- default MailboxListener newListener() {
- MailboxListener listener = mock(MailboxListener.class);
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
- return listener;
- }
-
- @Test
- default void listenerGroupShouldReceiveEvents() {
- MailboxListener listener = newListener();
-
- eventBus().register(listener, new GroupA());
-
- eventBus().dispatch(EVENT, NO_KEYS).block();
-
- verify(listener, timeout(ONE_SECOND).times(1)).event(any());
- }
-
- @Test
- default void groupListenersShouldNotReceiveNoopEvents() {
- MailboxListener listener = newListener();
-
- eventBus().register(listener, new GroupA());
-
- eventBus().dispatch(NOOP_EVENT, NO_KEYS).block();
-
- verifyNoMoreInteractions(listener);
- }
-
- @Test
- default void registeredListenersShouldNotReceiveNoopEvents() {
- MailboxListener listener = newListener();
-
- eventBus().register(listener, KEY_1);
-
- eventBus().dispatch(NOOP_EVENT, KEY_1).block();
-
- verifyNoMoreInteractions(listener);
- }
-
- @Test
- default void dispatchShouldNotThrowWhenAGroupListenerFails() {
- MailboxListener listener = newListener();
- doThrow(new RuntimeException()).when(listener).event(any());
-
- eventBus().register(listener, new GroupA());
-
- assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
- .doesNotThrowAnyException();
- }
-
- @Test
- default void dispatchShouldNotThrowWhenARegisteredListenerFails() {
- MailboxListener listener = newListener();
- doThrow(new RuntimeException()).when(listener).event(any());
-
- eventBus().register(listener, KEY_1);
-
- assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
- .doesNotThrowAnyException();
- }
-
- @Test
- default void eachListenerGroupShouldReceiveEvents() {
- MailboxListener listener = newListener();
- MailboxListener listener2 = newListener();
- eventBus().register(listener, new GroupA());
- eventBus().register(listener2, new GroupB());
-
- eventBus().dispatch(EVENT, NO_KEYS).block();
-
- verify(listener, timeout(ONE_SECOND).times(1)).event(any());
- verify(listener2, timeout(ONE_SECOND).times(1)).event(any());
- }
-
- @Test
- default void unregisteredGroupListenerShouldNotReceiveEvents() {
- MailboxListener listener = newListener();
- Registration registration = eventBus().register(listener, new GroupA());
-
- registration.unregister();
-
- eventBus().dispatch(EVENT, NO_KEYS).block();
- verify(listener, after(FIVE_HUNDRED_MS).never())
- .event(any());
- }
-
- @Test
- default void registerShouldThrowWhenAGroupIsAlreadyUsed() {
- MailboxListener listener = newListener();
- MailboxListener listener2 = newListener();
-
- eventBus().register(listener, new GroupA());
-
- assertThatThrownBy(() -> eventBus().register(listener2, new GroupA()))
- .isInstanceOf(GroupAlreadyRegistered.class);
- }
-
- @Test
- default void registerShouldNotThrowOnAnUnregisteredGroup() {
- MailboxListener listener = newListener();
- MailboxListener listener2 = newListener();
-
- eventBus().register(listener, new GroupA()).unregister();
-
- assertThatCode(() -> eventBus().register(listener2, new GroupA()))
- .doesNotThrowAnyException();
- }
-
- @Test
- default void unregisterShouldBeIdempotentForGroups() {
- MailboxListener listener = newListener();
-
- Registration registration = eventBus().register(listener, new GroupA());
- registration.unregister();
-
- assertThatCode(registration::unregister)
- .doesNotThrowAnyException();
- }
-
- @Test
- default void registerShouldAcceptAlreadyUnregisteredGroups() {
- MailboxListener listener = newListener();
-
- eventBus().register(listener, new GroupA()).unregister();
- eventBus().register(listener, new GroupA());
-
- eventBus().dispatch(EVENT, NO_KEYS).block();
-
- verify(listener, timeout(ONE_SECOND).times(1)).event(any());
- }
-
- @Test
- default void dispatchShouldNotNotifyRegisteredListenerWhenEmptyKeySet() {
- MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
-
- eventBus().dispatch(EVENT, NO_KEYS).block();
-
- verify(listener, after(FIVE_HUNDRED_MS).never())
- .event(any());
- }
-
- @Test
- default void dispatchShouldNotNotifyListenerRegisteredOnOtherKeys() {
- MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
-
- eventBus().dispatch(EVENT, ImmutableSet.of(KEY_2)).block();
-
- verify(listener, after(FIVE_HUNDRED_MS).never())
- .event(any());
- }
-
- @Test
- default void dispatchShouldNotifyRegisteredListeners() {
- MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
-
- eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
- verify(listener, timeout(ONE_SECOND).times(1)).event(any());
- }
-
- @Test
- default void dispatchShouldNotifyOnlyRegisteredListener() {
- MailboxListener listener = newListener();
- MailboxListener listener2 = newListener();
- eventBus().register(listener, KEY_1);
- eventBus().register(listener2, KEY_2);
-
- eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
- verify(listener, timeout(ONE_SECOND).times(1)).event(any());
- verify(listener2, after(FIVE_HUNDRED_MS).never())
- .event(any());
- }
-
- @Test
- default void dispatchShouldNotifyAllListenersRegisteredOnAKey() {
- MailboxListener listener = newListener();
- MailboxListener listener2 = newListener();
- eventBus().register(listener, KEY_1);
- eventBus().register(listener2, KEY_1);
-
- eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
- verify(listener, timeout(ONE_SECOND).times(1)).event(any());
- verify(listener2, timeout(ONE_SECOND).times(1)).event(any());
- }
-
- @Test
- default void registerShouldAllowDuplicatedRegistration() {
- MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
- eventBus().register(listener, KEY_1);
-
- eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
- verify(listener, timeout(ONE_SECOND).times(1)).event(any());
- }
-
- @Test
- default void unregisterShouldRemoveDoubleRegisteredListener() {
- MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
- eventBus().register(listener, KEY_1).unregister();
-
- eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
- verify(listener, after(FIVE_HUNDRED_MS).never())
- .event(any());
- }
-
- @Test
- default void callingAllUnregisterMethodShouldUnregisterTheListener() {
- MailboxListener listener = newListener();
- Registration registration = eventBus().register(listener, KEY_1);
- eventBus().register(listener, KEY_1).unregister();
- registration.unregister();
-
- eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
- verify(listener, after(FIVE_HUNDRED_MS).never())
- .event(any());
- }
-
- @Test
- default void unregisterShouldHaveNotNotifyWhenCalledOnDifferentKeys() {
- MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
- eventBus().register(listener, KEY_2).unregister();
-
- eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
- verify(listener, timeout(ONE_SECOND).times(1)).event(any());
- }
-
- @Test
- default void unregisterShouldBeIdempotentForKeyRegistrations() {
- MailboxListener listener = newListener();
-
- Registration registration = eventBus().register(listener, KEY_1);
- registration.unregister();
-
- assertThatCode(registration::unregister)
- .doesNotThrowAnyException();
- }
-
- @Test
- default void dispatchShouldAcceptSeveralKeys() {
- MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
-
- eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block();
-
- verify(listener, timeout(ONE_SECOND).times(1)).event(any());
- }
-
- @Test
- default void dispatchShouldCallListenerOnceWhenSeveralKeysMatching() {
- MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1);
- eventBus().register(listener, KEY_2);
-
- eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block();
-
- verify(listener, timeout(ONE_SECOND).times(1)).event(any());
- }
-
- @Test
- default void dispatchShouldNotNotifyUnregisteredListener() {
- MailboxListener listener = newListener();
- eventBus().register(listener, KEY_1).unregister();
-
- eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
- verify(listener, after(FIVE_HUNDRED_MS).never())
- .event(any());
- }
-
- @Test
- default void dispatchShouldCallSynchronousListener() {
- MailboxListener listener = newListener();
-
- eventBus().register(listener, new GroupA());
-
- eventBus().dispatch(EVENT, NO_KEYS).block();
-
- verify(listener, timeout(ONE_SECOND).times(1)).event(any());
- }
-
- @Test
- default void dispatchShouldNotBlockAsynchronousListener() {
- MailboxListener listener = newListener();
- when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
- CountDownLatch latch = new CountDownLatch(1);
- doAnswer(invocation -> {
- latch.await();
- return null;
- }).when(listener).event(EVENT);
-
- assertTimeout(Duration.ofSeconds(2),
- () -> {
- eventBus().dispatch(EVENT, NO_KEYS).block();
- latch.countDown();
- });
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/bca9ba01/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
new file mode 100644
index 0000000..ae2116d
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -0,0 +1,65 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.apache.james.core.User;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.model.MailboxConstants;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.TestId;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+public interface EventBusTestFixture {
+
+ class GroupA extends Group {}
+ class GroupB extends Group {}
+
+ MailboxListener.MailboxEvent EVENT = new MailboxListener.MailboxAdded(
+ MailboxSession.SessionId.of(42),
+ User.fromUsername("user"),
+ new MailboxPath(MailboxConstants.USER_NAMESPACE, "user", "mailboxName"),
+ TestId.of(18),
+ Event.EventId.random());
+
+ int ONE_SECOND = 1000;
+ int FIVE_HUNDRED_MS = 500;
+ MailboxId ID_1 = TestId.of(18);
+ MailboxId ID_2 = TestId.of(24);
+ ImmutableSet<RegistrationKey> NO_KEYS = ImmutableSet.of();
+ MailboxIdRegistrationKey KEY_1 = new MailboxIdRegistrationKey(ID_1);
+ MailboxIdRegistrationKey KEY_2 = new MailboxIdRegistrationKey(ID_2);
+ List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class);
+
+ static MailboxListener newListener() {
+ MailboxListener listener = mock(MailboxListener.class);
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+ return listener;
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/bca9ba01/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
index d70395a..e2282a1 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
@@ -86,8 +86,8 @@ interface EventDeadLettersContract {
MailboxListener.MailboxAdded EVENT_2 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_2);
MailboxListener.MailboxAdded EVENT_3 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_3);
- Group GROUP_A = new EventBusContract.GroupA();
- Group GROUP_B = new EventBusContract.GroupB();
+ Group GROUP_A = new EventBusTestFixture.GroupA();
+ Group GROUP_B = new EventBusTestFixture.GroupB();
Group NULL_GROUP = null;
Event NULL_EVENT = null;
Event.EventId NULL_EVENT_ID = null;
http://git-wip-us.apache.org/repos/asf/james-project/blob/bca9ba01/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
new file mode 100644
index 0000000..0975e3b
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -0,0 +1,167 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GroupB;
+import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND;
+import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import org.apache.james.core.User;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.TestId;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableSortedMap;
+
+public interface GroupContract {
+
+ interface SingleEventBusGroupContract extends EventBusContract {
+
+ @Test
+ default void listenerGroupShouldReceiveEvents() {
+ MailboxListener listener = newListener();
+
+ eventBus().register(listener, new GroupA());
+
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+ }
+
+ @Test
+ default void groupListenersShouldNotReceiveNoopEvents() {
+ MailboxListener listener = newListener();
+
+ eventBus().register(listener, new GroupA());
+
+ MailboxListener.Added noopEvent = new MailboxListener.Added(MailboxSession.SessionId.of(18), User.fromUsername("bob"), MailboxPath.forUser("bob", "mailbox"), TestId.of(58), ImmutableSortedMap.of(), Event.EventId.random());
+ eventBus().dispatch(noopEvent, NO_KEYS).block();
+
+ verify(listener, after(FIVE_HUNDRED_MS).never())
+ .event(any());
+ }
+
+ @Test
+ default void dispatchShouldNotThrowWhenAGroupListenerFails() {
+ MailboxListener listener = newListener();
+ doThrow(new RuntimeException()).when(listener).event(any());
+
+ eventBus().register(listener, new GroupA());
+
+ assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ default void eachListenerGroupShouldReceiveEvents() {
+ MailboxListener listener = newListener();
+ MailboxListener listener2 = newListener();
+ eventBus().register(listener, new GroupA());
+ eventBus().register(listener2, new GroupB());
+
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+ verify(listener2, timeout(ONE_SECOND).times(1)).event(any());
+ }
+
+ @Test
+ default void unregisteredGroupListenerShouldNotReceiveEvents() {
+ MailboxListener listener = newListener();
+ Registration registration = eventBus().register(listener, new GroupA());
+
+ registration.unregister();
+
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+ verify(listener, after(FIVE_HUNDRED_MS).never())
+ .event(any());
+ }
+
+ @Test
+ default void registerShouldThrowWhenAGroupIsAlreadyUsed() {
+ MailboxListener listener = newListener();
+ MailboxListener listener2 = newListener();
+
+ eventBus().register(listener, new GroupA());
+
+ assertThatThrownBy(() -> eventBus().register(listener2, new GroupA()))
+ .isInstanceOf(GroupAlreadyRegistered.class);
+ }
+
+ @Test
+ default void registerShouldNotThrowOnAnUnregisteredGroup() {
+ MailboxListener listener = newListener();
+ MailboxListener listener2 = newListener();
+
+ eventBus().register(listener, new GroupA()).unregister();
+
+ assertThatCode(() -> eventBus().register(listener2, new GroupA()))
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ default void unregisterShouldBeIdempotentForGroups() {
+ MailboxListener listener = newListener();
+
+ Registration registration = eventBus().register(listener, new GroupA());
+ registration.unregister();
+
+ assertThatCode(registration::unregister)
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ default void registerShouldAcceptAlreadyUnregisteredGroups() {
+ MailboxListener listener = newListener();
+
+ eventBus().register(listener, new GroupA()).unregister();
+ eventBus().register(listener, new GroupA());
+
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+ }
+
+ @Test
+ default void dispatchShouldCallSynchronousListener() {
+ MailboxListener listener = newListener();
+
+ eventBus().register(listener, new GroupA());
+
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/bca9ba01/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
new file mode 100644
index 0000000..bd92d82
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
@@ -0,0 +1,246 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
+import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_2;
+import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND;
+import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.james.core.User;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.TestId;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+
+public interface KeyContract extends EventBusContract {
+
+ interface SingleEventBusKeyContract extends EventBusContract {
+ @Test
+ default void registeredListenersShouldNotReceiveNoopEvents() {
+ MailboxListener listener = newListener();
+
+ eventBus().register(listener, KEY_1);
+
+ MailboxListener.Added noopEvent = new MailboxListener.Added(MailboxSession.SessionId.of(18), User.fromUsername("bob"), MailboxPath.forUser("bob", "mailbox"), TestId.of(58), ImmutableSortedMap.of(), Event.EventId.random());
+ eventBus().dispatch(noopEvent, KEY_1).block();
+
+ verify(listener, after(FIVE_HUNDRED_MS).never())
+ .event(any());
+ }
+
+ @Test
+ default void dispatchShouldNotThrowWhenARegisteredListenerFails() {
+ MailboxListener listener = newListener();
+ doThrow(new RuntimeException()).when(listener).event(any());
+
+ eventBus().register(listener, KEY_1);
+
+ assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ default void dispatchShouldNotNotifyRegisteredListenerWhenEmptyKeySet() {
+ MailboxListener listener = newListener();
+ eventBus().register(listener, KEY_1);
+
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+
+ verify(listener, after(FIVE_HUNDRED_MS).never())
+ .event(any());
+ }
+
+ @Test
+ default void dispatchShouldNotNotifyListenerRegisteredOnOtherKeys() {
+ MailboxListener listener = newListener();
+ eventBus().register(listener, KEY_1);
+
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_2)).block();
+
+ verify(listener, after(FIVE_HUNDRED_MS).never())
+ .event(any());
+ }
+
+ @Test
+ default void dispatchShouldNotifyRegisteredListeners() {
+ MailboxListener listener = newListener();
+ eventBus().register(listener, KEY_1);
+
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+ verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+ }
+
+ @Test
+ default void dispatchShouldNotifyOnlyRegisteredListener() {
+ MailboxListener listener = newListener();
+ MailboxListener listener2 = newListener();
+ eventBus().register(listener, KEY_1);
+ eventBus().register(listener2, KEY_2);
+
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+ verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+ verify(listener2, after(FIVE_HUNDRED_MS).never())
+ .event(any());
+ }
+
+ @Test
+ default void dispatchShouldNotifyAllListenersRegisteredOnAKey() {
+ MailboxListener listener = newListener();
+ MailboxListener listener2 = newListener();
+ eventBus().register(listener, KEY_1);
+ eventBus().register(listener2, KEY_1);
+
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+ verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+ verify(listener2, timeout(ONE_SECOND).times(1)).event(any());
+ }
+
+ @Test
+ default void registerShouldAllowDuplicatedRegistration() {
+ MailboxListener listener = newListener();
+ eventBus().register(listener, KEY_1);
+ eventBus().register(listener, KEY_1);
+
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+ verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+ }
+
+ @Test
+ default void unregisterShouldRemoveDoubleRegisteredListener() {
+ MailboxListener listener = newListener();
+ eventBus().register(listener, KEY_1);
+ eventBus().register(listener, KEY_1).unregister();
+
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+ verify(listener, after(FIVE_HUNDRED_MS).never())
+ .event(any());
+ }
+
+ @Test
+ default void callingAllUnregisterMethodShouldUnregisterTheListener() {
+ MailboxListener listener = newListener();
+ Registration registration = eventBus().register(listener, KEY_1);
+ eventBus().register(listener, KEY_1).unregister();
+ registration.unregister();
+
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+ verify(listener, after(FIVE_HUNDRED_MS).never())
+ .event(any());
+ }
+
+ @Test
+ default void unregisterShouldHaveNotNotifyWhenCalledOnDifferentKeys() {
+ MailboxListener listener = newListener();
+ eventBus().register(listener, KEY_1);
+ eventBus().register(listener, KEY_2).unregister();
+
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+ verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+ }
+
+ @Test
+ default void unregisterShouldBeIdempotentForKeyRegistrations() {
+ MailboxListener listener = newListener();
+
+ Registration registration = eventBus().register(listener, KEY_1);
+ registration.unregister();
+
+ assertThatCode(registration::unregister)
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ default void dispatchShouldAcceptSeveralKeys() {
+ MailboxListener listener = newListener();
+ eventBus().register(listener, KEY_1);
+
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block();
+
+ verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+ }
+
+ @Test
+ default void dispatchShouldCallListenerOnceWhenSeveralKeysMatching() {
+ MailboxListener listener = newListener();
+ eventBus().register(listener, KEY_1);
+ eventBus().register(listener, KEY_2);
+
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block();
+
+ verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+ }
+
+ @Test
+ default void dispatchShouldNotNotifyUnregisteredListener() {
+ MailboxListener listener = newListener();
+ eventBus().register(listener, KEY_1).unregister();
+
+ eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+ verify(listener, after(FIVE_HUNDRED_MS).never())
+ .event(any());
+ }
+
+ @Test
+ default void dispatchShouldNotBlockAsynchronousListener() {
+ MailboxListener listener = newListener();
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ latch.await();
+ return null;
+ }).when(listener).event(EVENT);
+
+ assertTimeout(Duration.ofSeconds(2),
+ () -> {
+ eventBus().dispatch(EVENT, NO_KEYS).block();
+ latch.countDown();
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/bca9ba01/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
index 6d278aa..0241ac7 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
@@ -23,7 +23,7 @@ import org.apache.james.mailbox.events.delivery.InVmEventDelivery;
import org.apache.james.metrics.api.NoopMetricFactory;
import org.junit.jupiter.api.BeforeEach;
-public class InVMEventBusTest implements EventBusContract {
+public class InVMEventBusTest implements KeyContract.SingleEventBusKeyContract, GroupContract.SingleEventBusGroupContract {
private InVMEventBus eventBus;
@BeforeEach
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[3/8] james-project git commit: JAMES-2630 Add Functional utils
Posted by bt...@apache.org.
JAMES-2630 Add Functional utils
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/08c2f450
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/08c2f450
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/08c2f450
Branch: refs/heads/master
Commit: 08c2f450ad1151426ecc78010f407960a0d21834
Parents: ebab03a
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Tue Jan 8 09:46:01 2019 +0100
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700
----------------------------------------------------------------------
.../org/apache/james/util/FunctionalUtils.java | 37 ++++++++++
.../apache/james/util/FunctionalUtilsTest.java | 72 ++++++++++++++++++++
2 files changed, 109 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/08c2f450/server/container/util/src/main/java/org/apache/james/util/FunctionalUtils.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/FunctionalUtils.java b/server/container/util/src/main/java/org/apache/james/util/FunctionalUtils.java
new file mode 100644
index 0000000..54c8305
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/FunctionalUtils.java
@@ -0,0 +1,37 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.util;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
+
+public class FunctionalUtils {
+ public static <T> UnaryOperator<T> toFunction(Consumer<T> consumer) {
+ return argument -> {
+ consumer.accept(argument);
+ return argument;
+ };
+ }
+
+ public static <T> Predicate<T> toPredicate(Function<T, Boolean> function) {
+ return value -> function.apply(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/08c2f450/server/container/util/src/test/java/org/apache/james/util/FunctionalUtilsTest.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/FunctionalUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/FunctionalUtilsTest.java
new file mode 100644
index 0000000..8387a53
--- /dev/null
+++ b/server/container/util/src/test/java/org/apache/james/util/FunctionalUtilsTest.java
@@ -0,0 +1,72 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+class FunctionalUtilsTest {
+
+ @Nested
+ class ToFunction {
+ @Test
+ void shouldCallConsumerAndReturnTheGivenParameter() {
+ Counter counter = new Counter(26);
+ Consumer<Integer> consumer = counter::increment;
+ Function<Integer, Integer> function = FunctionalUtils.toFunction(consumer);
+
+ assertThat(function.apply(16)).isEqualTo(16);
+ assertThat(counter.getCounter()).isEqualTo(42);
+ }
+
+ private class Counter {
+ private Integer counter;
+
+ public Counter(Integer counter) {
+ this.counter = counter;
+ }
+
+ public void increment(Integer other) {
+ counter += other;
+ }
+
+ public Integer getCounter() {
+ return counter;
+ }
+ }
+ }
+
+ @Nested
+ class ToPredicate {
+ @Test
+ void shouldKeepProperty() {
+ Function<Integer, Boolean> function = value -> value % 42 == 0;
+ Predicate<Integer> predicate = FunctionalUtils.toPredicate(function);
+
+ assertThat(predicate.test(5)).isFalse();
+ assertThat(predicate.test(42)).isTrue();
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[4/8] james-project git commit: JAMES-2630 Add Reactor utils
Posted by bt...@apache.org.
JAMES-2630 Add Reactor utils
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/92842f40
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/92842f40
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/92842f40
Branch: refs/heads/master
Commit: 92842f40a6efba09fcbdb9f1295bd2cd719be6d7
Parents: 08c2f45
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Tue Jan 8 09:46:52 2019 +0100
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700
----------------------------------------------------------------------
.../org/apache/james/util/ReactorUtils.java | 27 ++++++++
.../org/apache/james/util/ReactorUtilsTest.java | 72 ++++++++++++++++++++
2 files changed, 99 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/92842f40/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
new file mode 100644
index 0000000..42e6d8e
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -0,0 +1,27 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.util;
+
+import reactor.core.publisher.Mono;
+
+public class ReactorUtils {
+ public static <T> Mono<T> executeAndEmpty(Runnable runnable) {
+ return Mono.fromRunnable(runnable).then(Mono.empty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/92842f40/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
new file mode 100644
index 0000000..3c991e0
--- /dev/null
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -0,0 +1,72 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Mono;
+
+class ReactorUtilsTest {
+
+ @Nested
+ class ExecuteAndEmpty {
+ @Test
+ void shouldExecuteTheRunnableAndReturnEmpty() {
+ Counter counter = new Counter(1);
+
+ Mono<?> reactor = Mono.empty()
+ .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> counter.increment(2)))
+ .map(FunctionalUtils.toFunction(any -> counter.increment(4)));
+
+ assertThat(reactor.hasElement().block()).isFalse();
+ assertThat(counter.getCounter()).isEqualTo(3);
+ }
+
+ @Test
+ void shouldNotExecuteTheRunnableAndReturnTheValue() {
+ Counter counter = new Counter(1);
+
+ Mono<?> reactor = Mono.just(42)
+ .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> counter.increment(2)))
+ .map(FunctionalUtils.toFunction(any -> counter.increment(4)));
+
+ assertThat(reactor.hasElement().block()).isTrue();
+ assertThat(counter.getCounter()).isEqualTo(5);
+ }
+
+ private class Counter {
+ private Integer counter;
+
+ public Counter(Integer counter) {
+ this.counter = counter;
+ }
+
+ public void increment(Integer other) {
+ counter += other;
+ }
+
+ public Integer getCounter() {
+ return counter;
+ }
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[2/8] james-project git commit: MAILBOX-368 RabbitMQEventBus group
registration
Posted by bt...@apache.org.
MAILBOX-368 RabbitMQEventBus group registration
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ab2fa642
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ab2fa642
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ab2fa642
Branch: refs/heads/master
Commit: ab2fa64264a67c702575a1c50fddeecfd9080027
Parents: 16ac299
Author: datph <dp...@linagora.com>
Authored: Tue Jan 8 14:21:15 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700
----------------------------------------------------------------------
mailbox/event/event-rabbitmq/pom.xml | 5 +
.../james/mailbox/events/EventDispatcher.java | 69 +++++++++
.../james/mailbox/events/GroupRegistration.java | 152 +++++++++++++++++++
.../events/GroupRegistrationHandler.java | 70 +++++++++
.../james/mailbox/events/RabbitMQEventBus.java | 54 +++----
.../events/RabbitMQEventBusPublishingTest.java | 121 ---------------
.../mailbox/events/RabbitMQEventBusTest.java | 148 ++++++++++++++++++
7 files changed, 467 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/pom.xml b/mailbox/event/event-rabbitmq/pom.xml
index c671468..0085059 100644
--- a/mailbox/event/event-rabbitmq/pom.xml
+++ b/mailbox/event/event-rabbitmq/pom.xml
@@ -91,6 +91,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
new file mode 100644
index 0000000..b01fc53
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -0,0 +1,69 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.Sender;
+
+public class EventDispatcher {
+ private final EventSerializer eventSerializer;
+ private final Sender sender;
+
+ EventDispatcher(EventSerializer eventSerializer, Sender sender) {
+ this.eventSerializer = eventSerializer;
+ this.sender = sender;
+ }
+
+ void start() {
+ sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+ .durable(DURABLE)
+ .type(DIRECT_EXCHANGE))
+ .block();
+ }
+
+ Mono<Void> dispatch(Event event) {
+ Mono<OutboundMessage> outboundMessage = Mono.just(event)
+ .publishOn(Schedulers.parallel())
+ .map(this::serializeEvent)
+ .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, EMPTY_ROUTING_KEY, payload));
+
+ return sender.send(outboundMessage)
+ .subscribeWith(MonoProcessor.create());
+ }
+
+ private byte[] serializeEvent(Event event) {
+ return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
new file mode 100644
index 0000000..36db6ae
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -0,0 +1,152 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Delivery;
+
+import play.api.libs.json.JsResult;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+
+class GroupRegistration implements Registration {
+ static class WorkQueueName {
+ @VisibleForTesting
+ static WorkQueueName of(Class<? extends Group> clazz) {
+ return new WorkQueueName(clazz.getName());
+ }
+
+ static WorkQueueName of(Group group) {
+ return of(group.getClass());
+ }
+
+ static final String MAILBOX_EVENT_WORK_QUEUE_PREFIX = MAILBOX_EVENT + "-workQueue-";
+
+ private final String name;
+
+ private WorkQueueName(String name) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "Queue name must be specified");
+ this.name = name;
+ }
+
+ String asString() {
+ return MAILBOX_EVENT_WORK_QUEUE_PREFIX + name;
+ }
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistration.class);
+
+ private final MailboxListener mailboxListener;
+ private final WorkQueueName queueName;
+ private final Receiver receiver;
+ private final Runnable unregisterGroup;
+ private final Sender sender;
+ private final EventSerializer eventSerializer;
+ private Optional<Disposable> receiverSubscriber;
+
+ GroupRegistration(Mono<Connection> connectionSupplier, Sender sender, EventSerializer eventSerializer,
+ MailboxListener mailboxListener, Group group, Runnable unregisterGroup) {
+ this.eventSerializer = eventSerializer;
+ this.mailboxListener = mailboxListener;
+ this.queueName = WorkQueueName.of(group);
+ this.sender = sender;
+ this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionSupplier));
+ this.receiverSubscriber = Optional.empty();
+ this.unregisterGroup = unregisterGroup;
+ }
+
+ GroupRegistration start() {
+ createGroupWorkQueue()
+ .doOnSuccess(any -> this.subscribeWorkQueue())
+ .block();
+ return this;
+ }
+
+ private Mono<Void> createGroupWorkQueue() {
+ return Flux.concat(
+ sender.declareQueue(QueueSpecification.queue(queueName.asString())
+ .durable(DURABLE)
+ .exclusive(!EXCLUSIVE)
+ .autoDelete(!AUTO_DELETE)
+ .arguments(NO_ARGUMENTS)),
+ sender.bind(BindingSpecification.binding()
+ .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+ .queue(queueName.asString())
+ .routingKey(EMPTY_ROUTING_KEY)))
+ .then();
+ }
+
+ private void subscribeWorkQueue() {
+ receiverSubscriber = Optional.of(receiver.consumeAutoAck(queueName.asString())
+ .subscribeOn(Schedulers.parallel())
+ .map(Delivery::getBody)
+ .filter(Objects::nonNull)
+ .map(eventInBytes -> new String(eventInBytes, StandardCharsets.UTF_8))
+ .map(eventSerializer::fromJson)
+ .map(JsResult::get)
+ .subscribeOn(Schedulers.elastic())
+ .subscribe(event -> deliverEvent(mailboxListener, event)));
+ }
+
+ private void deliverEvent(MailboxListener mailboxListener, Event event) {
+ try {
+ mailboxListener.event(event);
+ } catch (Exception e) {
+ LOGGER.error("Exception happens when handling event of user {}", event.getUser().asString(), e);
+ }
+ }
+
+ @Override
+ public void unregister() {
+ receiverSubscriber.filter(subscriber -> !subscriber.isDisposed())
+ .ifPresent(subscriber -> subscriber.dispose());
+ receiver.close();
+ unregisterGroup.run();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
new file mode 100644
index 0000000..c0f4339
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
@@ -0,0 +1,70 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.MailboxListener;
+
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.Sender;
+
+class GroupRegistrationHandler {
+ private final Map<Group, GroupRegistration> groupRegistrations;
+ private final EventSerializer eventSerializer;
+ private final Sender sender;
+ private final Mono<Connection> connectionMono;
+
+ GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono) {
+ this.eventSerializer = eventSerializer;
+ this.sender = sender;
+ this.connectionMono = connectionMono;
+ this.groupRegistrations = new ConcurrentHashMap<>();
+ }
+
+ void stop() {
+ groupRegistrations.values().forEach(GroupRegistration::unregister);
+ }
+
+ Registration register(MailboxListener listener, Group group) {
+ return groupRegistrations
+ .compute(group, (groupToRegister, oldGroupRegistration) -> {
+ if (oldGroupRegistration != null) {
+ throw new GroupAlreadyRegistered(group);
+ }
+ return newGroupRegistration(listener, groupToRegister);
+ })
+ .start();
+ }
+
+ private GroupRegistration newGroupRegistration(MailboxListener listener, Group group) {
+ return new GroupRegistration(
+ connectionMono,
+ sender,
+ eventSerializer,
+ listener,
+ group,
+ () -> groupRegistrations.remove(group));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 4ce792a..accd6cc 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -19,19 +19,19 @@
package org.apache.james.mailbox.events;
-import java.nio.charset.StandardCharsets;
import java.util.Set;
+import javax.annotation.PreDestroy;
+
import org.apache.commons.lang3.NotImplementedException;
import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
+import com.rabbitmq.client.Connection;
+
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.rabbitmq.ExchangeSpecification;
-import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
@@ -39,26 +39,26 @@ import reactor.rabbitmq.SenderOptions;
public class RabbitMQEventBus implements EventBus {
static final String MAILBOX_EVENT = "mailboxEvent";
static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
- static final String EMPTY_ROUTING_KEY = "";
-
- private static final boolean DURABLE = true;
- private static final String DIRECT_EXCHANGE = "direct";
- private final EventSerializer eventSerializer;
private final Sender sender;
+ private final GroupRegistrationHandler groupRegistrationHandler;
+ private final EventDispatcher eventDispatcher;
RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer) {
- this.eventSerializer = eventSerializer;
- SenderOptions senderOption = new SenderOptions().connectionMono(Mono.fromSupplier(rabbitMQConnectionFactory::create));
- this.sender = RabbitFlux.createSender(senderOption);
+ Mono<Connection> connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
+ this.sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
+ this.groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono);
+ this.eventDispatcher = new EventDispatcher(eventSerializer, sender);
+ }
+
+ public void start() {
+ eventDispatcher.start();
}
- Mono<Void> start() {
- return sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
- .durable(DURABLE)
- .type(DIRECT_EXCHANGE))
- .subscribeOn(Schedulers.elastic())
- .then();
+ @PreDestroy
+ public void stop() {
+ groupRegistrationHandler.stop();
+ sender.close();
}
@Override
@@ -68,22 +68,14 @@ public class RabbitMQEventBus implements EventBus {
@Override
public Registration register(MailboxListener listener, Group group) {
- throw new NotImplementedException("will implement latter");
+ return groupRegistrationHandler.register(listener, group);
}
@Override
public Mono<Void> dispatch(Event event, Set<RegistrationKey> key) {
- Mono<OutboundMessage> outboundMessage = Mono.just(event)
- .publishOn(Schedulers.parallel())
- .map(this::serializeEvent)
- .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, EMPTY_ROUTING_KEY, payload));
-
- Mono<Void> publishMono = sender.send(outboundMessage).cache();
- publishMono.subscribe();
- return publishMono;
- }
-
- private byte[] serializeEvent(Event event) {
- return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
+ if (!event.isNoop()) {
+ return eventDispatcher.dispatch(event);
+ }
+ return Mono.empty();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
deleted file mode 100644
index 50a37ca..0000000
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.events;
-
-import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
-import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
-import static org.apache.james.mailbox.events.EventBusContract.EVENT;
-import static org.apache.james.mailbox.events.EventBusContract.NO_KEYS;
-import static org.apache.james.mailbox.events.RabbitMQEventBus.EMPTY_ROUTING_KEY;
-import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
-import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.nio.charset.StandardCharsets;
-
-import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
-import org.apache.james.backend.rabbitmq.RabbitMQExtension;
-import org.apache.james.event.json.EventSerializer;
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.model.TestId;
-import org.apache.james.mailbox.model.TestMessageId;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import com.google.common.collect.ImmutableSet;
-
-import reactor.core.publisher.Mono;
-import reactor.rabbitmq.BindingSpecification;
-import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
-import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
-import reactor.rabbitmq.Sender;
-import reactor.rabbitmq.SenderOptions;
-
-class RabbitMQEventBusPublishingTest {
- private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
-
- @RegisterExtension
- static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
-
- private RabbitMQEventBus eventBus;
- private EventSerializer eventSerializer;
- private RabbitMQConnectionFactory connectionFactory;
-
- @BeforeEach
- void setUp() {
- connectionFactory = rabbitMQExtension.getConnectionFactory();
-
- eventSerializer = new EventSerializer(new TestId.Factory(), new TestMessageId.Factory());
- eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
- eventBus.start().block();
-
- createQueue();
- }
-
- private void createQueue() {
- SenderOptions senderOption = new SenderOptions()
- .connectionMono(Mono.fromSupplier(connectionFactory::create));
- Sender sender = RabbitFlux.createSender(senderOption);
-
- sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
- .durable(DURABLE)
- .exclusive(!EXCLUSIVE)
- .autoDelete(!AUTO_DELETE)
- .arguments(NO_ARGUMENTS))
- .block();
- sender.bind(BindingSpecification.binding()
- .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
- .queue(MAILBOX_WORK_QUEUE_NAME)
- .routingKey(EMPTY_ROUTING_KEY))
- .block();
- }
-
- @Test
- void dispatchShouldPublishSerializedEventToRabbitMQ() {
- eventBus.dispatch(EVENT, NO_KEYS).block();
-
- assertThat(dequeueEvent()).isEqualTo(EVENT);
- }
-
- @Test
- void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
- eventBus.dispatch(EVENT, NO_KEYS);
-
- assertThat(dequeueEvent()).isEqualTo(EVENT);
- }
-
-
- private Event dequeueEvent() {
- RabbitMQConnectionFactory connectionFactory = rabbitMQExtension.getConnectionFactory();
- Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(Mono.just(connectionFactory.create())));
-
- byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
- .blockFirst()
- .getBody();
-
- return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8))
- .get();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
new file mode 100644
index 0000000..b39fb42
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -0,0 +1,148 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.ALL_GROUPS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.backend.rabbitmq.RabbitMQExtension;
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+import reactor.rabbitmq.SenderOptions;
+
+class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract {
+
+ @RegisterExtension
+ static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
+
+ private RabbitMQEventBus eventBus;
+ private Sender sender;
+ private RabbitMQConnectionFactory connectionFactory;
+ private EventSerializer eventSerializer;
+
+ @BeforeEach
+ void setUp() {
+ connectionFactory = rabbitMQExtension.getConnectionFactory();
+ Mono<Connection> connectionMono = Mono.fromSupplier(connectionFactory::create).cache();
+
+ TestId.Factory mailboxIdFactory = new TestId.Factory();
+ eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory());
+
+ eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
+ eventBus.start();
+ sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
+ }
+
+ @AfterEach
+ void tearDown() {
+ eventBus.stop();
+ ALL_GROUPS.stream()
+ .map(groupClass -> GroupRegistration.WorkQueueName.of(groupClass).asString())
+ .forEach(queueName -> sender.delete(QueueSpecification.queue(queueName)).block());
+ }
+
+ @Override
+ public EventBus eventBus() {
+ return eventBus;
+ }
+
+ @Nested
+ class PublishingTest {
+ private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
+
+ @BeforeEach
+ void setUp() {
+ createQueue();
+ }
+
+ private void createQueue() {
+ SenderOptions senderOption = new SenderOptions()
+ .connectionMono(Mono.fromSupplier(connectionFactory::create));
+ Sender sender = RabbitFlux.createSender(senderOption);
+
+ sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
+ .durable(DURABLE)
+ .exclusive(!EXCLUSIVE)
+ .autoDelete(!AUTO_DELETE)
+ .arguments(NO_ARGUMENTS))
+ .block();
+ sender.bind(BindingSpecification.binding()
+ .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+ .queue(MAILBOX_WORK_QUEUE_NAME)
+ .routingKey(EMPTY_ROUTING_KEY))
+ .block();
+ }
+
+ @Test
+ void dispatchShouldPublishSerializedEventToRabbitMQ() {
+ eventBus.dispatch(EVENT, NO_KEYS).block();
+
+ assertThat(dequeueEvent()).isEqualTo(EVENT);
+ }
+
+ @Test
+ void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
+ eventBus.dispatch(EVENT, NO_KEYS);
+
+ assertThat(dequeueEvent()).isEqualTo(EVENT);
+ }
+
+ private Event dequeueEvent() {
+ RabbitMQConnectionFactory connectionFactory = rabbitMQExtension.getConnectionFactory();
+ Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(Mono.just(connectionFactory.create())));
+
+ byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
+ .blockFirst()
+ .getBody();
+
+ return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8))
+ .get();
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org