You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/11 09:58:14 UTC

[1/8] james-project git commit: JAMES-2630 Upgrade Reactor

Repository: james-project
Updated Branches:
  refs/heads/master 2b5c162ef -> 79cf1c26b


JAMES-2630 Upgrade Reactor


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ebab03a6
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ebab03a6
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ebab03a6

Branch: refs/heads/master
Commit: ebab03a667922b7239a33a7d5de9ddc3d1501810
Parents: 2b5c162
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Tue Jan 8 09:45:01 2019 +0100
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:55:59 2019 +0700

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/ebab03a6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 56486ae..0119f10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -668,7 +668,7 @@
             <dependency>
                 <groupId>io.projectreactor</groupId>
                 <artifactId>reactor-bom</artifactId>
-                <version>Bismuth-RELEASE</version>
+                <version>Californium-SR3</version>
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[5/8] james-project git commit: MAILBOX-368 EventBusContract should enforce basic error handling checks

Posted by bt...@apache.org.
MAILBOX-368 EventBusContract should enforce basic error handling checks

- Add a test checking that, after a previous failure, the consumer is still able to handle subsequent events
- A failing listener should not prevent the execution of subsequent listeners


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/16ac299a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/16ac299a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/16ac299a

Branch: refs/heads/master
Commit: 16ac299af400aa666aaa4bf52383705a24bee9ca
Parents: bca9ba0
Author: datph <dp...@linagora.com>
Authored: Tue Jan 8 14:13:05 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700

----------------------------------------------------------------------
 mailbox/api/pom.xml                             |  5 +++
 .../mailbox/events/EventBusTestFixture.java     | 23 ++++++++++++
 .../james/mailbox/events/GroupContract.java     | 37 ++++++++++++++++++++
 mailbox/event/event-memory/pom.xml              |  5 +++
 4 files changed, 70 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/16ac299a/mailbox/api/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/api/pom.xml b/mailbox/api/pom.xml
index 024675d..04ef4f5 100644
--- a/mailbox/api/pom.xml
+++ b/mailbox/api/pom.xml
@@ -70,6 +70,11 @@
             <artifactId>icu4j</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.jayway.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>com.sun.mail</groupId>
             <artifactId>javax.mail</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/16ac299a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
index ae2116d..ac6a883 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -19,10 +19,12 @@
 
 package org.apache.james.mailbox.events;
 
+import static com.jayway.awaitility.Awaitility.await;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.james.core.User;
 import org.apache.james.mailbox.Event;
