You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/07 10:00:13 UTC

[james-project] 02/02: JAMES-2908 use rabbitmq as the eventbus and fix tests accordingly as event delivery is now asynchronous

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 37cfbf3fe9c1ab0f29ff317faf3c4b8a0f7f049e
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Oct 1 17:16:03 2019 +0200

    JAMES-2908 use rabbitmq as the eventbus and fix tests accordingly as event delivery is now asynchronous
---
 server/blob/blob-objectstorage/pom.xml             |   6 +
 .../objectstorage/aws/s3/DockerAwsS3TestRule.java  |   4 +
 .../james/CassandraRabbitMQAwsS3JmapTestRule.java  |   2 +
 server/protocols/webadmin-integration-test/pom.xml |  26 ++++-
 .../integration/AuthorizedEndpointsTest.java       |   8 +-
 .../integration/CassandraJmapExtension.java        |  18 ++-
 .../EventDeadLettersIntegrationTest.java           | 122 +++++++++++++++------
 .../integration/JwtFilterIntegrationTest.java      |   6 +-
 .../integration/WebAdminServerIntegrationTest.java |   6 +-
 9 files changed, 148 insertions(+), 50 deletions(-)

diff --git a/server/blob/blob-objectstorage/pom.xml b/server/blob/blob-objectstorage/pom.xml
index a1d0ea4..29a5f24 100644
--- a/server/blob/blob-objectstorage/pom.xml
+++ b/server/blob/blob-objectstorage/pom.xml
@@ -94,6 +94,12 @@
             <groupId>org.apache.jclouds.api</groupId>
             <artifactId>openstack-swift</artifactId>
             <version>${jclouds.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.inject.extensions</groupId>
+                    <artifactId>guice-multibindings</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.jclouds.api</groupId>
diff --git a/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/aws/s3/DockerAwsS3TestRule.java b/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/aws/s3/DockerAwsS3TestRule.java
index 2dba254..5cc85f8 100644
--- a/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/aws/s3/DockerAwsS3TestRule.java
+++ b/server/container/guice/blob-objectstorage-guice/src/test/java/org/apache/james/modules/objectstorage/aws/s3/DockerAwsS3TestRule.java
@@ -114,5 +114,9 @@ public class DockerAwsS3TestRule implements GuiceModuleTestRule {
     public void start() {
         ensureAwsS3started();
     }
+
+    public void stop() {
+        //nothing to stop
+    }
 }
 
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQAwsS3JmapTestRule.java b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQAwsS3JmapTestRule.java
index e7fcab7..068d8b5 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQAwsS3JmapTestRule.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQAwsS3JmapTestRule.java
@@ -30,6 +30,7 @@ import org.apache.james.modules.TestRabbitMQModule;
 import org.apache.james.modules.blobstore.BlobStoreChoosingConfiguration;
 import org.apache.james.modules.objectstorage.aws.s3.DockerAwsS3TestRule;
 import org.apache.james.server.core.configuration.Configuration;
+import org.apache.james.webadmin.WebAdminConfiguration;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
@@ -77,6 +78,7 @@ public class CassandraRabbitMQAwsS3JmapTestRule implements TestRule {
             .overrideWith(new TestDockerESMetricReporterModule(dockerElasticSearchRule.getDockerEs().getHttpHost()))
             .overrideWith(guiceModuleTestRule.getModule())
             .overrideWith((binder -> binder.bind(CleanupTasksPerformer.class).asEagerSingleton()))
+            .overrideWith(binder -> binder.bind(WebAdminConfiguration.class).toInstance(WebAdminConfiguration.TEST_CONFIGURATION))
             .overrideWith(additionals);
     }
 
diff --git a/server/protocols/webadmin-integration-test/pom.xml b/server/protocols/webadmin-integration-test/pom.xml
index 5ef3894..7e58acb 100644
--- a/server/protocols/webadmin-integration-test/pom.xml
+++ b/server/protocols/webadmin-integration-test/pom.xml
@@ -35,6 +35,12 @@
     <dependencies>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-backends-rabbitmq</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>apache-james-backends-cassandra</artifactId>
             <type>test-jar</type>
             <scope>test</scope>
@@ -64,7 +70,14 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
-            <artifactId>james-server-cassandra-guice</artifactId>
+            <artifactId>blob-objectstorage</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-objectstorage-guice</artifactId>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -75,6 +88,17 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-cassandra-rabbitmq-guice</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-cassandra-rabbitmq-guice</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-guice-common</artifactId>
             <type>test-jar</type>
             <scope>test</scope>
diff --git a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/AuthorizedEndpointsTest.java b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/AuthorizedEndpointsTest.java
index aa956cb..1435304 100644
--- a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/AuthorizedEndpointsTest.java
+++ b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/AuthorizedEndpointsTest.java
@@ -22,7 +22,7 @@ package org.apache.james.webadmin.integration;
 import static io.restassured.RestAssured.when;
 import static org.hamcrest.core.IsNot.not;
 
-import org.apache.james.CassandraJmapTestRule;
+import org.apache.james.CassandraRabbitMQAwsS3JmapTestRule;
 import org.apache.james.DockerCassandraRule;
 import org.apache.james.GuiceJamesServer;
 import org.apache.james.utils.WebAdminGuiceProbe;
@@ -40,15 +40,15 @@ public class AuthorizedEndpointsTest {
 
     @Rule
     public DockerCassandraRule cassandra = new DockerCassandraRule();
-    
+
     @Rule
-    public CassandraJmapTestRule cassandraJmapTestRule = CassandraJmapTestRule.defaultTestRule();
+    public CassandraRabbitMQAwsS3JmapTestRule jamesTestRule = CassandraRabbitMQAwsS3JmapTestRule.defaultTestRule();
 
     private GuiceJamesServer guiceJamesServer;
 
     @Before
     public void setUp() throws Exception {
-        guiceJamesServer = cassandraJmapTestRule.jmapServer(cassandra.getModule(), new UnauthorizedModule());
+        guiceJamesServer = jamesTestRule.jmapServer(cassandra.getModule(), new UnauthorizedModule());
         guiceJamesServer.start();
         WebAdminGuiceProbe webAdminGuiceProbe = guiceJamesServer.getProbe(WebAdminGuiceProbe.class);
 
diff --git a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/CassandraJmapExtension.java b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/CassandraJmapExtension.java
index 90a8dec..2316abd 100644
--- a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/CassandraJmapExtension.java
+++ b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/CassandraJmapExtension.java
@@ -18,21 +18,23 @@
  ****************************************************************/
 package org.apache.james.webadmin.integration;
 
-import static org.apache.james.CassandraJamesServerMain.ALL_BUT_JMX_CASSANDRA_MODULE;
-
 import java.io.IOException;
 import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
+import org.apache.james.CassandraRabbitMQJamesServerMain;
 import org.apache.james.CleanupTasksPerformer;
 import org.apache.james.DockerCassandraRule;
 import org.apache.james.DockerElasticSearchRule;
 import org.apache.james.GuiceJamesServer;
+import org.apache.james.backends.rabbitmq.DockerRabbitMQSingleton;
 import org.apache.james.mailbox.extractor.TextExtractor;
 import org.apache.james.mailbox.store.search.PDFTextExtractor;
 import org.apache.james.modules.TestDockerESMetricReporterModule;
 import org.apache.james.modules.TestJMAPServerModule;
+import org.apache.james.modules.TestRabbitMQModule;
+import org.apache.james.modules.objectstorage.aws.s3.DockerAwsS3TestRule;
 import org.apache.james.server.core.configuration.Configuration;
 import org.apache.james.util.FunctionalUtils;
 import org.apache.james.util.Runnables;
@@ -50,6 +52,7 @@ import org.junit.rules.TemporaryFolder;
 import com.github.fge.lambdas.Throwing;
 
 public class CassandraJmapExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback, ParameterResolver {
+
     public interface JamesLifeCyclePolicy {
         JamesLifeCyclePolicy FOR_EACH_TEST = serverSupplier -> JamesLifecycleHandler.builder()
             .beforeAll(Optional::empty)
@@ -127,6 +130,7 @@ public class CassandraJmapExtension implements BeforeAllCallback, AfterAllCallba
     private static final int LIMIT_TO_20_MESSAGES = 20;
 
     private final TemporaryFolder temporaryFolder;
+    private final DockerAwsS3TestRule dockerAwsS3TestRule;
     private final DockerCassandraRule cassandra;
     private final DockerElasticSearchRule elasticSearchRule;
     private final JamesLifecycleHandler jamesLifecycleHandler;
@@ -140,6 +144,7 @@ public class CassandraJmapExtension implements BeforeAllCallback, AfterAllCallba
         this.temporaryFolder = new TemporaryFolder();
         this.cassandra = new DockerCassandraRule();
         this.elasticSearchRule = new DockerElasticSearchRule();
+        this.dockerAwsS3TestRule = new DockerAwsS3TestRule();
         this.jamesLifecycleHandler = jamesLifeCyclePolicy.createHandler(jamesSupplier());
     }
 
@@ -150,11 +155,14 @@ public class CassandraJmapExtension implements BeforeAllCallback, AfterAllCallba
                 .build();
 
         return GuiceJamesServer.forConfiguration(configuration)
-                .combineWith(ALL_BUT_JMX_CASSANDRA_MODULE).overrideWith(binder -> binder.bind(TextExtractor.class).to(PDFTextExtractor.class))
+                .combineWith(CassandraRabbitMQJamesServerMain.MODULES)
+                .overrideWith(binder -> binder.bind(TextExtractor.class).to(PDFTextExtractor.class))
                 .overrideWith(new TestJMAPServerModule(LIMIT_TO_20_MESSAGES))
                 .overrideWith(new TestDockerESMetricReporterModule(elasticSearchRule.getDockerEs().getHttpHost()))
                 .overrideWith(cassandra.getModule())
                 .overrideWith(elasticSearchRule.getModule())
+                .overrideWith(dockerAwsS3TestRule.getModule())
+                .overrideWith(new TestRabbitMQModule(DockerRabbitMQSingleton.SINGLETON))
                 .overrideWith(binder -> binder.bind(WebAdminConfiguration.class).toInstance(WebAdminConfiguration.TEST_CONFIGURATION))
                 .overrideWith(new UnauthorizedModule())
                 .overrideWith((binder -> binder.bind(CleanupTasksPerformer.class).asEagerSingleton()));
@@ -167,14 +175,14 @@ public class CassandraJmapExtension implements BeforeAllCallback, AfterAllCallba
     @Override
     public void beforeAll(ExtensionContext context) throws Exception {
         temporaryFolder.create();
-        Runnables.runParallel(cassandra::start, elasticSearchRule::start);
+        Runnables.runParallel(cassandra::start, elasticSearchRule::start, dockerAwsS3TestRule::start);
         james = jamesLifecycleHandler.beforeAll().orElse(james);
     }
 
     @Override
     public void afterAll(ExtensionContext context) {
         jamesLifecycleHandler.afterAll(james);
-        Runnables.runParallel(cassandra::stop, elasticSearchRule.getDockerEs()::cleanUpData);
+        Runnables.runParallel(cassandra::stop, elasticSearchRule.getDockerEs()::cleanUpData, dockerAwsS3TestRule::stop);
     }
 
     @Override
diff --git a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/EventDeadLettersIntegrationTest.java b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/EventDeadLettersIntegrationTest.java
index 5af52b2..5083bea 100644
--- a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/EventDeadLettersIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/EventDeadLettersIntegrationTest.java
@@ -23,6 +23,7 @@ import static io.restassured.RestAssured.given;
 import static io.restassured.RestAssured.when;
 import static io.restassured.RestAssured.with;
 import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.ONE_MINUTE;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasSize;
@@ -32,8 +33,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.james.CassandraJmapTestRule;
+import org.apache.james.CassandraRabbitMQAwsS3JmapTestRule;
 import org.apache.james.DockerCassandraRule;
 import org.apache.james.GuiceJamesServer;
 import org.apache.james.mailbox.DefaultMailboxes;
@@ -60,7 +62,6 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.inject.multibindings.Multibinder;
-
 import io.restassured.RestAssured;
 import io.restassured.http.ContentType;
 
@@ -73,14 +74,16 @@ public class EventDeadLettersIntegrationTest {
     public static class RetryEventsListener implements MailboxListener.GroupMailboxListener {
         static final Group GROUP = new RetryEventsListenerGroup();
 
-        private int retriesBeforeSuccess;
-        private Map<Event.EventId, Integer> retries;
+        private final AtomicInteger totalCalls;
+        private int callsBeforeSuccess;
+        private Map<Event.EventId, Integer> callsByEventId;
         private List<Event> successfulEvents;
 
         RetryEventsListener() {
-            this.retriesBeforeSuccess = 0;
-            this.retries = new HashMap<>();
+            this.callsBeforeSuccess = 0;
+            this.callsByEventId = new HashMap<>();
             this.successfulEvents = new ArrayList<>();
+            this.totalCalls = new AtomicInteger(0);
         }
 
         @Override
@@ -90,26 +93,41 @@ public class EventDeadLettersIntegrationTest {
 
         @Override
         public void event(Event event) throws Exception {
-            int currentRetries = retries.getOrDefault(event.getEventId(), 0);
-
-            if (currentRetries < retriesBeforeSuccess) {
-                retries.put(event.getEventId(), currentRetries + 1);
-                throw new RuntimeException("throw to trigger retry");
-            } else {
-                retries.remove(event.getEventId());
+            totalCalls.incrementAndGet();
+            if (done(event)) {
+                callsByEventId.remove(event.getEventId());
                 successfulEvents.add(event);
+            } else {
+                increaseRetriesCount(event);
+                throw new RuntimeException("throw to trigger retry");
             }
         }
 
+        private void increaseRetriesCount(Event event) {
+            callsByEventId.put(event.getEventId(), retriesCount(event) + 1);
+        }
+
+        int retriesCount(Event event) {
+            return callsByEventId.getOrDefault(event.getEventId(), 0);
+        }
+
+        boolean done(Event event) {
+            return retriesCount(event) >= callsBeforeSuccess;
+        }
+
         List<Event> getSuccessfulEvents() {
             return successfulEvents;
         }
 
-        void setRetriesBeforeSuccess(int retriesBeforeSuccess) {
-            this.retriesBeforeSuccess = retriesBeforeSuccess;
+        void callsPerEventBeforeSuccess(int retriesBeforeSuccess) {
+            this.callsBeforeSuccess = retriesBeforeSuccess;
         }
     }
 
+    //This value is duplicated from default configuration to ensure we keep the same behavior over time
+    //unless we really want to change that default value
+    private static final int MAX_RETRIES = 3;
+
     private static final String DOMAIN = "domain.tld";
     private static final String BOB = "bob@" + DOMAIN;
     private static final String BOB_PASSWORD = "bobPassword";
@@ -132,7 +150,7 @@ public class EventDeadLettersIntegrationTest {
     public DockerCassandraRule cassandra = new DockerCassandraRule();
 
     @Rule
-    public CassandraJmapTestRule cassandraJmapTestRule = CassandraJmapTestRule.defaultTestRule();
+    public CassandraRabbitMQAwsS3JmapTestRule jamesTestRule = CassandraRabbitMQAwsS3JmapTestRule.defaultTestRule();
 
     private GuiceJamesServer guiceJamesServer;
     private MailboxProbeImpl mailboxProbe;
@@ -140,7 +158,7 @@ public class EventDeadLettersIntegrationTest {
     @Before
     public void setUp() throws Exception {
         retryEventsListener = new RetryEventsListener();
-        guiceJamesServer = cassandraJmapTestRule.jmapServer(cassandra.getModule())
+        guiceJamesServer = jamesTestRule.jmapServer(cassandra.getModule())
             .overrideWith(binder -> Multibinder.newSetBinder(binder, MailboxListener.GroupMailboxListener.class).addBinding().toInstance(retryEventsListener));
         guiceJamesServer.start();
 
@@ -179,9 +197,11 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void failedEventShouldBeStoredInDeadLetterUnderItsGroupId() {
-        retryEventsListener.setRetriesBeforeSuccess(4);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
 
+        waitForCalls(MAX_RETRIES + 1);
+
         when()
             .get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
         .then()
@@ -192,9 +212,11 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void successfulEventShouldNotBeStoredInDeadLetter() {
-        retryEventsListener.setRetriesBeforeSuccess(3);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES - 1);
         generateInitialEvent();
 
+        calmlyAwait.atMost(ONE_MINUTE).until(() -> !retryEventsListener.successfulEvents.isEmpty());
+
         when()
             .get(EventDeadLettersRoutes.BASE_PATH + "/groups")
         .then()
@@ -205,9 +227,11 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void groupIdOfFailedEventShouldBeStoredInDeadLetter() {
-        retryEventsListener.setRetriesBeforeSuccess(4);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
 
+        waitForCalls(MAX_RETRIES + 1);
+
         when()
             .get(EventDeadLettersRoutes.BASE_PATH + "/groups")
         .then()
@@ -218,9 +242,11 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void failedEventShouldBeStoredInDeadLetter() {
-        retryEventsListener.setRetriesBeforeSuccess(4);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         MailboxId mailboxId = generateInitialEvent();
 
+        waitForCalls(MAX_RETRIES + 1);
+
         String failedInsertionId = retrieveFirstFailedInsertionId();
 
         when()
@@ -237,10 +263,12 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void multipleFailedEventShouldBeStoredInDeadLetter() {
-        retryEventsListener.setRetriesBeforeSuccess(4);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
         generateSecondEvent();
 
+        waitForCalls((MAX_RETRIES + 1) * 2);
+
         when()
             .get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
         .then()
@@ -251,9 +279,11 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void failedEventShouldNotBeInDeadLetterAfterBeingDeleted() {
-        retryEventsListener.setRetriesBeforeSuccess(4);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
 
+        waitForCalls(MAX_RETRIES + 1);
+
         String failedInsertionId = retrieveFirstFailedInsertionId();
 
         with()
@@ -267,9 +297,11 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void taskShouldBeCompletedAfterSuccessfulRedelivery() {
-        retryEventsListener.setRetriesBeforeSuccess(4);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
 
+        waitForCalls(MAX_RETRIES + 1);
+
         String failedInsertionId = retrieveFirstFailedInsertionId();
 
         String taskId = with()
@@ -292,9 +324,11 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void failedEventShouldNotBeInDeadLettersAfterSuccessfulRedelivery() {
-        retryEventsListener.setRetriesBeforeSuccess(4);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
 
+        waitForCalls(MAX_RETRIES + 1);
+
         String failedInsertionId = retrieveFirstFailedInsertionId();
 
         String taskId = with()
@@ -314,10 +348,12 @@ public class EventDeadLettersIntegrationTest {
     }
 
     @Test
-    public void failedEventShouldBeCorrectlyProcessedByListenerAfterSuccessfulRedelivery() {
-        retryEventsListener.setRetriesBeforeSuccess(5);
+    public void failedEventShouldBeCorrectlyProcessedByListenerAfterSuccessfulRedelivery() throws InterruptedException {
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
 
+        waitForCalls(MAX_RETRIES + 1);
+
         String failedInsertionId = retrieveFirstFailedInsertionId();
 
         String taskId = with()
@@ -333,12 +369,18 @@ public class EventDeadLettersIntegrationTest {
         awaitAtMostTenSeconds.until(() -> retryEventsListener.getSuccessfulEvents().size() == 1);
     }
 
+    private void waitForCalls(int count) {
+        calmlyAwait.atMost(ONE_MINUTE).until(() -> retryEventsListener.totalCalls.intValue() >= count);
+    }
+
     @Test
     public void taskShouldBeCompletedAfterSuccessfulGroupRedelivery() {
-        retryEventsListener.setRetriesBeforeSuccess(4);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
         generateSecondEvent();
 
+        waitForCalls((MAX_RETRIES + 1) * 2);
+
         String taskId = with()
             .queryParam("action", EVENTS_ACTION)
         .post(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
@@ -358,10 +400,12 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void multipleFailedEventsShouldNotBeInDeadLettersAfterSuccessfulGroupRedelivery() {
-        retryEventsListener.setRetriesBeforeSuccess(4);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
         generateSecondEvent();
 
+        waitForCalls((MAX_RETRIES + 1) * 2);
+
         String taskId = with()
             .queryParam("action", EVENTS_ACTION)
         .post(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
@@ -382,10 +426,12 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void multipleFailedEventsShouldBeCorrectlyProcessedByListenerAfterSuccessfulGroupRedelivery() {
-        retryEventsListener.setRetriesBeforeSuccess(5);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
         generateSecondEvent();
 
+        waitForCalls((MAX_RETRIES + 1) * 2);
+
         String taskId = with()
             .queryParam("action", EVENTS_ACTION)
         .post(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
@@ -401,10 +447,12 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void taskShouldBeCompletedAfterSuccessfulAllRedelivery() {
-        retryEventsListener.setRetriesBeforeSuccess(4);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
         generateSecondEvent();
 
+        waitForCalls((MAX_RETRIES + 1) * 2);
+
         String taskId = with()
             .queryParam("action", EVENTS_ACTION)
         .post(EventDeadLettersRoutes.BASE_PATH)
@@ -423,10 +471,12 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void multipleFailedEventsShouldNotBeInDeadLettersAfterSuccessfulAllRedelivery() {
-        retryEventsListener.setRetriesBeforeSuccess(4);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
         generateSecondEvent();
 
+        waitForCalls((MAX_RETRIES + 1) * 2);
+
         String taskId = with()
             .queryParam("action", EVENTS_ACTION)
         .post(EventDeadLettersRoutes.BASE_PATH)
@@ -447,10 +497,12 @@ public class EventDeadLettersIntegrationTest {
 
     @Test
     public void multipleFailedEventsShouldBeCorrectlyProcessedByListenerAfterSuccessfulAllRedelivery() {
-        retryEventsListener.setRetriesBeforeSuccess(5);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
         generateInitialEvent();
         generateSecondEvent();
 
+        waitForCalls((MAX_RETRIES + 1) * 2);
+
         String taskId = with()
             .queryParam("action", EVENTS_ACTION)
         .post(EventDeadLettersRoutes.BASE_PATH)
@@ -467,9 +519,11 @@ public class EventDeadLettersIntegrationTest {
     @Ignore("retry rest API delivers only once, see JAMES-2907. We need same retry cound for this test to work")
     @Test
     public void failedEventShouldStillBeInDeadLettersAfterFailedRedelivery() {
-        retryEventsListener.setRetriesBeforeSuccess(8);
+        retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES * 2 + 1);
         generateInitialEvent();
 
+        waitForCalls(MAX_RETRIES + 1);
+
         String failedInsertionId = retrieveFirstFailedInsertionId();
 
         String taskId = with()
diff --git a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/JwtFilterIntegrationTest.java b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/JwtFilterIntegrationTest.java
index 31c1116..578dcf0 100644
--- a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/JwtFilterIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/JwtFilterIntegrationTest.java
@@ -25,7 +25,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Optional;
 
-import org.apache.james.CassandraJmapTestRule;
+import org.apache.james.CassandraRabbitMQAwsS3JmapTestRule;
 import org.apache.james.DockerCassandraRule;
 import org.apache.james.GuiceJamesServer;
 import org.apache.james.jwt.JwtConfiguration;
@@ -63,7 +63,7 @@ public class JwtFilterIntegrationTest {
     public DockerCassandraRule cassandra = new DockerCassandraRule();
     
     @Rule
-    public CassandraJmapTestRule cassandraJmapTestRule = CassandraJmapTestRule.defaultTestRule();
+    public CassandraRabbitMQAwsS3JmapTestRule jamesTestRule = CassandraRabbitMQAwsS3JmapTestRule.defaultTestRule();
 
     private GuiceJamesServer guiceJamesServer;
     private DataProbeImpl dataProbe;
@@ -74,7 +74,7 @@ public class JwtFilterIntegrationTest {
         JwtConfiguration jwtConfiguration = new JwtConfiguration(
             Optional.of(ClassLoaderUtils.getSystemResourceAsString("jwt_publickey")));
 
-        guiceJamesServer = cassandraJmapTestRule.jmapServer(cassandra.getModule())
+        guiceJamesServer = jamesTestRule.jmapServer(cassandra.getModule())
             .overrideWith(binder -> binder.bind(AuthenticationFilter.class).to(JwtFilter.class),
                 binder -> binder.bind(JwtConfiguration.class).toInstance(jwtConfiguration));
         guiceJamesServer.start();
diff --git a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java
index 081b2a0..fe302ef 100644
--- a/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationTest.java
@@ -32,7 +32,7 @@ import static org.hamcrest.Matchers.is;
 
 import java.util.List;
 
-import org.apache.james.CassandraJmapTestRule;
+import org.apache.james.CassandraRabbitMQAwsS3JmapTestRule;
 import org.apache.james.DockerCassandraRule;
 import org.apache.james.GuiceJamesServer;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
@@ -85,14 +85,14 @@ public class WebAdminServerIntegrationTest {
     public DockerCassandraRule cassandra = new DockerCassandraRule();
 
     @Rule
-    public CassandraJmapTestRule cassandraJmapTestRule = CassandraJmapTestRule.defaultTestRule();
+    public CassandraRabbitMQAwsS3JmapTestRule jamesTestRule = CassandraRabbitMQAwsS3JmapTestRule.defaultTestRule();
 
     private GuiceJamesServer guiceJamesServer;
     private DataProbe dataProbe;
 
     @Before
     public void setUp() throws Exception {
-        guiceJamesServer = cassandraJmapTestRule.jmapServer(cassandra.getModule());
+        guiceJamesServer = jamesTestRule.jmapServer(cassandra.getModule());
         guiceJamesServer.start();
         dataProbe = guiceJamesServer.getProbe(DataProbeImpl.class);
         dataProbe.addDomain(DOMAIN);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org