You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/22 10:09:56 UTC

[1/6] james-project git commit: MAILBOX-374 QuotaRatio Index and Aliases should not include capital letters

Repository: james-project
Updated Branches:
  refs/heads/master 8a42e9ecd -> d51c70854


MAILBOX-374 QuotaRatio Index and Aliases should not include capital letters

This is not supported by ElasticSearch and leads to many EventBus retries


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8938f5fd
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8938f5fd
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8938f5fd

Branch: refs/heads/master
Commit: 8938f5fddc6f56355ceb04597759d32725d023c1
Parents: 061fa39
Author: Benoit Tellier <bt...@linagora.com>
Authored: Fri Jan 18 15:52:20 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 22 17:07:15 2019 +0700

----------------------------------------------------------------------
 .../search/elasticsearch/QuotaRatioElasticSearchConstants.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/8938f5fd/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaRatioElasticSearchConstants.java
----------------------------------------------------------------------
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaRatioElasticSearchConstants.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaRatioElasticSearchConstants.java
index 649ab68..e3ceadd 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaRatioElasticSearchConstants.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaRatioElasticSearchConstants.java
@@ -30,8 +30,8 @@ public interface QuotaRatioElasticSearchConstants {
         String QUOTA_RATIO = "quotaRatio";
     }
 
-    WriteAliasName DEFAULT_QUOTA_RATIO_WRITE_ALIAS = new WriteAliasName("quotaRatioWriteAlias");
-    ReadAliasName DEFAULT_QUOTA_RATIO_READ_ALIAS = new ReadAliasName("quotaRatioReadAlias");
+    WriteAliasName DEFAULT_QUOTA_RATIO_WRITE_ALIAS = new WriteAliasName("quota_ratio_write_alias");
+    ReadAliasName DEFAULT_QUOTA_RATIO_READ_ALIAS = new ReadAliasName("quota_ratio_read_alias");
     IndexName DEFAULT_QUOTA_RATIO_INDEX = new IndexName("quota_ratio_v1");
-    TypeName QUOTA_RATIO_TYPE = new TypeName("quotaRatio");
+    TypeName QUOTA_RATIO_TYPE = new TypeName("quota_ratio");
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[5/6] james-project git commit: MAILBOX-374 Checking registration queues deletion should filter out group queues

Posted by bt...@apache.org.
MAILBOX-374 Checking registration queues deletion should filter out group queues


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8935b9c1
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8935b9c1
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8935b9c1

Branch: refs/heads/master
Commit: 8935b9c1243117caf9701678814c9a6c9f745372
Parents: 255f1be
Author: Benoit Tellier <bt...@linagora.com>
Authored: Fri Jan 18 11:43:57 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 22 17:07:15 2019 +0700

----------------------------------------------------------------------
 .../java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/8935b9c1/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 8784493..efe5296 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -34,6 +34,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.MailboxListene
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
 import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
+import static org.apache.james.mailbox.events.GroupRegistration.WorkQueueName.MAILBOX_EVENT_WORK_QUEUE_PREFIX;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -414,6 +415,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
                 eventBus3.stop();
 
                 assertThat(rabbitManagementAPI.listQueues())
+                    .filteredOn(queue -> !queue.getName().startsWith(MAILBOX_EVENT_WORK_QUEUE_PREFIX))
                     .isEmpty();
             }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[2/6] james-project git commit: MAILBOX-374 Configure SpamAssassin only when required

Posted by bt...@apache.org.
MAILBOX-374 Configure SpamAssassin only when required

This prevents the retry from happening.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/061fa39f
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/061fa39f
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/061fa39f

Branch: refs/heads/master
Commit: 061fa39faba2fb57d25d5497f1207b63301a25a2
Parents: 8935b9c
Author: Benoit Tellier <bt...@linagora.com>
Authored: Fri Jan 18 15:42:14 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 22 17:07:15 2019 +0700

----------------------------------------------------------------------
 .../spamassassin/SpamAssassinListener.java      |  2 +-
 .../src/test/resources/listeners.xml            |  6 ------
 .../methods/integration/SpamAssassinModule.java | 20 ++++++++++++++++++--
 .../src/test/resources/listeners.xml            |  6 ------
 .../src/test/resources/listeners.xml            |  6 ------
 5 files changed, 19 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/061fa39f/mailbox/plugin/spamassassin/src/main/java/org/apache/james/mailbox/spamassassin/SpamAssassinListener.java