@@ -35,9 +37,28 @@ import org.apache.james.mailbox.model.TestId;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.jayway.awaitility.core.ConditionFactory;
 
 public interface EventBusTestFixture {
 
+    class MailboxListenerCountingSuccessfulExecution implements MailboxListener {
+        private final AtomicInteger calls = new AtomicInteger(0);
+
+        @Override
+        public ListenerType getType() {
+            return ListenerType.ONCE;
+        }
+
+        @Override
+        public void event(Event event) {
+            calls.incrementAndGet();
+        }
+
+        int numberOfEventCalls() {
+            return calls.get();
+        }
+    }
+
     class GroupA extends Group {}
     class GroupB extends Group {}
 
@@ -57,6 +78,8 @@ public interface EventBusTestFixture {
     MailboxIdRegistrationKey KEY_2 = new MailboxIdRegistrationKey(ID_2);
     List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class);
 
+    ConditionFactory WAIT_CONDITION = await().timeout(com.jayway.awaitility.Duration.ONE_SECOND);
+
     static MailboxListener newListener() {
         MailboxListener listener = mock(MailboxListener.class);
         when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);

http://git-wip-us.apache.org/repos/asf/james-project/blob/16ac299a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index 0975e3b..1820e80 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -25,14 +25,19 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
 import static org.apache.james.mailbox.events.EventBusTestFixture.GroupB;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND;
+import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
 import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.after;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import org.apache.james.core.User;
 import org.apache.james.mailbox.Event;
@@ -163,5 +168,37 @@ public interface GroupContract {
 
             verify(listener, timeout(ONE_SECOND).times(1)).event(any());
         }
+
+        @Test
+        default void failingGroupListenersShouldNotAbortGroupDelivery() {
+            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
+            eventBus().register(listener, new GroupA());
+
+            doThrow(new RuntimeException())
+                .doCallRealMethod()
+                .when(listener).event(any());
+
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+
+            WAIT_CONDITION
+                .until(() -> assertThat(listener.numberOfEventCalls()).isEqualTo(1));
+        }
+
+        @Test
+        default void allGroupListenersShouldBeExecutedWhenAGroupListenerFails() {
+            MailboxListener listener = newListener();
+
+            MailboxListener failingListener = mock(MailboxListener.class);
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+            doThrow(new RuntimeException()).when(failingListener).event(any());
+
+            eventBus().register(failingListener, new GroupA());
+            eventBus().register(listener, new GroupB());
+
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+
+            verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/16ac299a/mailbox/event/event-memory/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/pom.xml b/mailbox/event/event-memory/pom.xml
index be04bbc..cdc5d8f 100644
--- a/mailbox/event/event-memory/pom.xml
+++ b/mailbox/event/event-memory/pom.xml
@@ -47,6 +47,11 @@
             <artifactId>metrics-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.jayway.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>io.projectreactor</groupId>
             <artifactId>reactor-core</artifactId>
         </dependency>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[7/8] james-project git commit: MAILBOX-368 Add EventBus Concurrent test for group registration

Posted by bt...@apache.org.
MAILBOX-368 Add EventBus Concurrent test for group registration


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/79cf1c26
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/79cf1c26
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/79cf1c26

Branch: refs/heads/master
Commit: 79cf1c26bea46d663b294a566488d720cd563d3c
Parents: 9fe9e62
Author: datph <dp...@linagora.com>
Authored: Tue Jan 8 17:14:49 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700

----------------------------------------------------------------------
 .../events/EventBusConcurrentTestContract.java  | 109 +++++++++++++++++++
 .../mailbox/events/EventBusTestFixture.java     |   3 +-
 .../mailbox/events/RabbitMQEventBusTest.java    |   6 +-
 3 files changed, 116 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/79cf1c26/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
new file mode 100644
index 0000000..0746612
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static com.jayway.awaitility.Awaitility.await;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.jayway.awaitility.core.ConditionFactory;
+
+public interface EventBusConcurrentTestContract {
+
+    Duration FIVE_SECONDS = Duration.ofSeconds(5);
+    ConditionFactory AWAIT_CONDITION = await().timeout(new com.jayway.awaitility.Duration(5, TimeUnit.SECONDS));
+
+    int THREAD_COUNT = 10;
+    int OPERATION_COUNT = 30;
+    int TOTAL_DISPATCH_OPERATIONS = THREAD_COUNT * OPERATION_COUNT;
+
+    static EventBusTestFixture.MailboxListenerCountingSuccessfulExecution newCountingListener() {
+        return new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution();
+    }
+
+    static int totalEventsReceived(ImmutableList<EventBusTestFixture.MailboxListenerCountingSuccessfulExecution> allListeners) {
+        return allListeners.stream()
+            .mapToInt(listener -> listener.numberOfEventCalls())
+            .sum();
+    }
+
+    interface SingleEventBusConcurrentContract extends EventBusContract {
+
+        @Test
+        default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithSingleEventBus() throws Exception {
+            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
+            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
+            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
+
+            eventBus().register(countingListener1, new EventBusTestFixture.GroupA());
+            eventBus().register(countingListener2, new EventBusTestFixture.GroupB());
+            eventBus().register(countingListener3, new EventBusTestFixture.GroupC());
+            int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
+
+            ConcurrentTestRunner.builder()
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS))
+                .threadCount(THREAD_COUNT)
+                .operationCount(OPERATION_COUNT)
+                .runSuccessfullyWithin(FIVE_SECONDS);
+
+            AWAIT_CONDITION.until(() -> assertThat(totalEventsReceived(ImmutableList
+                    .of(countingListener1, countingListener2, countingListener3)))
+                .isEqualTo(totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS));
+        }
+    }
+
+    interface MultiEventBusConcurrentContract extends EventBusContract.MultipleEventBusContract {
+
+        @Test
+        default void concurrentDispatchGroupShouldDeliverAllEventsToListenersWithMultipleEventBus() throws Exception {
+            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
+            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener2 = newCountingListener();
+            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener3 = newCountingListener();
+
+            eventBus().register(countingListener1, new EventBusTestFixture.GroupA());
+            eventBus().register(countingListener2, new EventBusTestFixture.GroupB());
+            eventBus().register(countingListener3, new EventBusTestFixture.GroupC());
+
+            eventBus2().register(countingListener1, new EventBusTestFixture.GroupA());
+            eventBus2().register(countingListener2, new EventBusTestFixture.GroupB());
+            eventBus2().register(countingListener3, new EventBusTestFixture.GroupC());
+
+            int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
+
+            ConcurrentTestRunner.builder()
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS))
+                .threadCount(THREAD_COUNT)
+                .operationCount(OPERATION_COUNT)
+                .runSuccessfullyWithin(FIVE_SECONDS);
+
+            AWAIT_CONDITION.until(() -> assertThat(totalEventsReceived(ImmutableList
+                    .of(countingListener1, countingListener2, countingListener3)))
+                .isEqualTo(totalGlobalRegistrations * TOTAL_DISPATCH_OPERATIONS));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/79cf1c26/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
index ac6a883..5d87c89 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -61,6 +61,7 @@ public interface EventBusTestFixture {
 
     class GroupA extends Group {}
     class GroupB extends Group {}
+    class GroupC extends Group {}
 
     MailboxListener.MailboxEvent EVENT = new MailboxListener.MailboxAdded(
         MailboxSession.SessionId.of(42),
@@ -76,7 +77,7 @@ public interface EventBusTestFixture {
     ImmutableSet<RegistrationKey> NO_KEYS = ImmutableSet.of();
     MailboxIdRegistrationKey KEY_1 = new MailboxIdRegistrationKey(ID_1);
     MailboxIdRegistrationKey KEY_2 = new MailboxIdRegistrationKey(ID_2);
-    List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class);
+    List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class, GroupC.class);
 
     ConditionFactory WAIT_CONDITION = await().timeout(com.jayway.awaitility.Duration.ONE_SECOND);
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/79cf1c26/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 8f3ee9d..b8d5cee 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -49,6 +49,7 @@ import com.rabbitmq.client.Connection;
 
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Receiver;
@@ -56,7 +57,8 @@ import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
 
-class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract {
+class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract,
+    EventBusConcurrentTestContract.SingleEventBusConcurrentContract, EventBusConcurrentTestContract.MultiEventBusConcurrentContract {
 
     @RegisterExtension
     static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
@@ -89,6 +91,8 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         ALL_GROUPS.stream()
             .map(groupClass -> GroupRegistration.WorkQueueName.of(groupClass).asString())
             .forEach(queueName -> sender.delete(QueueSpecification.queue(queueName)).block());
+        sender.delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)).block();
+        sender.close();
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[8/8] james-project git commit: MAILBOX-368 Add Multiple EventBus tests for group registration

Posted by bt...@apache.org.
MAILBOX-368 Add Multiple EventBus tests for group registration


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9fe9e627
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9fe9e627
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9fe9e627

Branch: refs/heads/master
Commit: 9fe9e627ac293a14f99a3e5e8901a21f03c70a39
Parents: ab2fa64
Author: datph <dp...@linagora.com>
Authored: Tue Jan 8 14:57:29 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700

----------------------------------------------------------------------
 .../james/mailbox/events/EventBusContract.java  |  5 +++
 .../james/mailbox/events/GroupContract.java     | 39 ++++++++++++++++++++
 .../mailbox/events/RabbitMQEventBusTest.java    | 11 +++++-
 3 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/9fe9e627/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
index 50d8d67..4abbdee 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
@@ -21,5 +21,10 @@ package org.apache.james.mailbox.events;
 
 public interface EventBusContract {
 
+    interface MultipleEventBusContract extends EventBusContract {
+
+        EventBus eventBus2();
+    }
+
     EventBus eventBus();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/9fe9e627/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index 1820e80..01b3583 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -201,4 +201,43 @@ public interface GroupContract {
             verify(listener, timeout(ONE_SECOND).times(1)).event(any());
         }
     }
+
+    interface MultipleEventBusGroupContract extends EventBusContract.MultipleEventBusContract {
+
+        @Test
+        default void groupsDefinedOnlyOnSomeNodesShouldBeNotified() {
+            MailboxListener mailboxListener = newListener();
+
+            eventBus().register(mailboxListener, new GroupA());
+
+            eventBus2().dispatch(EVENT, NO_KEYS).block();
+
+            verify(mailboxListener, timeout(ONE_SECOND).times(1)).event(any());
+        }
+
+        @Test
+        default void groupListenersShouldBeExecutedOnceInAControlledEnvironment() {
+            MailboxListener mailboxListener = newListener();
+
+            eventBus().register(mailboxListener, new GroupA());
+            eventBus2().register(mailboxListener, new GroupA());
+
+            eventBus2().dispatch(EVENT, NO_KEYS).block();
+
+            verify(mailboxListener, timeout(ONE_SECOND).times(1)).event(any());
+        }
+
+        @Test
+        default void unregisterShouldStopNotificationForDistantGroups() {
+            MailboxListener mailboxListener = newListener();
+
+            eventBus().register(mailboxListener, new GroupA()).unregister();
+
+            eventBus2().dispatch(EVENT, NO_KEYS).block();
+
+
+            verify(mailboxListener, after(FIVE_HUNDRED_MS).never())
+                .event(any());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/9fe9e627/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index b39fb42..8f3ee9d 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -56,12 +56,13 @@ import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
 
-class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract {
+class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract {
 
     @RegisterExtension
     static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
 
     private RabbitMQEventBus eventBus;
+    private RabbitMQEventBus eventBus2;
     private Sender sender;
     private RabbitMQConnectionFactory connectionFactory;
     private EventSerializer eventSerializer;
@@ -75,13 +76,16 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract
         eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory());
 
         eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
+        eventBus2 = new RabbitMQEventBus(connectionFactory, eventSerializer);
         eventBus.start();
+        eventBus2.start();
         sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
     }
 
     @AfterEach
     void tearDown() {
         eventBus.stop();
+        eventBus2.stop();
         ALL_GROUPS.stream()
             .map(groupClass -> GroupRegistration.WorkQueueName.of(groupClass).asString())
             .forEach(queueName -> sender.delete(QueueSpecification.queue(queueName)).block());
@@ -92,6 +96,11 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract
         return eventBus;
     }
 
+    @Override
+    public EventBus eventBus2() {
+        return eventBus2;
+    }
+
     @Nested
     class PublishingTest {
         private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[6/8] james-project git commit: MAILBOX-368 Separating Group and Key Contract from EventBusContract

Posted by bt...@apache.org.
MAILBOX-368 Separating Group and Key Contract from EventBusContract


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/bca9ba01
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/bca9ba01
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/bca9ba01

Branch: refs/heads/master
Commit: bca9ba01b8d52562844c14653f9196a617cc1ae9
Parents: 92842f4
Author: datph <dp...@linagora.com>
Authored: Tue Jan 8 14:03:35 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700

----------------------------------------------------------------------
 .../james/mailbox/events/EventBusContract.java  | 359 -------------------
 .../mailbox/events/EventBusTestFixture.java     |  65 ++++
 .../events/EventDeadLettersContract.java        |   4 +-
 .../james/mailbox/events/GroupContract.java     | 167 +++++++++
 .../james/mailbox/events/KeyContract.java       | 246 +++++++++++++
 .../james/mailbox/events/InVMEventBusTest.java  |   2 +-
 6 files changed, 481 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/bca9ba01/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
index ab54208..50d8d67 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusContract.java
@@ -19,366 +19,7 @@
 
 package org.apache.james.mailbox.events;
 
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.jupiter.api.Assertions.assertTimeout;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.after;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.james.core.User;
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.MailboxListener;
-import org.apache.james.mailbox.MailboxSession;
-import org.apache.james.mailbox.model.MailboxConstants;
-import org.apache.james.mailbox.model.MailboxId;
-import org.apache.james.mailbox.model.MailboxPath;
-import org.apache.james.mailbox.model.TestId;
-import org.junit.jupiter.api.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedMap;
-
 public interface EventBusContract {
-    MailboxListener.MailboxEvent EVENT = new MailboxListener.MailboxAdded(
-        MailboxSession.SessionId.of(42),
-        User.fromUsername("user"),
-        new MailboxPath(MailboxConstants.USER_NAMESPACE, "user", "mailboxName"),
-        TestId.of(18),
-        Event.EventId.random());
-    MailboxListener.Added NOOP_EVENT = new MailboxListener.Added(MailboxSession.SessionId.of(18), User.fromUsername("bob"), MailboxPath.forUser("bob", "mailbox"), TestId.of(58), ImmutableSortedMap.of(), Event.EventId.random());
-
-    class GroupA extends Group {}
-
-    class GroupB extends Group {}
-
-    int ONE_SECOND = 1000;
-    int FIVE_HUNDRED_MS = 500;
-    MailboxId ID_1 = TestId.of(18);
-    MailboxId ID_2 = TestId.of(24);
-    ImmutableSet<RegistrationKey> NO_KEYS = ImmutableSet.of();
-    MailboxIdRegistrationKey KEY_1 = new MailboxIdRegistrationKey(ID_1);
-    MailboxIdRegistrationKey KEY_2 = new MailboxIdRegistrationKey(ID_2);
-    List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class);
 
     EventBus eventBus();
-
-    default MailboxListener newListener() {
-        MailboxListener listener = mock(MailboxListener.class);
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
-        return listener;
-    }
-
-    @Test
-    default void listenerGroupShouldReceiveEvents() {
-        MailboxListener listener = newListener();
-
-        eventBus().register(listener, new GroupA());
-
-        eventBus().dispatch(EVENT, NO_KEYS).block();
-
-        verify(listener, timeout(ONE_SECOND).times(1)).event(any());
-    }
-
-    @Test
-    default void groupListenersShouldNotReceiveNoopEvents() {
-        MailboxListener listener = newListener();
-
-        eventBus().register(listener, new GroupA());
-
-        eventBus().dispatch(NOOP_EVENT, NO_KEYS).block();
-
-        verifyNoMoreInteractions(listener);
-    }
-
-    @Test
-    default void registeredListenersShouldNotReceiveNoopEvents() {
-        MailboxListener listener = newListener();
-
-        eventBus().register(listener, KEY_1);
-
-        eventBus().dispatch(NOOP_EVENT, KEY_1).block();
-
-        verifyNoMoreInteractions(listener);
-    }
-
-    @Test
-    default void dispatchShouldNotThrowWhenAGroupListenerFails() {
-        MailboxListener listener = newListener();
-        doThrow(new RuntimeException()).when(listener).event(any());
-
-        eventBus().register(listener, new GroupA());
-
-        assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
-            .doesNotThrowAnyException();
-    }
-
-    @Test
-    default void dispatchShouldNotThrowWhenARegisteredListenerFails() {
-        MailboxListener listener = newListener();
-        doThrow(new RuntimeException()).when(listener).event(any());
-
-        eventBus().register(listener, KEY_1);
-
-        assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
-            .doesNotThrowAnyException();
-    }
-
-    @Test
-    default void eachListenerGroupShouldReceiveEvents() {
-        MailboxListener listener = newListener();
-        MailboxListener listener2 = newListener();
-        eventBus().register(listener, new GroupA());
-        eventBus().register(listener2, new GroupB());
-
-        eventBus().dispatch(EVENT, NO_KEYS).block();
-
-        verify(listener, timeout(ONE_SECOND).times(1)).event(any());
-        verify(listener2, timeout(ONE_SECOND).times(1)).event(any());
-    }
-
-    @Test
-    default void unregisteredGroupListenerShouldNotReceiveEvents() {
-        MailboxListener listener = newListener();
-        Registration registration = eventBus().register(listener, new GroupA());
-
-        registration.unregister();
-
-        eventBus().dispatch(EVENT, NO_KEYS).block();
-        verify(listener, after(FIVE_HUNDRED_MS).never())
-            .event(any());
-    }
-
-    @Test
-    default void registerShouldThrowWhenAGroupIsAlreadyUsed() {
-        MailboxListener listener = newListener();
-        MailboxListener listener2 = newListener();
-
-        eventBus().register(listener, new GroupA());
-
-        assertThatThrownBy(() -> eventBus().register(listener2, new GroupA()))
-            .isInstanceOf(GroupAlreadyRegistered.class);
-    }
-
-    @Test
-    default void registerShouldNotThrowOnAnUnregisteredGroup() {
-        MailboxListener listener = newListener();
-        MailboxListener listener2 = newListener();
-
-        eventBus().register(listener, new GroupA()).unregister();
-
-        assertThatCode(() -> eventBus().register(listener2, new GroupA()))
-            .doesNotThrowAnyException();
-    }
-
-    @Test
-    default void unregisterShouldBeIdempotentForGroups() {
-        MailboxListener listener = newListener();
-
-        Registration registration = eventBus().register(listener, new GroupA());
-        registration.unregister();
-
-        assertThatCode(registration::unregister)
-            .doesNotThrowAnyException();
-    }
-
-    @Test
-    default void registerShouldAcceptAlreadyUnregisteredGroups() {
-        MailboxListener listener = newListener();
-
-        eventBus().register(listener, new GroupA()).unregister();
-        eventBus().register(listener, new GroupA());
-
-        eventBus().dispatch(EVENT, NO_KEYS).block();
-
-        verify(listener, timeout(ONE_SECOND).times(1)).event(any());
-    }
-
-    @Test
-    default void dispatchShouldNotNotifyRegisteredListenerWhenEmptyKeySet() {
-        MailboxListener listener = newListener();
-        eventBus().register(listener, KEY_1);
-
-        eventBus().dispatch(EVENT, NO_KEYS).block();
-
-        verify(listener, after(FIVE_HUNDRED_MS).never())
-            .event(any());
-    }
-
-    @Test
-    default void dispatchShouldNotNotifyListenerRegisteredOnOtherKeys() {
-        MailboxListener listener = newListener();
-        eventBus().register(listener, KEY_1);
-
-        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_2)).block();
-
-        verify(listener, after(FIVE_HUNDRED_MS).never())
-            .event(any());
-    }
-
-    @Test
-    default void dispatchShouldNotifyRegisteredListeners() {
-        MailboxListener listener = newListener();
-        eventBus().register(listener, KEY_1);
-
-        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
-        verify(listener, timeout(ONE_SECOND).times(1)).event(any());
-    }
-
-    @Test
-    default void dispatchShouldNotifyOnlyRegisteredListener() {
-        MailboxListener listener = newListener();
-        MailboxListener listener2 = newListener();
-        eventBus().register(listener, KEY_1);
-        eventBus().register(listener2, KEY_2);
-
-        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
-        verify(listener, timeout(ONE_SECOND).times(1)).event(any());
-        verify(listener2, after(FIVE_HUNDRED_MS).never())
-            .event(any());
-    }
-
-    @Test
-    default void dispatchShouldNotifyAllListenersRegisteredOnAKey() {
-        MailboxListener listener = newListener();
-        MailboxListener listener2 = newListener();
-        eventBus().register(listener, KEY_1);
-        eventBus().register(listener2, KEY_1);
-
-        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
-        verify(listener, timeout(ONE_SECOND).times(1)).event(any());
-        verify(listener2, timeout(ONE_SECOND).times(1)).event(any());
-    }
-
-    @Test
-    default void registerShouldAllowDuplicatedRegistration() {
-        MailboxListener listener = newListener();
-        eventBus().register(listener, KEY_1);
-        eventBus().register(listener, KEY_1);
-
-        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
-        verify(listener, timeout(ONE_SECOND).times(1)).event(any());
-    }
-
-    @Test
-    default void unregisterShouldRemoveDoubleRegisteredListener() {
-        MailboxListener listener = newListener();
-        eventBus().register(listener, KEY_1);
-        eventBus().register(listener, KEY_1).unregister();
-
-        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
-        verify(listener, after(FIVE_HUNDRED_MS).never())
-            .event(any());
-    }
-
-    @Test
-    default void callingAllUnregisterMethodShouldUnregisterTheListener() {
-        MailboxListener listener = newListener();
-        Registration registration = eventBus().register(listener, KEY_1);
-        eventBus().register(listener, KEY_1).unregister();
-        registration.unregister();
-
-        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
-        verify(listener, after(FIVE_HUNDRED_MS).never())
-            .event(any());
-    }
-
-    @Test
-    default void unregisterShouldHaveNotNotifyWhenCalledOnDifferentKeys() {
-        MailboxListener listener = newListener();
-        eventBus().register(listener, KEY_1);
-        eventBus().register(listener, KEY_2).unregister();
-
-        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
-        verify(listener, timeout(ONE_SECOND).times(1)).event(any());
-    }
-
-    @Test
-    default void unregisterShouldBeIdempotentForKeyRegistrations() {
-        MailboxListener listener = newListener();
-
-        Registration registration = eventBus().register(listener, KEY_1);
-        registration.unregister();
-
-        assertThatCode(registration::unregister)
-            .doesNotThrowAnyException();
-    }
-
-    @Test
-    default void dispatchShouldAcceptSeveralKeys() {
-        MailboxListener listener = newListener();
-        eventBus().register(listener, KEY_1);
-
-        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block();
-
-        verify(listener, timeout(ONE_SECOND).times(1)).event(any());
-    }
-
-    @Test
-    default void dispatchShouldCallListenerOnceWhenSeveralKeysMatching() {
-        MailboxListener listener = newListener();
-        eventBus().register(listener, KEY_1);
-        eventBus().register(listener, KEY_2);
-
-        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2)).block();
-
-        verify(listener, timeout(ONE_SECOND).times(1)).event(any());
-    }
-
-    @Test
-    default void dispatchShouldNotNotifyUnregisteredListener() {
-        MailboxListener listener = newListener();
-        eventBus().register(listener, KEY_1).unregister();
-
-        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
-
-        verify(listener, after(FIVE_HUNDRED_MS).never())
-            .event(any());
-    }
-
-    @Test
-    default void dispatchShouldCallSynchronousListener() {
-        MailboxListener listener = newListener();
-
-        eventBus().register(listener, new GroupA());
-
-        eventBus().dispatch(EVENT, NO_KEYS).block();
-
-        verify(listener, timeout(ONE_SECOND).times(1)).event(any());
-    }
-
-    @Test
-    default void dispatchShouldNotBlockAsynchronousListener() {
-        MailboxListener listener = newListener();
-        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
-        CountDownLatch latch = new CountDownLatch(1);
-        doAnswer(invocation -> {
-            latch.await();
-            return null;
-        }).when(listener).event(EVENT);
-
-        assertTimeout(Duration.ofSeconds(2),
-            () -> {
-                eventBus().dispatch(EVENT, NO_KEYS).block();
-                latch.countDown();
-            });
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/bca9ba01/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
new file mode 100644
index 0000000..ae2116d
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -0,0 +1,65 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.apache.james.core.User;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.model.MailboxConstants;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.TestId;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+public interface EventBusTestFixture {
+
+    class GroupA extends Group {}
+    class GroupB extends Group {}
+
+    MailboxListener.MailboxEvent EVENT = new MailboxListener.MailboxAdded(
+        MailboxSession.SessionId.of(42),
+        User.fromUsername("user"),
+        new MailboxPath(MailboxConstants.USER_NAMESPACE, "user", "mailboxName"),
+        TestId.of(18),
+        Event.EventId.random());
+
+    int ONE_SECOND = 1000;
+    int FIVE_HUNDRED_MS = 500;
+    MailboxId ID_1 = TestId.of(18);
+    MailboxId ID_2 = TestId.of(24);
+    ImmutableSet<RegistrationKey> NO_KEYS = ImmutableSet.of();
+    MailboxIdRegistrationKey KEY_1 = new MailboxIdRegistrationKey(ID_1);
+    MailboxIdRegistrationKey KEY_2 = new MailboxIdRegistrationKey(ID_2);
+    List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class);
+
+    static MailboxListener newListener() {
+        MailboxListener listener = mock(MailboxListener.class);
+        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
+        return listener;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/bca9ba01/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
index d70395a..e2282a1 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
@@ -86,8 +86,8 @@ interface EventDeadLettersContract {
     MailboxListener.MailboxAdded EVENT_2 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_2);
     MailboxListener.MailboxAdded EVENT_3 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_3);
 
-    Group GROUP_A = new EventBusContract.GroupA();
-    Group GROUP_B = new EventBusContract.GroupB();
+    Group GROUP_A = new EventBusTestFixture.GroupA();
+    Group GROUP_B = new EventBusTestFixture.GroupB();
     Group NULL_GROUP = null;
     Event NULL_EVENT = null;
     Event.EventId NULL_EVENT_ID = null;

http://git-wip-us.apache.org/repos/asf/james-project/blob/bca9ba01/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
new file mode 100644
index 0000000..0975e3b
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -0,0 +1,167 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GroupB;
+import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND;
+import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import org.apache.james.core.User;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.TestId;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableSortedMap;
+
+public interface GroupContract {
+
+    interface SingleEventBusGroupContract extends EventBusContract {
+
+        @Test
+        default void listenerGroupShouldReceiveEvents() {
+            MailboxListener listener = newListener();
+
+            eventBus().register(listener, new GroupA());
+
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+
+            verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+        }
+
+        @Test
+        default void groupListenersShouldNotReceiveNoopEvents() {
+            MailboxListener listener = newListener();
+
+            eventBus().register(listener, new GroupA());
+
+            MailboxListener.Added noopEvent = new MailboxListener.Added(MailboxSession.SessionId.of(18), User.fromUsername("bob"), MailboxPath.forUser("bob", "mailbox"), TestId.of(58), ImmutableSortedMap.of(), Event.EventId.random());
+            eventBus().dispatch(noopEvent, NO_KEYS).block();
+
+            verify(listener, after(FIVE_HUNDRED_MS).never())
+                .event(any());
+        }
+
+        @Test
+        default void dispatchShouldNotThrowWhenAGroupListenerFails() {
+            MailboxListener listener = newListener();
+            doThrow(new RuntimeException()).when(listener).event(any());
+
+            eventBus().register(listener, new GroupA());
+
+            assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
+                .doesNotThrowAnyException();
+        }
+
+        @Test
+        default void eachListenerGroupShouldReceiveEvents() {
+            MailboxListener listener = newListener();
+            MailboxListener listener2 = newListener();
+            eventBus().register(listener, new GroupA());
+            eventBus().register(listener2, new GroupB());
+
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+
+            verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+            verify(listener2, timeout(ONE_SECOND).times(1)).event(any());
+        }
+
+        @Test
+        default void unregisteredGroupListenerShouldNotReceiveEvents() {
+            MailboxListener listener = newListener();
+            Registration registration = eventBus().register(listener, new GroupA());
+
+            registration.unregister();
+
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+            verify(listener, after(FIVE_HUNDRED_MS).never())
+                .event(any());
+        }
+
+        @Test
+        default void registerShouldThrowWhenAGroupIsAlreadyUsed() {
+            MailboxListener listener = newListener();
+            MailboxListener listener2 = newListener();
+
+            eventBus().register(listener, new GroupA());
+
+            assertThatThrownBy(() -> eventBus().register(listener2, new GroupA()))
+                .isInstanceOf(GroupAlreadyRegistered.class);
+        }
+
+        @Test
+        default void registerShouldNotThrowOnAnUnregisteredGroup() {
+            MailboxListener listener = newListener();
+            MailboxListener listener2 = newListener();
+
+            eventBus().register(listener, new GroupA()).unregister();
+
+            assertThatCode(() -> eventBus().register(listener2, new GroupA()))
+                .doesNotThrowAnyException();
+        }
+
+        @Test
+        default void unregisterShouldBeIdempotentForGroups() {
+            MailboxListener listener = newListener();
+
+            Registration registration = eventBus().register(listener, new GroupA());
+            registration.unregister();
+
+            assertThatCode(registration::unregister)
+                .doesNotThrowAnyException();
+        }
+
+        @Test
+        default void registerShouldAcceptAlreadyUnregisteredGroups() {
+            MailboxListener listener = newListener();
+
+            eventBus().register(listener, new GroupA()).unregister();
+            eventBus().register(listener, new GroupA());
+
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+
+            verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+        }
+
+        @Test
+        default void dispatchShouldCallSynchronousListener() {
+            MailboxListener listener = newListener();
+
+            eventBus().register(listener, new GroupA());
+
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+
+            verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/bca9ba01/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
new file mode 100644
index 0000000..bd92d82
--- /dev/null
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
@@ -0,0 +1,246 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
+import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_2;
+import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND;
+import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.jupiter.api.Assertions.assertTimeout;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.james.core.User;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.TestId;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+
+public interface KeyContract extends EventBusContract {
+
+    interface SingleEventBusKeyContract extends EventBusContract {
+        @Test
+        default void registeredListenersShouldNotReceiveNoopEvents() {
+            MailboxListener listener = newListener();
+
+            eventBus().register(listener, KEY_1);
+
+            MailboxListener.Added noopEvent = new MailboxListener.Added(MailboxSession.SessionId.of(18), User.fromUsername("bob"), MailboxPath.forUser("bob", "mailbox"), TestId.of(58), ImmutableSortedMap.of(), Event.EventId.random());
+            eventBus().dispatch(noopEvent, KEY_1).block();
+
+            verify(listener, after(FIVE_HUNDRED_MS).never())
+                .event(any());
+        }
+
+        @Test
+        default void dispatchShouldNotThrowWhenARegisteredListenerFails() {
+            MailboxListener listener = newListener();
+            doThrow(new RuntimeException()).when(listener).event(any());
+
+            eventBus().register(listener, KEY_1);
+
+            assertThatCode(() -> eventBus().dispatch(EVENT, NO_KEYS).block())
+                .doesNotThrowAnyException();
+        }
+
+        @Test
+        default void dispatchShouldNotNotifyRegisteredListenerWhenEmptyKeySet() {
+            MailboxListener listener = newListener();
+            eventBus().register(listener, KEY_1);
+
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+
+            verify(listener, after(FIVE_HUNDRED_MS).never())
+                .event(any());
+        }
+
+        @Test
+        default void dispatchShouldNotNotifyListenerRegisteredOnOtherKeys() {
+            MailboxListener listener = newListener();
+            eventBus().register(listener, KEY_1);
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_2)).block();
+
+            verify(listener, after(FIVE_HUNDRED_MS).never())
+                .event(any());
+        }
+
+        @Test
+        default void dispatchShouldNotifyRegisteredListeners() {
+            MailboxListener listener = newListener();
+            eventBus().register(listener, KEY_1);
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+            verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+        }
+
+        @Test
+        default void dispatchShouldNotifyOnlyRegisteredListener() {
+            MailboxListener listener = newListener();
+            MailboxListener listener2 = newListener();
+            eventBus().register(listener, KEY_1);
+            eventBus().register(listener2, KEY_2);
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+            verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+            verify(listener2, after(FIVE_HUNDRED_MS).never())
+                .event(any());
+        }
+
+        @Test
+        default void dispatchShouldNotifyAllListenersRegisteredOnAKey() {
+            MailboxListener listener = newListener();
+            MailboxListener listener2 = newListener();
+            eventBus().register(listener, KEY_1);
+            eventBus().register(listener2, KEY_1);
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+
+            verify(listener, timeout(ONE_SECOND).times(1)).event(any());
+            verify(listener2, timeout(ONE_SECOND).times(1)).event(any());
+        }
+
+        /**
+         * Registering the same listener twice on the same key is accepted and still results in
+         * a single notification per dispatched event.
+         */
+        @Test
+        default void registerShouldAllowDuplicatedRegistration() {
+            MailboxListener registeredTwice = newListener();
+            eventBus().register(registeredTwice, KEY_1);
+            eventBus().register(registeredTwice, KEY_1);
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1))
+                .block();
+
+            verify(registeredTwice, timeout(ONE_SECOND).times(1))
+                .event(any());
+        }
+
+        /**
+         * When a listener was registered twice on a key, unregistering the second registration
+         * is enough to stop it from being notified.
+         */
+        @Test
+        default void unregisterShouldRemoveDoubleRegisteredListener() {
+            MailboxListener registeredTwice = newListener();
+            eventBus().register(registeredTwice, KEY_1);
+            Registration secondRegistration = eventBus().register(registeredTwice, KEY_1);
+            secondRegistration.unregister();
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1))
+                .block();
+
+            verify(registeredTwice, after(FIVE_HUNDRED_MS).never()).event(any());
+        }
+
+        /**
+         * Unregistering every registration obtained for a listener (second one first, then the
+         * first one) leaves the listener without any notification.
+         */
+        @Test
+        default void callingAllUnregisterMethodShouldUnregisterTheListener() {
+            MailboxListener registeredTwice = newListener();
+            Registration firstRegistration = eventBus().register(registeredTwice, KEY_1);
+            Registration secondRegistration = eventBus().register(registeredTwice, KEY_1);
+            secondRegistration.unregister();
+            firstRegistration.unregister();
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1))
+                .block();
+
+            verify(registeredTwice, after(FIVE_HUNDRED_MS).never()).event(any());
+        }
+
+        /**
+         * Unregistering the registration made on KEY_2 does not affect the registration of the
+         * same listener on KEY_1: an event dispatched on KEY_1 is still delivered once.
+         */
+        @Test
+        default void unregisterShouldHaveNotNotifyWhenCalledOnDifferentKeys() {
+            MailboxListener sharedListener = newListener();
+            eventBus().register(sharedListener, KEY_1);
+            Registration key2Registration = eventBus().register(sharedListener, KEY_2);
+            key2Registration.unregister();
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1))
+                .block();
+
+            verify(sharedListener, timeout(ONE_SECOND).times(1))
+                .event(any());
+        }
+
+        /**
+         * Calling unregister a second time on an already unregistered key registration must not throw.
+         */
+        @Test
+        default void unregisterShouldBeIdempotentForKeyRegistrations() {
+            MailboxListener registered = newListener();
+            Registration keyRegistration = eventBus().register(registered, KEY_1);
+
+            keyRegistration.unregister();
+
+            assertThatCode(() -> keyRegistration.unregister())
+                .doesNotThrowAnyException();
+        }
+
+        /**
+         * An event dispatched on several keys reaches a listener registered on one of them.
+         */
+        @Test
+        default void dispatchShouldAcceptSeveralKeys() {
+            MailboxListener onKey1 = newListener();
+            eventBus().register(onKey1, KEY_1);
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2))
+                .block();
+
+            verify(onKey1, timeout(ONE_SECOND).times(1))
+                .event(any());
+        }
+
+        /**
+         * A listener registered on both dispatched keys is notified a single time for the event.
+         */
+        @Test
+        default void dispatchShouldCallListenerOnceWhenSeveralKeysMatching() {
+            MailboxListener onBothKeys = newListener();
+            eventBus().register(onBothKeys, KEY_1);
+            eventBus().register(onBothKeys, KEY_2);
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1, KEY_2))
+                .block();
+
+            verify(onBothKeys, timeout(ONE_SECOND).times(1))
+                .event(any());
+        }
+
+        /**
+         * A listener whose only registration has been unregistered receives nothing.
+         */
+        @Test
+        default void dispatchShouldNotNotifyUnregisteredListener() {
+            MailboxListener unregistered = newListener();
+            Registration keyRegistration = eventBus().register(unregistered, KEY_1);
+            keyRegistration.unregister();
+
+            eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1))
+                .block();
+
+            verify(unregistered, after(FIVE_HUNDRED_MS).never()).event(any());
+        }
+
+        /**
+         * Dispatching must return even though an asynchronous listener registered on the
+         * dispatched key is still busy handling the event.
+         *
+         * The listener parks on a latch that is only released once dispatch has returned, so a
+         * bus that waits for asynchronous listeners would never complete. The listener has to be
+         * registered on the dispatched key: otherwise it is never invoked and the test cannot fail.
+         */
+        @Test
+        default void dispatchShouldNotBlockAsynchronousListener() {
+            MailboxListener listener = newListener();
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+            CountDownLatch latch = new CountDownLatch(1);
+            doAnswer(invocation -> {
+                latch.await();
+                return null;
+            }).when(listener).event(EVENT);
+            eventBus().register(listener, KEY_1);
+
+            try {
+                // Preemptive variant: a dispatch that blocks fails the test after the timeout
+                // instead of hanging it, since nothing else would ever release the latch.
+                org.junit.jupiter.api.Assertions.assertTimeoutPreemptively(Duration.ofSeconds(2),
+                    () -> {
+                        eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+                    });
+            } finally {
+                // Always free the listener so that no thread stays parked on the latch.
+                latch.countDown();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/bca9ba01/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
index 6d278aa..0241ac7 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/InVMEventBusTest.java
@@ -23,7 +23,7 @@ import org.apache.james.mailbox.events.delivery.InVmEventDelivery;
 import org.apache.james.metrics.api.NoopMetricFactory;
 import org.junit.jupiter.api.BeforeEach;
 
-public class InVMEventBusTest implements EventBusContract {
+public class InVMEventBusTest implements KeyContract.SingleEventBusKeyContract, GroupContract.SingleEventBusGroupContract {
     private InVMEventBus eventBus;
 
     @BeforeEach


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[3/8] james-project git commit: JAMES-2630 Add Functional utils

Posted by bt...@apache.org.
JAMES-2630 Add Functional utils


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/08c2f450
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/08c2f450
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/08c2f450

Branch: refs/heads/master
Commit: 08c2f450ad1151426ecc78010f407960a0d21834
Parents: ebab03a
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Tue Jan 8 09:46:01 2019 +0100
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700

----------------------------------------------------------------------
 .../org/apache/james/util/FunctionalUtils.java  | 37 ++++++++++
 .../apache/james/util/FunctionalUtilsTest.java  | 72 ++++++++++++++++++++
 2 files changed, 109 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/08c2f450/server/container/util/src/main/java/org/apache/james/util/FunctionalUtils.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/FunctionalUtils.java b/server/container/util/src/main/java/org/apache/james/util/FunctionalUtils.java
new file mode 100644
index 0000000..54c8305
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/FunctionalUtils.java
@@ -0,0 +1,37 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.util;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
+
+/**
+ * Adapters between the standard {@code java.util.function} interfaces.
+ */
+public class FunctionalUtils {
+    /**
+     * Turns a side-effecting {@link Consumer} into an operator usable in a mapping step.
+     *
+     * @param consumer action invoked with every argument before that argument is handed back;
+     *                 it may accept any supertype of {@code T}
+     * @param <T> type of the value flowing through the operator
+     * @return an operator that calls {@code consumer} with its argument and then returns that same argument
+     */
+    public static <T> UnaryOperator<T> toFunction(Consumer<? super T> consumer) {
+        return argument -> {
+            consumer.accept(argument);
+            return argument;
+        };
+    }
+
+    /**
+     * Views a {@code Boolean}-returning {@link Function} as a {@link Predicate}.
+     *
+     * @param function function whose boxed result decides the outcome of the predicate;
+     *                 it may accept any supertype of {@code T}
+     * @param <T> type of the tested value
+     * @return a predicate returning the unboxed result of {@code function}; as the result is
+     *         unboxed, a {@code null} result raises a {@link NullPointerException} when tested
+     */
+    public static <T> Predicate<T> toPredicate(Function<? super T, Boolean> function) {
+        return function::apply;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/08c2f450/server/container/util/src/test/java/org/apache/james/util/FunctionalUtilsTest.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/FunctionalUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/FunctionalUtilsTest.java
new file mode 100644
index 0000000..8387a53
--- /dev/null
+++ b/server/container/util/src/test/java/org/apache/james/util/FunctionalUtilsTest.java
@@ -0,0 +1,72 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+class FunctionalUtilsTest {
+
+    @Nested
+    class ToFunction {
+        // Mutable accumulator used to observe that the consumer has really been invoked.
+        private class Counter {
+            private Integer counter;
+
+            public Counter(Integer initialValue) {
+                this.counter = initialValue;
+            }
+
+            public void increment(Integer amount) {
+                counter = counter + amount;
+            }
+
+            public Integer getCounter() {
+                return counter;
+            }
+        }
+
+        @Test
+        void shouldCallConsumerAndReturnTheGivenParameter() {
+            Counter total = new Counter(26);
+            Consumer<Integer> addToTotal = total::increment;
+            Function<Integer, Integer> passThrough = FunctionalUtils.toFunction(addToTotal);
+
+            // The argument comes back untouched...
+            assertThat(passThrough.apply(16))
+                .isEqualTo(16);
+            // ...and the consumer has seen it: 26 + 16.
+            assertThat(total.getCounter())
+                .isEqualTo(42);
+        }
+    }
+
+    @Nested
+    class ToPredicate {
+        @Test
+        void shouldKeepProperty() {
+            Function<Integer, Boolean> isMultipleOf42 = candidate -> candidate % 42 == 0;
+            Predicate<Integer> asPredicate = FunctionalUtils.toPredicate(isMultipleOf42);
+
+            assertThat(asPredicate.test(5))
+                .isFalse();
+            assertThat(asPredicate.test(42))
+                .isTrue();
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[4/8] james-project git commit: JAMES-2630 Add Reactor utils

Posted by bt...@apache.org.
JAMES-2630 Add Reactor utils


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/92842f40
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/92842f40
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/92842f40

Branch: refs/heads/master
Commit: 92842f40a6efba09fcbdb9f1295bd2cd719be6d7
Parents: 08c2f45
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Tue Jan 8 09:46:52 2019 +0100
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700

----------------------------------------------------------------------
 .../org/apache/james/util/ReactorUtils.java     | 27 ++++++++
 .../org/apache/james/util/ReactorUtilsTest.java | 72 ++++++++++++++++++++
 2 files changed, 99 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/92842f40/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
new file mode 100644
index 0000000..42e6d8e
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -0,0 +1,27 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.util;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Helpers for composing Reactor pipelines.
+ */
+public class ReactorUtils {
+    /**
+     * Builds a {@link Mono} that runs a side effect and then completes without emitting a value.
+     *
+     * Nothing is executed by this call itself: the runnable runs when the returned Mono is
+     * subscribed, which makes it usable as a fallback in {@code switchIfEmpty}.
+     *
+     * @param runnable side effect to run on subscription
+     * @param <T> element type expected by the surrounding pipeline; no element is ever emitted
+     * @return an empty Mono completing once {@code runnable} has been executed
+     */
+    public static <T> Mono<T> executeAndEmpty(Runnable runnable) {
+        // Mono.fromRunnable already completes empty after running the action,
+        // so chaining then(Mono.empty()) on top of it adds nothing.
+        return Mono.fromRunnable(runnable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/92842f40/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
new file mode 100644
index 0000000..3c991e0
--- /dev/null
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -0,0 +1,72 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Mono;
+
+class ReactorUtilsTest {
+
+    @Nested
+    class ExecuteAndEmpty {
+        // Mutable accumulator recording which side effects have been executed.
+        private class Counter {
+            private Integer counter;
+
+            public Counter(Integer initialValue) {
+                this.counter = initialValue;
+            }
+
+            public void increment(Integer amount) {
+                counter = counter + amount;
+            }
+
+            public Integer getCounter() {
+                return counter;
+            }
+        }
+
+        @Test
+        void shouldExecuteTheRunnableAndReturnEmpty() {
+            Counter tally = new Counter(1);
+
+            Mono<?> pipeline = Mono.empty()
+                    .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> tally.increment(2)))
+                    .map(FunctionalUtils.toFunction(ignored -> tally.increment(4)));
+
+            assertThat(pipeline.hasElement().block())
+                .isFalse();
+            // Only the fallback ran (1 + 2); the mapping step never saw an element.
+            assertThat(tally.getCounter())
+                .isEqualTo(3);
+        }
+
+        @Test
+        void shouldNotExecuteTheRunnableAndReturnTheValue() {
+            Counter tally = new Counter(1);
+
+            Mono<?> pipeline = Mono.just(42)
+                    .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> tally.increment(2)))
+                    .map(FunctionalUtils.toFunction(ignored -> tally.increment(4)));
+
+            assertThat(pipeline.hasElement().block())
+                .isTrue();
+            // Only the mapping step ran (1 + 4); the fallback was never subscribed.
+            assertThat(tally.getCounter())
+                .isEqualTo(5);
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[2/8] james-project git commit: MAILBOX-368 RabbitMQEventBus group registration

Posted by bt...@apache.org.
MAILBOX-368 RabbitMQEventBus group registration


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ab2fa642
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ab2fa642
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ab2fa642

Branch: refs/heads/master
Commit: ab2fa64264a67c702575a1c50fddeecfd9080027
Parents: 16ac299
Author: datph <dp...@linagora.com>
Authored: Tue Jan 8 14:21:15 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 11 16:57:08 2019 +0700

----------------------------------------------------------------------
 mailbox/event/event-rabbitmq/pom.xml            |   5 +
 .../james/mailbox/events/EventDispatcher.java   |  69 +++++++++
 .../james/mailbox/events/GroupRegistration.java | 152 +++++++++++++++++++
 .../events/GroupRegistrationHandler.java        |  70 +++++++++
 .../james/mailbox/events/RabbitMQEventBus.java  |  54 +++----
 .../events/RabbitMQEventBusPublishingTest.java  | 121 ---------------
 .../mailbox/events/RabbitMQEventBusTest.java    | 148 ++++++++++++++++++
 7 files changed, 467 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/pom.xml b/mailbox/event/event-rabbitmq/pom.xml
index c671468..0085059 100644
--- a/mailbox/event/event-rabbitmq/pom.xml
+++ b/mailbox/event/event-rabbitmq/pom.xml
@@ -91,6 +91,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
new file mode 100644
index 0000000..b01fc53
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -0,0 +1,69 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.Sender;
+
+/**
+ * Publishes serialized mailbox events to the mailbox event exchange through a reactor-rabbitmq {@link Sender}.
+ */
+public class EventDispatcher {
+    private final EventSerializer eventSerializer;
+    private final Sender sender;
+
+    EventDispatcher(EventSerializer eventSerializer, Sender sender) {
+        this.eventSerializer = eventSerializer;
+        this.sender = sender;
+    }
+
+    /**
+     * Declares the mailbox event exchange and waits for the declaration to complete.
+     */
+    void start() {
+        ExchangeSpecification mailboxEventExchange = ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+            .durable(DURABLE)
+            .type(DIRECT_EXCHANGE);
+
+        sender.declareExchange(mailboxEventExchange)
+            .block();
+    }
+
+    /**
+     * Serializes the event and sends it to the mailbox event exchange with the empty routing key.
+     *
+     * The send is subscribed right away, so publishing starts whether or not the caller
+     * subscribes to the returned Mono.
+     *
+     * @param event event to publish
+     * @return a Mono bound to the send already in progress
+     */
+    Mono<Void> dispatch(Event event) {
+        Mono<OutboundMessage> message = Mono.just(event)
+            .publishOn(Schedulers.parallel())
+            .map(this::toOutboundMessage);
+
+        MonoProcessor<Void> sendCompletion = MonoProcessor.create();
+        return sender.send(message)
+            .subscribeWith(sendCompletion);
+    }
+
+    // Wraps the UTF-8 encoded JSON form of the event into a message for the mailbox event exchange.
+    private OutboundMessage toOutboundMessage(Event event) {
+        byte[] payload = eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
+        return new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, EMPTY_ROUTING_KEY, payload);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
new file mode 100644
index 0000000..36db6ae
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -0,0 +1,152 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Delivery;
+
+import play.api.libs.json.JsResult;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+
+/**
+ * Registration of a {@link MailboxListener} for a {@link Group}: it declares the work queue of
+ * the group, binds it to the mailbox event exchange and delivers every consumed event to the listener.
+ */
+class GroupRegistration implements Registration {
+    /**
+     * Name of the work queue of a group, derived from the class name of the group.
+     */
+    static class WorkQueueName {
+        @VisibleForTesting
+        static WorkQueueName of(Class<? extends Group> clazz) {
+            return new WorkQueueName(clazz.getName());
+        }
+
+        static WorkQueueName of(Group group) {
+            return of(group.getClass());
+        }
+
+        static final String MAILBOX_EVENT_WORK_QUEUE_PREFIX = MAILBOX_EVENT + "-workQueue-";
+
+        private final String name;
+
+        private WorkQueueName(String name) {
+            Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "Queue name must be specified");
+            this.name = name;
+        }
+
+        /**
+         * @return the queue name, i.e. the work queue prefix followed by the group class name
+         */
+        String asString() {
+            return MAILBOX_EVENT_WORK_QUEUE_PREFIX + name;
+        }
+    }
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistration.class);
+
+    private final MailboxListener mailboxListener;
+    private final WorkQueueName queueName;
+    private final Receiver receiver;
+    private final Runnable unregisterGroup;
+    private final Sender sender;
+    private final EventSerializer eventSerializer;
+    // Empty until start() has subscribed to the work queue.
+    private Optional<Disposable> receiverSubscriber;
+
+    GroupRegistration(Mono<Connection> connectionSupplier, Sender sender, EventSerializer eventSerializer,
+                              MailboxListener mailboxListener, Group group, Runnable unregisterGroup) {
+        this.eventSerializer = eventSerializer;
+        this.mailboxListener = mailboxListener;
+        this.queueName = WorkQueueName.of(group);
+        this.sender = sender;
+        this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionSupplier));
+        this.receiverSubscriber = Optional.empty();
+        this.unregisterGroup = unregisterGroup;
+    }
+
+    /**
+     * Declares and binds the work queue, then starts consuming it. Blocks until the queue is set up.
+     *
+     * @return this registration
+     */
+    GroupRegistration start() {
+        createGroupWorkQueue()
+            .doOnSuccess(any -> this.subscribeWorkQueue())
+            .block();
+        return this;
+    }
+
+    // Declares the work queue of the group, then binds it to the mailbox event exchange.
+    private Mono<Void> createGroupWorkQueue() {
+        return Flux.concat(
+            sender.declareQueue(QueueSpecification.queue(queueName.asString())
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(!AUTO_DELETE)
+                .arguments(NO_ARGUMENTS)),
+            sender.bind(BindingSpecification.binding()
+                .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+                .queue(queueName.asString())
+                .routingKey(EMPTY_ROUTING_KEY)))
+            .then();
+    }
+
+    // Consumes the work queue with automatic acknowledgement and hands every body to the listener.
+    private void subscribeWorkQueue() {
+        receiverSubscriber = Optional.of(receiver.consumeAutoAck(queueName.asString())
+            .subscribeOn(Schedulers.parallel())
+            // Bodies must be checked before being mapped: a mapper returning null fails the whole Flux.
+            .filter(delivery -> Objects.nonNull(delivery.getBody()))
+            .map(Delivery::getBody)
+            .subscribeOn(Schedulers.elastic())
+            .subscribe(this::handleEventBytes));
+    }
+
+    // Deserializes one message and delivers it. A message that cannot be deserialized is logged and
+    // skipped: letting the failure reach the pipeline would terminate the subscription to the queue.
+    private void handleEventBytes(byte[] eventInBytes) {
+        Event event;
+        try {
+            JsResult<? extends Event> deserialized = eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8));
+            event = deserialized.get();
+        } catch (Exception e) {
+            LOGGER.error("Unable to deserialize an event received on queue {}", queueName.asString(), e);
+            return;
+        }
+        deliverEvent(mailboxListener, event);
+    }
+
+    // Listener failures are logged and never propagated, so one failing event does not stop the consumption.
+    private void deliverEvent(MailboxListener mailboxListener, Event event) {
+        try {
+            mailboxListener.event(event);
+        } catch (Exception e) {
+            LOGGER.error("Exception happens when handling event of user {}", event.getUser().asString(), e);
+        }
+    }
+
+    /**
+     * Stops consuming the work queue, closes the receiver and runs the group unregistration callback.
+     */
+    @Override
+    public void unregister() {
+        receiverSubscriber.filter(subscriber -> !subscriber.isDisposed())
+            .ifPresent(Disposable::dispose);
+        receiver.close();
+        unregisterGroup.run();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
new file mode 100644
index 0000000..c0f4339
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java
@@ -0,0 +1,70 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.MailboxListener;
+
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.Sender;
+
+/**
+ * Holds the {@link GroupRegistration} of each registered {@link Group}; a group can be recorded
+ * at most once at a time.
+ */
+class GroupRegistrationHandler {
+    private final Map<Group, GroupRegistration> groupRegistrations;
+    private final EventSerializer eventSerializer;
+    private final Sender sender;
+    private final Mono<Connection> connectionMono;
+
+    GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono) {
+        this.eventSerializer = eventSerializer;
+        this.sender = sender;
+        this.connectionMono = connectionMono;
+        this.groupRegistrations = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Unregisters every group registration currently held.
+     */
+    void stop() {
+        // Registrations are created with a callback removing their own entry from the map; should
+        // unregister() trigger it, ConcurrentHashMap iteration tolerates such concurrent removals.
+        groupRegistrations.values().forEach(GroupRegistration::unregister);
+    }
+
+    /**
+     * Creates, records and starts the registration of the listener for the group.
+     *
+     * @return the result of starting the new registration
+     * @throws GroupAlreadyRegistered when a registration is already recorded for the group
+     */
+    Registration register(MailboxListener listener, Group group) {
+        GroupRegistration registration = groupRegistrations
+            .compute(group, (groupToRegister, oldGroupRegistration) -> {
+                if (oldGroupRegistration != null) {
+                    throw new GroupAlreadyRegistered(groupToRegister);
+                }
+                return newGroupRegistration(listener, groupToRegister);
+            });
+        try {
+            return registration.start();
+        } catch (RuntimeException e) {
+            // A registration that failed to start must not keep the group reserved, otherwise every
+            // further attempt to register that group would be rejected with GroupAlreadyRegistered.
+            groupRegistrations.remove(group, registration);
+            throw e;
+        }
+    }
+
+    /**
+     * Builds a registration whose last argument is a callback removing the group from this handler.
+     */
+    private GroupRegistration newGroupRegistration(MailboxListener listener, Group group) {
+        return new GroupRegistration(
+            connectionMono,
+            sender,
+            eventSerializer,
+            listener,
+            group,
+            () -> groupRegistrations.remove(group));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 4ce792a..accd6cc 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -19,19 +19,19 @@
 
 package org.apache.james.mailbox.events;
 
-import java.nio.charset.StandardCharsets;
 import java.util.Set;
 
+import javax.annotation.PreDestroy;
+
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
 
+import com.rabbitmq.client.Connection;
+
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.rabbitmq.ExchangeSpecification;
-import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
@@ -39,26 +39,26 @@ import reactor.rabbitmq.SenderOptions;
 public class RabbitMQEventBus implements EventBus {
     static final String MAILBOX_EVENT = "mailboxEvent";
     static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
-    static final String EMPTY_ROUTING_KEY = "";
-
-    private static final boolean DURABLE = true;
-    private static final String DIRECT_EXCHANGE = "direct";
 
-    private final EventSerializer eventSerializer;
     private final Sender sender;
+    private final GroupRegistrationHandler groupRegistrationHandler;
+    private final EventDispatcher eventDispatcher;
 
     RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer) {
-        this.eventSerializer = eventSerializer;
-        SenderOptions senderOption = new SenderOptions().connectionMono(Mono.fromSupplier(rabbitMQConnectionFactory::create));
-        this.sender = RabbitFlux.createSender(senderOption);
+        Mono<Connection> connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
+        this.sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
+        this.groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono);
+        this.eventDispatcher = new EventDispatcher(eventSerializer, sender);
+    }
+
+    public void start() {
+        eventDispatcher.start();
     }
 
-    Mono<Void> start() {
-        return sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
-                .durable(DURABLE)
-                .type(DIRECT_EXCHANGE))
-            .subscribeOn(Schedulers.elastic())
-            .then();
+    @PreDestroy
+    public void stop() {
+        groupRegistrationHandler.stop();
+        sender.close();
     }
 
     @Override
@@ -68,22 +68,14 @@ public class RabbitMQEventBus implements EventBus {
 
     @Override
     public Registration register(MailboxListener listener, Group group) {
-        throw new NotImplementedException("will implement latter");
+        return groupRegistrationHandler.register(listener, group);
     }
 
     @Override
     public Mono<Void> dispatch(Event event, Set<RegistrationKey> key) {
-        Mono<OutboundMessage> outboundMessage = Mono.just(event)
-            .publishOn(Schedulers.parallel())
-            .map(this::serializeEvent)
-            .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, EMPTY_ROUTING_KEY, payload));
-
-        Mono<Void> publishMono = sender.send(outboundMessage).cache();
-        publishMono.subscribe();
-        return publishMono;
-    }
-
-    private byte[] serializeEvent(Event event) {
-        return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
+        if (!event.isNoop()) {
+            return eventDispatcher.dispatch(event);
+        }
+        return Mono.empty();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
deleted file mode 100644
index 50a37ca..0000000
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.events;
-
-import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
-import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
-import static org.apache.james.mailbox.events.EventBusContract.EVENT;
-import static org.apache.james.mailbox.events.EventBusContract.NO_KEYS;
-import static org.apache.james.mailbox.events.RabbitMQEventBus.EMPTY_ROUTING_KEY;
-import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
-import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.nio.charset.StandardCharsets;
-
-import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
-import org.apache.james.backend.rabbitmq.RabbitMQExtension;
-import org.apache.james.event.json.EventSerializer;
-import org.apache.james.mailbox.Event;
-import org.apache.james.mailbox.model.TestId;
-import org.apache.james.mailbox.model.TestMessageId;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import com.google.common.collect.ImmutableSet;
-
-import reactor.core.publisher.Mono;
-import reactor.rabbitmq.BindingSpecification;
-import reactor.rabbitmq.QueueSpecification;
-import reactor.rabbitmq.RabbitFlux;
-import reactor.rabbitmq.Receiver;
-import reactor.rabbitmq.ReceiverOptions;
-import reactor.rabbitmq.Sender;
-import reactor.rabbitmq.SenderOptions;
-
-class RabbitMQEventBusPublishingTest {
-    private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
-
-    @RegisterExtension
-    static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
-
-    private RabbitMQEventBus eventBus;
-    private EventSerializer eventSerializer;
-    private RabbitMQConnectionFactory connectionFactory;
-
-    @BeforeEach
-    void setUp() {
-        connectionFactory = rabbitMQExtension.getConnectionFactory();
-
-        eventSerializer = new EventSerializer(new TestId.Factory(), new TestMessageId.Factory());
-        eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
-        eventBus.start().block();
-
-        createQueue();
-    }
-
-    private void createQueue() {
-        SenderOptions senderOption = new SenderOptions()
-            .connectionMono(Mono.fromSupplier(connectionFactory::create));
-        Sender sender = RabbitFlux.createSender(senderOption);
-
-        sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
-            .durable(DURABLE)
-            .exclusive(!EXCLUSIVE)
-            .autoDelete(!AUTO_DELETE)
-            .arguments(NO_ARGUMENTS))
-            .block();
-        sender.bind(BindingSpecification.binding()
-            .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
-            .queue(MAILBOX_WORK_QUEUE_NAME)
-            .routingKey(EMPTY_ROUTING_KEY))
-            .block();
-    }
-
-    @Test
-    void dispatchShouldPublishSerializedEventToRabbitMQ() {
-        eventBus.dispatch(EVENT, NO_KEYS).block();
-
-        assertThat(dequeueEvent()).isEqualTo(EVENT);
-    }
-
-    @Test
-    void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
-        eventBus.dispatch(EVENT, NO_KEYS);
-
-        assertThat(dequeueEvent()).isEqualTo(EVENT);
-    }
-
-
-    private Event dequeueEvent() {
-        RabbitMQConnectionFactory connectionFactory = rabbitMQExtension.getConnectionFactory();
-        Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(Mono.just(connectionFactory.create())));
-
-        byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
-            .blockFirst()
-            .getBody();
-
-        return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8))
-            .get();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/ab2fa642/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
new file mode 100644
index 0000000..b39fb42
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -0,0 +1,148 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.ALL_GROUPS;
+import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
+import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.backend.rabbitmq.RabbitMQExtension;
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+import reactor.rabbitmq.SenderOptions;
+
+/**
+ * Runs the single event bus group contract against {@link RabbitMQEventBus}, and checks that
+ * dispatched events are published to RabbitMQ in their serialized form.
+ */
+class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract {
+
+    @RegisterExtension
+    static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
+
+    private RabbitMQEventBus eventBus;
+    private Sender sender;
+    private RabbitMQConnectionFactory connectionFactory;
+    private EventSerializer eventSerializer;
+
+    @BeforeEach
+    void setUp() {
+        connectionFactory = rabbitMQExtension.getConnectionFactory();
+        Mono<Connection> connectionMono = Mono.fromSupplier(connectionFactory::create).cache();
+
+        TestId.Factory mailboxIdFactory = new TestId.Factory();
+        eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory());
+
+        eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
+        eventBus.start();
+        sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
+    }
+
+    @AfterEach
+    void tearDown() {
+        try {
+            eventBus.stop();
+            ALL_GROUPS.stream()
+                .map(groupClass -> GroupRegistration.WorkQueueName.of(groupClass).asString())
+                .forEach(queueName -> sender.delete(QueueSpecification.queue(queueName)).block());
+        } finally {
+            // a sender is created for each test: close it so that it does not outlive the test
+            sender.close();
+        }
+    }
+
+    @Override
+    public EventBus eventBus() {
+        return eventBus;
+    }
+
+    @Nested
+    class PublishingTest {
+        private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
+
+        @BeforeEach
+        void setUp() {
+            createQueue();
+        }
+
+        @AfterEach
+        void tearDown() {
+            // the queue is durable and not auto-deleted: remove it explicitly so that it does not
+            // keep collecting the events dispatched by the other tests
+            sender.delete(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)).block();
+        }
+
+        // Declares the queue and binds it to the mailbox event exchange, reusing the sender of the
+        // enclosing test instead of opening another one that nobody would close.
+        private void createQueue() {
+            sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(!AUTO_DELETE)
+                .arguments(NO_ARGUMENTS))
+                .block();
+            sender.bind(BindingSpecification.binding()
+                .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+                .queue(MAILBOX_WORK_QUEUE_NAME)
+                .routingKey(EMPTY_ROUTING_KEY))
+                .block();
+        }
+
+        @Test
+        void dispatchShouldPublishSerializedEventToRabbitMQ() {
+            eventBus.dispatch(EVENT, NO_KEYS).block();
+
+            assertThat(dequeueEvent()).isEqualTo(EVENT);
+        }
+
+        @Test
+        void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
+            eventBus.dispatch(EVENT, NO_KEYS);
+
+            assertThat(dequeueEvent()).isEqualTo(EVENT);
+        }
+
+        // Reads the first message of the queue and deserializes it back into an event.
+        private Event dequeueEvent() {
+            Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(Mono.just(connectionFactory.create())));
+            try {
+                byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
+                    .blockFirst()
+                    .getBody();
+
+                return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8))
+                    .get();
+            } finally {
+                // release the receiver once the event has been read
+                receiver.close();
+            }
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org