----------------------------------------------------------------------
diff --git a/mailbox/plugin/spamassassin/src/main/java/org/apache/james/mailbox/spamassassin/SpamAssassinListener.java b/mailbox/plugin/spamassassin/src/main/java/org/apache/james/mailbox/spamassassin/SpamAssassinListener.java
index 70ce446..1b83028 100644
--- a/mailbox/plugin/spamassassin/src/main/java/org/apache/james/mailbox/spamassassin/SpamAssassinListener.java
+++ b/mailbox/plugin/spamassassin/src/main/java/org/apache/james/mailbox/spamassassin/SpamAssassinListener.java
@@ -63,7 +63,7 @@ public class SpamAssassinListener implements SpamEventListener {
     private final ExecutionMode executionMode;
 
     @Inject
-    SpamAssassinListener(SpamAssassin spamAssassin, SystemMailboxesProvider systemMailboxesProvider, MailboxManager mailboxManager, MailboxSessionMapperFactory mapperFactory, ExecutionMode executionMode) {
+    public SpamAssassinListener(SpamAssassin spamAssassin, SystemMailboxesProvider systemMailboxesProvider, MailboxManager mailboxManager, MailboxSessionMapperFactory mapperFactory, ExecutionMode executionMode) {
         this.spamAssassin = spamAssassin;
         this.systemMailboxesProvider = systemMailboxesProvider;
         this.mailboxManager = mailboxManager;

http://git-wip-us.apache.org/repos/asf/james-project/blob/061fa39f/server/protocols/jmap-integration-testing/cassandra-jmap-integration-testing/src/test/resources/listeners.xml
----------------------------------------------------------------------
diff --git a/server/protocols/jmap-integration-testing/cassandra-jmap-integration-testing/src/test/resources/listeners.xml b/server/protocols/jmap-integration-testing/cassandra-jmap-integration-testing/src/test/resources/listeners.xml
index 9d0f61b..ff2e517 100644
--- a/server/protocols/jmap-integration-testing/cassandra-jmap-integration-testing/src/test/resources/listeners.xml
+++ b/server/protocols/jmap-integration-testing/cassandra-jmap-integration-testing/src/test/resources/listeners.xml
@@ -19,12 +19,6 @@
  -->
 
 <listeners>
-  <poolSize>8</poolSize>
-
-  <listener>
-    <class>org.apache.james.mailbox.spamassassin.SpamAssassinListener</class>
-    <async>false</async>
-  </listener>
   <listener>
     <class>org.apache.james.mailbox.cassandra.MailboxOperationLoggingListener</class>
   </listener>

http://git-wip-us.apache.org/repos/asf/james-project/blob/061fa39f/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/methods/integration/SpamAssassinModule.java
----------------------------------------------------------------------
diff --git a/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/methods/integration/SpamAssassinModule.java b/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/methods/integration/SpamAssassinModule.java
index 6f8b6d9..ac8f2a1 100644
--- a/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/methods/integration/SpamAssassinModule.java
+++ b/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/methods/integration/SpamAssassinModule.java
@@ -23,10 +23,15 @@ import java.util.Optional;
 import javax.inject.Singleton;
 
 import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.SystemMailboxesProvider;
+import org.apache.james.mailbox.spamassassin.SpamAssassin;
 import org.apache.james.mailbox.spamassassin.SpamAssassinConfiguration;
+import org.apache.james.mailbox.spamassassin.SpamAssassinListener;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
 import org.apache.james.mailetcontainer.impl.MailetConfigImpl;
 import org.apache.james.spamassassin.SpamAssassinExtension;
-import org.apache.james.spamassassin.SpamAssassinExtension.SpamAssassin;
 import org.apache.james.util.Host;
 import org.apache.james.utils.MailetConfigurationOverride;
 
@@ -50,12 +55,23 @@ public class SpamAssassinModule extends AbstractModule {
                 new MailetConfigurationOverride(
                     org.apache.james.transport.mailets.SpamAssassin.class,
                     spamAssassinMailetConfig()));
+
+        Multibinder.newSetBinder(binder(), MailboxListener.GroupMailboxListener.class)
+            .addBinding()
+            .to(SpamAssassinListener.class);
+    }
+
+    @Provides
+    @Singleton
+    SpamAssassinListener provideSpamAssassinListener(SpamAssassin spamAssassin, SystemMailboxesProvider systemMailboxesProvider, MailboxManager mailboxManager, MailboxSessionMapperFactory mapperFactory) {
+        return new SpamAssassinListener(spamAssassin, systemMailboxesProvider, mailboxManager, mapperFactory,
+            MailboxListener.ExecutionMode.SYNCHRONOUS);
     }
 
     @Provides
     @Singleton
     private SpamAssassinConfiguration getSpamAssassinConfiguration() {
-        SpamAssassin spamAssassin = spamAssassinExtension.getSpamAssassin();
+        SpamAssassinExtension.SpamAssassin spamAssassin = spamAssassinExtension.getSpamAssassin();
         return new SpamAssassinConfiguration(Optional.of(Host.from(spamAssassin.getIp(), spamAssassin.getBindingPort())));
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/061fa39f/server/protocols/jmap-integration-testing/memory-jmap-integration-testing/src/test/resources/listeners.xml
----------------------------------------------------------------------
diff --git a/server/protocols/jmap-integration-testing/memory-jmap-integration-testing/src/test/resources/listeners.xml b/server/protocols/jmap-integration-testing/memory-jmap-integration-testing/src/test/resources/listeners.xml
index 2145b25..59e3fec 100644
--- a/server/protocols/jmap-integration-testing/memory-jmap-integration-testing/src/test/resources/listeners.xml
+++ b/server/protocols/jmap-integration-testing/memory-jmap-integration-testing/src/test/resources/listeners.xml
@@ -19,12 +19,6 @@
  -->
 
 <listeners>
-  <poolSize>8</poolSize>
-
-  <listener>
-    <class>org.apache.james.mailbox.spamassassin.SpamAssassinListener</class>
-    <async>false</async>
-  </listener>
   <listener>
     <class>org.apache.james.mailbox.quota.mailing.listeners.QuotaThresholdCrossingListener</class>
     <group>QuotaThresholdCrossingListener-lower-threshold</group>

http://git-wip-us.apache.org/repos/asf/james-project/blob/061fa39f/server/protocols/jmap-integration-testing/rabbitmq-jmap-integration-testing/src/test/resources/listeners.xml
----------------------------------------------------------------------
diff --git a/server/protocols/jmap-integration-testing/rabbitmq-jmap-integration-testing/src/test/resources/listeners.xml b/server/protocols/jmap-integration-testing/rabbitmq-jmap-integration-testing/src/test/resources/listeners.xml
index 9d0f61b..ff2e517 100644
--- a/server/protocols/jmap-integration-testing/rabbitmq-jmap-integration-testing/src/test/resources/listeners.xml
+++ b/server/protocols/jmap-integration-testing/rabbitmq-jmap-integration-testing/src/test/resources/listeners.xml
@@ -19,12 +19,6 @@
  -->
 
 <listeners>
-  <poolSize>8</poolSize>
-
-  <listener>
-    <class>org.apache.james.mailbox.spamassassin.SpamAssassinListener</class>
-    <async>false</async>
-  </listener>
   <listener>
     <class>org.apache.james.mailbox.cassandra.MailboxOperationLoggingListener</class>
   </listener>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[3/6] james-project git commit: MAILBOX-374 RabbitMQ should support generic groups

Posted by bt...@apache.org.
MAILBOX-374 RabbitMQ should support generic groups

This bug caused only one of the QuotaMailing listeners to be triggered


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/04e8f7bd
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/04e8f7bd
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/04e8f7bd

Branch: refs/heads/master
Commit: 04e8f7bd2ca6b8185a556aaa5f8679437f37324b
Parents: 8a42e9e
Author: Benoit Tellier <bt...@linagora.com>
Authored: Thu Jan 17 17:34:53 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 22 17:07:15 2019 +0700

----------------------------------------------------------------------
 .../org/apache/james/mailbox/events/GenericGroup.java |  5 +++++
 .../java/org/apache/james/mailbox/events/Group.java   |  4 ++++
 .../apache/james/mailbox/events/GroupContract.java    | 14 ++++++++++++++
 .../org/apache/james/mailbox/events/GroupTest.java    | 10 ++++++++++
 .../james/mailbox/events/GroupConsumerRetry.java      |  2 +-
 .../james/mailbox/events/GroupRegistration.java       | 13 ++-----------
 .../james/mailbox/events/RabbitMQEventBusTest.java    |  4 ++--
 7 files changed, 38 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/04e8f7bd/mailbox/api/src/main/java/org/apache/james/mailbox/events/GenericGroup.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/GenericGroup.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/GenericGroup.java
index ed36568..df14453 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/GenericGroup.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/GenericGroup.java
@@ -29,6 +29,11 @@ public class GenericGroup extends Group {
     }
 
     @Override
+    public String asString() {
+        return super.asString() + "-" + groupName;
+    }
+
+    @Override
     public final boolean equals(Object o) {
         if (o instanceof GenericGroup) {
             GenericGroup that = (GenericGroup) o;

http://git-wip-us.apache.org/repos/asf/james-project/blob/04e8f7bd/mailbox/api/src/main/java/org/apache/james/mailbox/events/Group.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/Group.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/Group.java
index 2da5cb4..9fb9d25 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/Group.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/Group.java
@@ -22,6 +22,10 @@ package org.apache.james.mailbox.events;
 import java.util.Objects;
 
 public class Group {
+    public String asString() {
+        return getClass().getName();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == null) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/04e8f7bd/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index c7f5009..aa500be 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -227,6 +227,20 @@ public interface GroupContract {
 
             verify(listener, timeout(ONE_SECOND).times(1)).event(any());
         }
+
+        @Test
+        default void allGroupListenersShouldBeExecutedWhenGenericGroups() throws Exception {
+            MailboxListener listener1 = newListener();
+            MailboxListener listener2 = newListener();
+
+            eventBus().register(listener1, new GenericGroup("a"));
+            eventBus().register(listener2, new GenericGroup("b"));
+
+            eventBus().dispatch(EVENT, NO_KEYS).block();
+
+            verify(listener1, timeout(ONE_SECOND).times(1)).event(any());
+            verify(listener2, timeout(ONE_SECOND).times(1)).event(any());
+        }
     }
 
     interface MultipleEventBusGroupContract extends EventBusContract.MultipleEventBusContract {

http://git-wip-us.apache.org/repos/asf/james-project/blob/04e8f7bd/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupTest.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupTest.java
index 0acb141..6b5fb6b 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupTest.java
@@ -65,4 +65,14 @@ class GroupTest {
             .withRedefinedSuperclass()
             .verify();
     }
+
+    @Test
+    void asStringShouldReturnFqdnByDefault() {
+        assertThat(new EventBusTestFixture.GroupA().asString()).isEqualTo("org.apache.james.mailbox.events.EventBusTestFixture$GroupA");
+    }
+
+    @Test
+    void asStringShouldReturnNameWhenGenericGroup() {
+        assertThat(new GenericGroup("abc").asString()).isEqualTo("org.apache.james.mailbox.events.GenericGroup-abc");
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/04e8f7bd/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
index 2b99c49..ce2c713 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
@@ -46,7 +46,7 @@ class GroupConsumerRetry {
     static class RetryExchangeName {
 
         static RetryExchangeName of(Group group) {
-            return new RetryExchangeName(GroupRegistration.groupName(group.getClass()));
+            return new RetryExchangeName(group.asString());
         }
 
         static final String MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX = MAILBOX_EVENT + "-retryExchange-";

http://git-wip-us.apache.org/repos/asf/james-project/blob/04e8f7bd/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 7e4fe6d..d7c4c91 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -36,7 +36,6 @@ import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
 
 import com.github.fge.lambdas.Throwing;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.rabbitmq.client.Connection;
 
@@ -56,8 +55,6 @@ import reactor.rabbitmq.Sender;
 class GroupRegistration implements Registration {
 
     static class WorkQueueName {
-
-        @VisibleForTesting
         static WorkQueueName of(Group group) {
             return new WorkQueueName(group);
         }
@@ -65,27 +62,21 @@ class GroupRegistration implements Registration {
         static final String MAILBOX_EVENT_WORK_QUEUE_PREFIX = MAILBOX_EVENT + "-workQueue-";
 
         private final Group group;
-        private final String name;
 
         private WorkQueueName(Group group) {
             Preconditions.checkNotNull(group, "Group must be specified");
             this.group = group;
-            this.name = groupName(group.getClass());
         }
 
-        public Group getGroup() {
+        Group getGroup() {
             return group;
         }
 
         String asString() {
-            return MAILBOX_EVENT_WORK_QUEUE_PREFIX + name;
+            return MAILBOX_EVENT_WORK_QUEUE_PREFIX + group.asString();
         }
     }
 
-    static String groupName(Class<? extends Group> clazz) {
-        return clazz.getName();
-    }
-
     static final String RETRY_COUNT = "retry-count";
     static final int DEFAULT_RETRY_COUNT = 0;
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/04e8f7bd/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 0d55f30..8784493 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -118,8 +118,8 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         eventBus2.stop();
         eventBus3.stop();
         ALL_GROUPS.stream()
-            .map(groupClass -> GroupRegistration.WorkQueueName.of(groupClass).asString())
-            .forEach(queueName -> sender.delete(QueueSpecification.queue(queueName)).block());
+            .map(GroupRegistration.WorkQueueName::of)
+            .forEach(queueName -> sender.delete(QueueSpecification.queue(queueName.asString())).block());
         sender.delete(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)).block();
         sender.close();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[6/6] james-project git commit: JAMES-2647 Cassandra migration task for mapping sources

Posted by bt...@apache.org.
JAMES-2647 Cassandra migration task for mapping sources


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d51c7085
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d51c7085
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d51c7085

Branch: refs/heads/master
Commit: d51c7085443c947b3f77e64163d55800dc6e782b
Parents: 8938f5f
Author: Rene Cordier <rc...@linagora.com>
Authored: Mon Jan 21 15:02:55 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 22 17:09:38 2019 +0700

----------------------------------------------------------------------
 .../versions/CassandraSchemaVersionManager.java |   2 +-
 .../guice/protocols/webadmin-cassandra/pom.xml  |   4 +
 .../modules/server/CassandraRoutesModule.java   |   3 +
 server/data/data-cassandra/pom.xml              |   5 +
 .../cassandra/CassandraMappingsSourcesDAO.java  |   8 +-
 .../CassandraRecipientRewriteTable.java         |   9 +-
 .../CassandraRecipientRewriteTableDAO.java      |  45 ++----
 .../migration/MappingsSourcesMigration.java     |  65 ++++++++
 .../CassandraMappingsSourcesDAOTest.java        |   1 -
 .../CassandraRecipientRewriteTableDAOTest.java  |  10 +-
 .../migration/MappingsSourcesMigrationTest.java | 151 +++++++++++++++++++
 11 files changed, 256 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
index d953f7e..172fdb5 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
@@ -34,7 +34,7 @@ import com.google.common.base.Preconditions;
 
 public class CassandraSchemaVersionManager {
     public static final SchemaVersion MIN_VERSION = new SchemaVersion(2);
-    public static final SchemaVersion MAX_VERSION = new SchemaVersion(6);
+    public static final SchemaVersion MAX_VERSION = new SchemaVersion(7);
     public static final SchemaVersion DEFAULT_VERSION = MIN_VERSION;
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSchemaVersionManager.class);

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/container/guice/protocols/webadmin-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/server/container/guice/protocols/webadmin-cassandra/pom.xml b/server/container/guice/protocols/webadmin-cassandra/pom.xml
index 6554da8..4f7fbcb 100644
--- a/server/container/guice/protocols/webadmin-cassandra/pom.xml
+++ b/server/container/guice/protocols/webadmin-cassandra/pom.xml
@@ -41,6 +41,10 @@
             <artifactId>james-server-webadmin-cassandra</artifactId>
         </dependency>
         <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-data-cassandra</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.inject</groupId>
             <artifactId>guice</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
index 7513df4..01e4caa 100644
--- a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
+++ b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java
@@ -26,6 +26,7 @@ import org.apache.james.backends.cassandra.versions.SchemaVersion;
 import org.apache.james.mailbox.cassandra.mail.migration.AttachmentMessageIdCreation;
 import org.apache.james.mailbox.cassandra.mail.migration.AttachmentV2Migration;
 import org.apache.james.mailbox.cassandra.mail.migration.MailboxPathV2Migration;
+import org.apache.james.rrt.cassandra.migration.MappingsSourcesMigration;
 import org.apache.james.webadmin.Routes;
 import org.apache.james.webadmin.routes.CassandraMailboxMergingRoutes;
 import org.apache.james.webadmin.routes.CassandraMigrationRoutes;
@@ -41,6 +42,7 @@ public class CassandraRoutesModule extends AbstractModule {
     private static final SchemaVersion FROM_V3_TO_V4 = new SchemaVersion(3);
     private static final SchemaVersion FROM_V4_TO_V5 = new SchemaVersion(4);
     private static final SchemaVersion FROM_V5_TO_V6 = new SchemaVersion(5);
+    private static final SchemaVersion FROM_V6_TO_V7 = new SchemaVersion(6);
 
     @Override
     protected void configure() {
@@ -57,6 +59,7 @@ public class CassandraRoutesModule extends AbstractModule {
         allMigrationClazzBinder.addBinding(FROM_V3_TO_V4).to(AttachmentV2Migration.class);
         allMigrationClazzBinder.addBinding(FROM_V4_TO_V5).to(AttachmentMessageIdCreation.class);
         allMigrationClazzBinder.addBinding(FROM_V5_TO_V6).to(MailboxPathV2Migration.class);
+        allMigrationClazzBinder.addBinding(FROM_V6_TO_V7).to(MappingsSourcesMigration.class);
 
         bind(SchemaVersion.class)
             .annotatedWith(Names.named(CassandraMigrationService.LATEST_VERSION))

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/pom.xml b/server/data/data-cassandra/pom.xml
index 01780b5..bd3220a 100644
--- a/server/data/data-cassandra/pom.xml
+++ b/server/data/data-cassandra/pom.xml
@@ -150,6 +150,11 @@
             <artifactId>testcontainers</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
index 777a50e..6ff3072 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java
@@ -41,14 +41,14 @@ import com.datastax.driver.core.Session;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-class CassandraMappingsSourcesDAO {
+public class CassandraMappingsSourcesDAO {
     private final CassandraAsyncExecutor executor;
     private final PreparedStatement insertStatement;
     private final PreparedStatement deleteStatement;
     private final PreparedStatement retrieveSourcesStatement;
 
     @Inject
-    CassandraMappingsSourcesDAO(Session session) {
+    public CassandraMappingsSourcesDAO(Session session) {
         this.executor = new CassandraAsyncExecutor(session);
         this.insertStatement = prepareInsertStatement(session);
         this.deleteStatement = prepareDelete(session);
@@ -77,7 +77,7 @@ class CassandraMappingsSourcesDAO {
             .and(eq(MAPPING_VALUE, bindMarker(MAPPING_VALUE))));
     }
 
-    Mono<Void> addMapping(Mapping mapping, MappingSource source) {
+    public Mono<Void> addMapping(Mapping mapping, MappingSource source) {
         return executor.executeVoidReactor(insertStatement.bind()
             .setString(MAPPING_TYPE, mapping.getType().asPrefix())
             .setString(MAPPING_VALUE, mapping.getMappingValue())
@@ -91,7 +91,7 @@ class CassandraMappingsSourcesDAO {
             .setString(SOURCE, source.asMailAddressString()));
     }
 
-    Flux<MappingSource> retrieveSources(Mapping mapping) {
+    public Flux<MappingSource> retrieveSources(Mapping mapping) {
         return executor.executeReactor(retrieveSourcesStatement.bind()
             .setString(MAPPING_TYPE, mapping.getType().asPrefix())
             .setString(MAPPING_VALUE, mapping.getMappingValue()))

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
index 4c7a92b..0a51ec1 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
@@ -30,6 +30,8 @@ import org.apache.james.rrt.lib.Mappings;
 import org.apache.james.rrt.lib.MappingsImpl;
 import org.apache.james.util.OptionalUtils;
 
+import com.github.steveash.guavate.Guavate;
+
 public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTable {
     private final CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
     private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
@@ -63,7 +65,12 @@ public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTabl
 
     @Override
     public Map<MappingSource, Mappings> getAllMappings() {
-        return cassandraRecipientRewriteTableDAO.getAllMappings().block();
+        return cassandraRecipientRewriteTableDAO.getAllMappings()
+            .collect(Guavate.toImmutableMap(
+                pair -> pair.getLeft(),
+                pair -> MappingsImpl.fromMappings(pair.getRight()),
+                Mappings::union))
+            .block();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
index 52f3516..fe31f1f 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
@@ -29,24 +29,23 @@ import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTab
 import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.TABLE_NAME;
 import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.USER;
 
-import java.util.Map;
-
 import javax.inject.Inject;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.rrt.lib.Mapping;
 import org.apache.james.rrt.lib.MappingSource;
-import org.apache.james.rrt.lib.Mappings;
 import org.apache.james.rrt.lib.MappingsImpl;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
 import com.github.steveash.guavate.Guavate;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-class CassandraRecipientRewriteTableDAO {
+public class CassandraRecipientRewriteTableDAO {
     private final CassandraAsyncExecutor executor;
     private final CassandraUtils cassandraUtils;
     private final PreparedStatement insertStatement;
@@ -55,7 +54,7 @@ class CassandraRecipientRewriteTableDAO {
     private final PreparedStatement retrieveAllMappingsStatement;
 
     @Inject
-    CassandraRecipientRewriteTableDAO(Session session, CassandraUtils cassandraUtils) {
+    public CassandraRecipientRewriteTableDAO(Session session, CassandraUtils cassandraUtils) {
         this.executor = new CassandraAsyncExecutor(session);
         this.cassandraUtils = cassandraUtils;
         this.insertStatement = prepareInsertStatement(session);
@@ -91,7 +90,7 @@ class CassandraRecipientRewriteTableDAO {
             .value(MAPPING, bindMarker(MAPPING)));
     }
 
-    Mono<Void> addMapping(MappingSource source, Mapping mapping) {
+    public Mono<Void> addMapping(MappingSource source, Mapping mapping) {
         return executor.executeVoidReactor(insertStatement.bind()
             .setString(USER, source.getFixedUser())
             .setString(DOMAIN, source.getFixedDomain())
@@ -116,35 +115,11 @@ class CassandraRecipientRewriteTableDAO {
             .filter(mappings -> !mappings.isEmpty());
     }
 
-    Mono<Map<MappingSource, Mappings>> getAllMappings() {
+    public Flux<Pair<MappingSource, Mapping>> getAllMappings() {
         return executor.executeReactor(retrieveAllMappingsStatement.bind())
-            .map(resultSet -> cassandraUtils.convertToStream(resultSet)
-                .map(row -> new UserMapping(MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)), row.getString(MAPPING)))
-                .collect(Guavate.toImmutableMap(
-                    UserMapping::getSource,
-                    UserMapping::toMapping,
-                    Mappings::union)));
-    }
-
-    private static class UserMapping {
-        private final MappingSource source;
-        private final String mapping;
-
-        UserMapping(MappingSource source, String mapping) {
-            this.source = source;
-            this.mapping = mapping;
-        }
-
-        MappingSource getSource() {
-            return source;
-        }
-
-        String getMapping() {
-            return mapping;
-        }
-
-        Mappings toMapping() {
-            return MappingsImpl.fromRawString(getMapping());
-        }
+            .flatMapMany(Flux::fromIterable)
+            .map(row -> Pair.of(
+                MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)),
+                Mapping.of(row.getString(MAPPING))));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
new file mode 100644
index 0000000..226add2
--- /dev/null
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
@@ -0,0 +1,65 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra.migration;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.migration.Migration;
+import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO;
+import org.apache.james.rrt.cassandra.CassandraRecipientRewriteTableDAO;
+import org.apache.james.rrt.lib.Mapping;
+import org.apache.james.rrt.lib.MappingSource;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+public class MappingsSourcesMigration implements Migration {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MappingsSourcesMigration.class);
+    private final CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
+    private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
+
+    @Inject
+    public MappingsSourcesMigration(CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO,
+                                    CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO) {
+        this.cassandraRecipientRewriteTableDAO = cassandraRecipientRewriteTableDAO;
+        this.cassandraMappingsSourcesDAO = cassandraMappingsSourcesDAO;
+    }
+
+
+    @Override
+    public Result run() {
+        return cassandraRecipientRewriteTableDAO.getAllMappings()
+            .flatMap(this::migrate)
+            .reduce(Result.COMPLETED, Task::combine)
+            .doOnError(e -> LOGGER.error("Error while migrating mappings sources", e))
+            .onErrorResume(e -> Mono.just(Result.PARTIAL))
+            .block();
+    }
+
+    private Mono<Result> migrate(Pair<MappingSource, Mapping> mappingEntry) { // registers one (source, mapping) pair through the mappings-sources DAO
+        return cassandraMappingsSourcesDAO.addMapping(mappingEntry.getRight(), mappingEntry.getLeft())
+            .then(Mono.just(Result.COMPLETED)) // addMapping returns Mono<Void>, which never emits, so a map() here would never be invoked
+            .doOnError(e -> LOGGER.error("Error while performing migration of mappings sources", e))
+            .onErrorResume(e -> Mono.just(Result.PARTIAL)); // a failed write is reported as PARTIAL rather than propagated as an error
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
index 5d0a125..b277a4e 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java
@@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.core.Domain;
 import org.apache.james.rrt.lib.Mapping;
 import org.apache.james.rrt.lib.MappingSource;

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
index 3abc541..1654166 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
@@ -58,7 +58,7 @@ class CassandraRecipientRewriteTableDAOTest {
 
     @Test
     void getAllMappingsShouldReturnEmptyByDefault() {
-        assertThat(dao.getAllMappings().block()).isEmpty();
+        assertThat(dao.getAllMappings().collectList().block()).isEmpty();
     }
 
     @Test
@@ -72,7 +72,7 @@ class CassandraRecipientRewriteTableDAOTest {
     void getAllMappingsShouldReturnStoredMapping() {
         dao.addMapping(SOURCE, MAPPING).block();
 
-        assertThat(dao.getAllMappings().block()).contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING)));
+        assertThat(dao.getAllMappings().collectList().block()).contains(Pair.of(SOURCE, MAPPING));
     }
 
     @Test
@@ -90,7 +90,7 @@ class CassandraRecipientRewriteTableDAOTest {
 
         dao.removeMapping(SOURCE, MAPPING).block();
 
-        assertThat(dao.getAllMappings().block()).isEmpty();
+        assertThat(dao.getAllMappings().collectList().block()).isEmpty();
     }
 
     @Test
@@ -107,7 +107,7 @@ class CassandraRecipientRewriteTableDAOTest {
         dao.addMapping(SOURCE, MAPPING).block();
         dao.addMapping(SOURCE, MAPPING_2).block();
 
-        assertThat(dao.getAllMappings().block())
-            .contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING, MAPPING_2)));
+        assertThat(dao.getAllMappings().collectList().block())
+            .contains(Pair.of(SOURCE, MAPPING), Pair.of(SOURCE, MAPPING_2));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
new file mode 100644
index 0000000..d2c1c69
--- /dev/null
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
@@ -0,0 +1,151 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rrt.cassandra.migration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.migration.Migration;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.core.Domain;
+import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO;
+import org.apache.james.rrt.cassandra.CassandraRRTModule;
+import org.apache.james.rrt.cassandra.CassandraRecipientRewriteTableDAO;
+import org.apache.james.rrt.lib.Mapping;
+import org.apache.james.rrt.lib.MappingSource;
+import org.apache.james.task.Task;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.core.publisher.Flux;
+
+class MappingsSourcesMigrationTest {
+    private static final int THREAD_COUNT = 10;
+    private static final int OPERATION_COUNT = 10;
+    private static final int MAPPING_COUNT = 100;
+
+    private static final String USER = "test";
+    private static final String ADDRESS = "test@domain";
+    private static final MappingSource SOURCE = MappingSource.fromUser(USER, Domain.LOCALHOST);
+    private static final Mapping MAPPING = Mapping.alias(ADDRESS);
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraRRTModule.MODULE);
+
+    private CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO;
+    private CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO;
+
+    private MappingsSourcesMigration migration;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        cassandraMappingsSourcesDAO = new CassandraMappingsSourcesDAO(cassandra.getConf());
+
+        migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO);
+    }
+
+    @Test
+    void emptyMigrationShouldSucceed() {
+        assertThat(migration.run()).isEqualTo(Migration.Result.COMPLETED);
+    }
+
+    @Test
+    void migrationShouldSucceedWithData() {
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+        assertThat(migration.run()).isEqualTo(Task.Result.COMPLETED);
+    }
+
+    @Test
+    void migrationShouldCreateMappingSourceFromMapping() {
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+
+        migration.run();
+
+        assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block())
+            .containsExactly(SOURCE);
+    }
+
+    @Test
+    void migrationShouldCreateMultipleMappingSourcesFromMappings() {
+        MappingSource source2 = MappingSource.fromUser("bob", Domain.LOCALHOST);
+
+        cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block();
+        cassandraRecipientRewriteTableDAO.addMapping(source2, MAPPING).block();
+
+        migration.run();
+
+        assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block())
+            .containsOnly(SOURCE, source2);
+    }
+
+    @Test
+    void migrationShouldReturnPartialWhenGetAllMappingsFromMappingsFail() {
+        CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class);
+        CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO = mock(CassandraMappingsSourcesDAO.class);
+        migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO);
+
+        when(cassandraRecipientRewriteTableDAO.getAllMappings()).thenReturn(Flux.error(new RuntimeException()));
+
+        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+    }
+
+    @Test
+    void migrationShouldReturnPartialAddMappingFails() {
+        CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class);
+        CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO = mock(CassandraMappingsSourcesDAO.class);
+        migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO);
+
+        when(cassandraRecipientRewriteTableDAO.getAllMappings())
+            .thenReturn(Flux.just(Pair.of(SOURCE, MAPPING)));
+        when(cassandraMappingsSourcesDAO.addMapping(any(Mapping.class), any(MappingSource.class)))
+            .thenThrow(new RuntimeException());
+
+        assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
+    }
+
+    @Test
+    void migrationShouldBeIdempotentWhenRunMultipleTimes() throws ExecutionException, InterruptedException {
+        IntStream.range(0, MAPPING_COUNT)
+            .forEach(i -> cassandraRecipientRewriteTableDAO
+                .addMapping(MappingSource.parse("source" + i + "@domain"), MAPPING).block());
+
+        ConcurrentTestRunner.builder()
+            .operation((threadNumber, step) -> migration.run())
+            .threadCount(THREAD_COUNT)
+            .operationCount(OPERATION_COUNT)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block())
+            .hasSize(MAPPING_COUNT);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[4/6] james-project git commit: MAILBOX-374 Bind RabbitMQEventBus module in RabbitMQ related product

Posted by bt...@apache.org.
MAILBOX-374 Bind RabbitMQEventBus module in RabbitMQ related product


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/255f1be5
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/255f1be5
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/255f1be5

Branch: refs/heads/master
Commit: 255f1be500f100a1f9e094e9f378249728341158
Parents: 04e8f7b
Author: Benoit Tellier <bt...@linagora.com>
Authored: Thu Jan 17 10:22:10 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Jan 22 17:07:15 2019 +0700

----------------------------------------------------------------------
 .../james/mailbox/events/EventDeadLetters.java  |  2 +-
 .../james/mailbox/events/RabbitMQEventBus.java  |  4 +-
 .../james/event/json/EventSerializer.scala      |  3 +-
 pom.xml                                         |  5 ++
 .../guice/cassandra-rabbitmq-guice/pom.xml      |  4 ++
 .../james/CassandraRabbitMQJamesServerMain.java |  3 +-
 .../modules/event/RabbitMQEventBusModule.java   | 75 ++++++++++++++++++++
 .../CassandraRabbitMQSwiftJmapTestRule.java     | 11 +++
 .../modules/mailbox/DefaultEventModule.java     |  4 ++
 9 files changed, 107 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/255f1be5/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
index 238533b..5c0057c 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
@@ -24,7 +24,7 @@ import org.apache.james.mailbox.Event;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-interface EventDeadLetters {
+public interface EventDeadLetters {
     Mono<Void> store(Group registeredGroup, Event failDeliveredEvent);
 
     Mono<Void> remove(Group registeredGroup, Event.EventId failDeliveredEventId);

http://git-wip-us.apache.org/repos/asf/james-project/blob/255f1be5/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 0ab082c..bad0714 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.PreDestroy;
+import javax.inject.Inject;
 
 import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.event.json.EventSerializer;
@@ -36,7 +37,7 @@ import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
 
-class RabbitMQEventBus implements EventBus {
+public class RabbitMQEventBus implements EventBus {
     static final String MAILBOX_EVENT = "mailboxEvent";
     static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
     static final String EVENT_BUS_ID = "eventBusId";
@@ -55,6 +56,7 @@ class RabbitMQEventBus implements EventBus {
     private EventDispatcher eventDispatcher;
     private Sender sender;
 
+    @Inject
     RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer,
                      RetryBackoffConfiguration retryBackoff,
                      RoutingKeyConverter routingKeyConverter,

http://git-wip-us.apache.org/repos/asf/james-project/blob/255f1be5/mailbox/event/json/src/main/scala/org/apache/james/event/json/EventSerializer.scala
----------------------------------------------------------------------
diff --git a/mailbox/event/json/src/main/scala/org/apache/james/event/json/EventSerializer.scala b/mailbox/event/json/src/main/scala/org/apache/james/event/json/EventSerializer.scala
index 03c62eb..592d366 100644
--- a/mailbox/event/json/src/main/scala/org/apache/james/event/json/EventSerializer.scala
+++ b/mailbox/event/json/src/main/scala/org/apache/james/event/json/EventSerializer.scala
@@ -22,6 +22,7 @@ package org.apache.james.event.json
 import java.time.Instant
 import java.util.{Optional, TreeMap => JavaTreeMap}
 
+import javax.inject.Inject
 import julienrf.json.derived
 import org.apache.james.core.quota.{QuotaCount, QuotaSize, QuotaValue}
 import org.apache.james.core.{Domain, User}
@@ -330,7 +331,7 @@ class JsonSerialize(mailboxIdFactory: MailboxId.Factory, messageIdFactory: Messa
     .map(event => event.toJava)
 }
 
-class EventSerializer(mailboxIdFactory: MailboxId.Factory, messageIdFactory: MessageId.Factory) {
+class EventSerializer @Inject() (mailboxIdFactory: MailboxId.Factory, messageIdFactory: MessageId.Factory) {
   private val jsonSerialize = new JsonSerialize(mailboxIdFactory, messageIdFactory)
 
   def toJson(event: JavaEvent): String = jsonSerialize.toJson(event)

http://git-wip-us.apache.org/repos/asf/james-project/blob/255f1be5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0119f10..1bc0311 100644
--- a/pom.xml
+++ b/pom.xml
@@ -772,6 +772,11 @@
             </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
+                <artifactId>apache-james-mailbox-event-rabbitmq</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${james.groupId}</groupId>
                 <artifactId>apache-james-mailbox-jpa</artifactId>
                 <version>${project.version}</version>
             </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/255f1be5/server/container/guice/cassandra-rabbitmq-guice/pom.xml
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-rabbitmq-guice/pom.xml b/server/container/guice/cassandra-rabbitmq-guice/pom.xml
index 716b626..ced0a3d 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/pom.xml
+++ b/server/container/guice/cassandra-rabbitmq-guice/pom.xml
@@ -65,6 +65,10 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-mailbox-event-rabbitmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>apache-james-mailbox-elasticsearch</artifactId>
             <type>test-jar</type>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/james-project/blob/255f1be5/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
index 8bcf444..8597644 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
@@ -22,6 +22,7 @@ package org.apache.james;
 import static org.apache.james.CassandraJamesServerMain.ALL_BUT_JMX_CASSANDRA_MODULE;
 
 import org.apache.james.modules.blobstore.BlobStoreChoosingModule;
+import org.apache.james.modules.event.RabbitMQEventBusModule;
 import org.apache.james.modules.rabbitmq.RabbitMQModule;
 import org.apache.james.modules.server.JMXServerModule;
 import org.apache.james.server.core.configuration.Configuration;
@@ -33,7 +34,7 @@ public class CassandraRabbitMQJamesServerMain {
     public static final Module MODULES =
         Modules
             .override(Modules.combine(ALL_BUT_JMX_CASSANDRA_MODULE))
-            .with(new RabbitMQModule(), new BlobStoreChoosingModule());
+            .with(new RabbitMQModule(), new BlobStoreChoosingModule(), new RabbitMQEventBusModule());
 
     public static void main(String[] args) throws Exception {
         Configuration configuration = Configuration.builder()

http://git-wip-us.apache.org/repos/asf/james-project/blob/255f1be5/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
new file mode 100644
index 0000000..df4927e
--- /dev/null
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
@@ -0,0 +1,84 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.event;
+
+import java.util.List;
+
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.lifecycle.api.Configurable;
+import org.apache.james.mailbox.events.EventBus;
+import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
+import org.apache.james.mailbox.events.RabbitMQEventBus;
+import org.apache.james.mailbox.events.RegistrationKey;
+import org.apache.james.mailbox.events.RetryBackoffConfiguration;
+import org.apache.james.utils.ConfigurationPerformer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.google.inject.multibindings.Multibinder;
+
+/**
+ * Guice module binding {@link EventBus} to a singleton {@link RabbitMQEventBus}.
+ *
+ * It also contributes {@link MailboxIdRegistrationKey.Factory} to the set of
+ * {@link RegistrationKey.Factory}, binds {@link RetryBackoffConfiguration#DEFAULT} and registers
+ * an initializer that starts the bus.
+ */
+public class RabbitMQEventBusModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        bind(EventSerializer.class).in(Scopes.SINGLETON);
+
+        bind(RabbitMQEventBus.class).in(Scopes.SINGLETON);
+        bind(EventBus.class).to(RabbitMQEventBus.class);
+
+        Multibinder.newSetBinder(binder(), RegistrationKey.Factory.class)
+            .addBinding().to(MailboxIdRegistrationKey.Factory.class);
+
+        Multibinder.newSetBinder(binder(), ConfigurationPerformer.class)
+            .addBinding().to(RabbitMQEventBusInitializer.class);
+
+        bind(RetryBackoffConfiguration.class).toInstance(RetryBackoffConfiguration.DEFAULT);
+    }
+
+    /** Starts the injected {@link RabbitMQEventBus} from {@link #initModule()}. */
+    static class RabbitMQEventBusInitializer implements ConfigurationPerformer {
+        private final RabbitMQEventBus rabbitMQEventBus;
+
+        @Inject
+        RabbitMQEventBusInitializer(RabbitMQEventBus rabbitMQEventBus) {
+            this.rabbitMQEventBus = rabbitMQEventBus;
+        }
+
+        @Override
+        public void initModule() {
+            rabbitMQEventBus.start();
+        }
+
+        @Override
+        public List<Class<? extends Configurable>> forClasses() {
+            // This initializer declares no Configurable classes.
+            return ImmutableList.of();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/255f1be5/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQSwiftJmapTestRule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQSwiftJmapTestRule.java b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQSwiftJmapTestRule.java
index 1f988ad..8c7d7a1 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQSwiftJmapTestRule.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQSwiftJmapTestRule.java
@@ -39,6 +39,7 @@ import com.google.inject.Module;
 public class CassandraRabbitMQSwiftJmapTestRule implements TestRule {
 
     private static final int LIMIT_TO_10_MESSAGES = 10;
+    public static final int TWO_SECONDS = 2000;
     private final TemporaryFolder temporaryFolder;
 
     public static CassandraRabbitMQSwiftJmapTestRule defaultTestRule() {
@@ -80,6 +81,19 @@ public class CassandraRabbitMQSwiftJmapTestRule implements TestRule {
     }
 
     public void await() {
+        awaitProcessingStart();
         guiceModuleTestRule.await();
     }
+
+    private void awaitProcessingStart() {
+        // The RabbitMQEventBus is asynchronous: without this pause there is no guarantee that
+        // the processing we are about to await has actually started.
+        try {
+            Thread.sleep(TWO_SECONDS);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/255f1be5/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index 2cd7252..45dc122 100644
--- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -27,7 +27,9 @@ import org.apache.commons.configuration.ConfigurationException;
 import org.apache.james.lifecycle.api.Configurable;
 import org.apache.james.mailbox.MailboxListener;
 import org.apache.james.mailbox.events.EventBus;
+import org.apache.james.mailbox.events.EventDeadLetters;
 import org.apache.james.mailbox.events.InVMEventBus;
+import org.apache.james.mailbox.events.MemoryEventDeadLetters;
 import org.apache.james.mailbox.events.RetryBackoffConfiguration;
 import org.apache.james.mailbox.events.delivery.EventDelivery;
 import org.apache.james.mailbox.events.delivery.InVmEventDelivery;
@@ -49,6 +51,8 @@ public class DefaultEventModule extends AbstractModule {
         bind(MailboxListenersLoaderImpl.class).in(Scopes.SINGLETON);
         bind(InVmEventDelivery.class).in(Scopes.SINGLETON);
         bind(InVMEventBus.class).in(Scopes.SINGLETON);
+        bind(MemoryEventDeadLetters.class).in(Scopes.SINGLETON);
+        bind(EventDeadLetters.class).to(MemoryEventDeadLetters.class);
 
         bind(MailboxListenersLoader.class).to(MailboxListenersLoaderImpl.class);
         bind(EventDelivery.class).to(InVmEventDelivery.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org