You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/07 11:25:37 UTC

[james-project] branch master updated (4b3ebab -> eaad70c)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 4b3ebab  JAMES-3067 Fixes related to make RecipientRewriteTable configuration immutable
     new aa97806  JAMES-3104 Eventually publish blob store metrics
     new 1ebd654  JAMES-3082 add retry to make event bus test when rabbitmq restart pass
     new d143e5e  JAMES-3082 re-enable rabbitMQ event bust test when restart rabbitMQ
     new 257e917  JAMES-3082 create test to demonstrate that event bus messages are persisted on rabbitMQ
     new f39bea9  JAMES-3082 set eventbus rabbitmq messages persitent
     new 7cace28  JAMES-3082 add retry to make event bus test when rabbitmq restart pass
     new cf71fdd  JAMES-3103 Set EventBus' queue to autoDelete in order to avoid leak
     new 0dff06e  JAMES-3103 Add a prefix to EventBus' queue name
     new da860b5  JAMES-3103 Add a test showing message are dispatched to registered keys qfter outage
     new 45f0ce4  JAMES-3044 Upgrade JSOUP to 1.13.1
     new eaad70c  JAMES-1759 WebAdmin route to test user existence

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mailbox/events/RetryBackoffConfiguration.java  |  3 +-
 .../mailbox/events/ErrorHandlingContract.java      | 57 +++++++++++++++++----
 .../james/mailbox/events/EventBusTestFixture.java  |  9 +++-
 .../events/delivery/InVmEventDeliveryTest.java     |  6 +++
 .../james/mailbox/events/EventDispatcher.java      |  4 ++
 .../james/mailbox/events/GroupConsumerRetry.java   |  4 ++
 .../james/mailbox/events/GroupRegistration.java    |  4 ++
 .../mailbox/events/KeyRegistrationHandler.java     | 59 +++++++++++++++++-----
 .../james/mailbox/events/RabbitMQEventBus.java     | 24 ++++++++-
 .../james/mailbox/events/RabbitMQEventBusTest.java | 50 +++++++++++++++---
 pom.xml                                            |  2 +-
 server/blob/blob-api/pom.xml                       |  5 ++
 .../blob/api/MetricableBlobStoreContract.java      | 38 +++++++++-----
 server/blob/blob-cassandra/pom.xml                 |  5 ++
 server/blob/blob-memory/pom.xml                    |  5 ++
 server/blob/blob-objectstorage/pom.xml             |  5 ++
 .../RabbitMQEventDeadLettersIntegrationTest.java   |  4 +-
 .../apache/james/webadmin/routes/UserRoutes.java   | 31 ++++++++++++
 .../apache/james/webadmin/service/UserService.java |  8 +--
 .../james/webadmin/routes/UserRoutesTest.java      | 48 ++++++++++++++++++
 src/site/markdown/server/manage-webadmin.md        | 16 ++++++
 21 files changed, 331 insertions(+), 56 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 05/11: JAMES-3082 set eventbus rabbitmq messages persitent

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f39bea91362791401898b8579b718fe72421afee
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Mar 2 11:19:03 2020 +0100

    JAMES-3082 set eventbus rabbitmq messages persitent
---
 .../main/java/org/apache/james/mailbox/events/EventDispatcher.java    | 4 ++++
 .../main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java | 4 ++++
 2 files changed, 8 insertions(+)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 93b1028..239d716 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.events;
 
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
 import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
@@ -64,6 +65,9 @@ class EventDispatcher {
         this.localListenerRegistry = localListenerRegistry;
         this.basicProperties = new AMQP.BasicProperties.Builder()
             .headers(ImmutableMap.of(EVENT_BUS_ID, eventBusId.asString()))
+            .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+            .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+            .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
             .build();
         this.mailboxListenerExecutor = mailboxListenerExecutor;
     }
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
index f1933ad..8286d30 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.events;
 
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
 import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
@@ -118,6 +119,9 @@ class GroupConsumerRetry {
             EMPTY_ROUTING_KEY,
             new AMQP.BasicProperties.Builder()
                 .headers(ImmutableMap.of(RETRY_COUNT, currentRetryCount + 1))
+                .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+                .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+                .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
                 .build(),
             eventAsBytes));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/11: JAMES-3082 re-enable rabbitMQ event bust test when restart rabbitMQ

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d143e5e1d50ee58738e609b34a2eec5fb2d6705d
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Mar 2 17:42:31 2020 +0100

    JAMES-3082 re-enable rabbitMQ event bust test when restart rabbitMQ
---
 .../java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java  | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 35d3581..a5a7d48 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -361,7 +361,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             }
 
             @Test
-            @Disabled("To fix in JAMES-3082 make message persistent in event bus")
             void dispatchShouldWorkAfterRestartForOldRegistration() throws Exception {
                 eventBus.start();
                 MailboxListener listener = newListener();
@@ -374,7 +373,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             }
 
             @Test
-            @Disabled("To fix in JAMES-3082 make message persistent in event bus")
             void dispatchShouldWorkAfterRestartForNewRegistration() throws Exception {
                 eventBus.start();
                 MailboxListener listener = newListener();
@@ -390,7 +388,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             }
 
             @Test
-            @Disabled("To fix in JAMES-3082 make message persistent in event bus")
             void redeliverShouldWorkAfterRestartForOldRegistration() throws Exception {
                 eventBus.start();
                 MailboxListener listener = newListener();
@@ -403,7 +400,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             }
 
             @Test
-            @Disabled("To fix in JAMES-3082 make message persistent in event bus")
             void redeliverShouldWorkAfterRestartForNewRegistration() throws Exception {
                 eventBus.start();
                 MailboxListener listener = newListener();
@@ -417,7 +413,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             }
 
             @Test
-            @Disabled("To fix in JAMES-3082 make message persistent in event bus")
             void dispatchShouldWorkAfterRestartForOldKeyRegistration() throws Exception {
                 eventBus.start();
                 MailboxListener listener = newListener();
@@ -430,7 +425,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             }
 
             @Test
-            @Disabled("To fix in JAMES-3082 make message persistent in event bus")
             void dispatchShouldWorkAfterRestartForNewKeyRegistration() throws Exception {
                 eventBus.start();
                 MailboxListener listener = newListener();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 06/11: JAMES-3082 add retry to make event bus test when rabbitmq restart pass

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7cace280ba2ef586c58423f561c0f16e5030e99d
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Fri Mar 6 10:56:50 2020 +0100

    JAMES-3082 add retry to make event bus test when rabbitmq restart pass
---
 .../main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 7e95dbf..f823ff1 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -92,9 +92,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
         if (!isRunning && !isStopping) {
 
             LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
-            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, channelPool, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff);
-            groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
-            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, channelPool.getSender(), localListenerRegistry, mailboxListenerExecutor);
+            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff);
+            groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
+            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor);
 
             keyRegistrationHandler.declareQueue();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 07/11: JAMES-3103 Set EventBus' queue to autoDelete in order to avoid leak

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit cf71fddc2b972bd0b7ec227db940afe7d2f42708
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Fri Mar 6 13:40:40 2020 +0100

    JAMES-3103 Set EventBus' queue to autoDelete in order to avoid leak
---
 .../java/org/apache/james/mailbox/events/KeyRegistrationHandler.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 49ea9b9..20c4b4a 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -99,7 +99,7 @@ class KeyRegistrationHandler {
         sender.declareQueue(QueueSpecification.queue(eventBusId.asString())
             .durable(DURABLE)
             .exclusive(!EXCLUSIVE)
-            .autoDelete(!AUTO_DELETE)
+            .autoDelete(AUTO_DELETE)
             .arguments(NO_ARGUMENTS))
             .map(AMQP.Queue.DeclareOk::getQueue)
             .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor())


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 08/11: JAMES-3103 Add a prefix to EventBus' queue name

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0dff06e98530bc9e8b59d93107f4f4c9b5689593
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Fri Mar 6 13:51:30 2020 +0100

    JAMES-3103 Add a prefix to EventBus' queue name
---
 .../apache/james/mailbox/events/KeyRegistrationHandler.java   | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 20c4b4a..c9767b8 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -22,11 +22,12 @@ package org.apache.james.mailbox.events;
 import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
 import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
 
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Delivery;
 
@@ -53,6 +55,9 @@ import reactor.rabbitmq.Sender;
 
 class KeyRegistrationHandler {
     private static final Logger LOGGER = LoggerFactory.getLogger(KeyRegistrationHandler.class);
+    private static final String EVENTBUS_QUEUE_NAME_PREFIX = "eventbus-";
+    private static final Duration EXPIRATION_TIMEOUT = Duration.ofMinutes(30);
+    private static final Map<String, Object> QUEUE_ARGUMENTS = ImmutableMap.of("x-expires", EXPIRATION_TIMEOUT.toMillis());
 
     private final EventBusId eventBusId;
     private final LocalListenerRegistry localListenerRegistry;
@@ -96,11 +101,11 @@ class KeyRegistrationHandler {
 
     @VisibleForTesting
     void declareQueue() {
-        sender.declareQueue(QueueSpecification.queue(eventBusId.asString())
+        sender.declareQueue(QueueSpecification.queue(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString())
             .durable(DURABLE)
             .exclusive(!EXCLUSIVE)
             .autoDelete(AUTO_DELETE)
-            .arguments(NO_ARGUMENTS))
+            .arguments(QUEUE_ARGUMENTS))
             .map(AMQP.Queue.DeclareOk::getQueue)
             .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor())
             .doOnSuccess(queueName -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 11/11: JAMES-1759 WebAdmin route to test user existence

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit eaad70cb24c77524051b58875422e4ef28a9a997
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Mar 5 11:08:52 2020 +0700

    JAMES-1759 WebAdmin route to test user existence
    
    Convenient when the input of user list is super large
---
 .../apache/james/webadmin/routes/UserRoutes.java   | 31 ++++++++++++++
 .../apache/james/webadmin/service/UserService.java |  8 +---
 .../james/webadmin/routes/UserRoutesTest.java      | 48 ++++++++++++++++++++++
 src/site/markdown/server/manage-webadmin.md        | 16 ++++++++
 4 files changed, 96 insertions(+), 7 deletions(-)

diff --git a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java
index cd8699d..6cfcbc8 100644
--- a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java
+++ b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java
@@ -27,6 +27,7 @@ import java.util.List;
 import javax.inject.Inject;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
+import javax.ws.rs.HEAD;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
@@ -38,6 +39,7 @@ import org.apache.james.rrt.api.RecipientRewriteTable;
 import org.apache.james.rrt.api.RecipientRewriteTableException;
 import org.apache.james.user.api.InvalidUsernameException;
 import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.webadmin.Constants;
 import org.apache.james.webadmin.Routes;
 import org.apache.james.webadmin.dto.AddUserRequest;
 import org.apache.james.webadmin.dto.UserResponse;
@@ -106,6 +108,8 @@ public class UserRoutes implements Routes {
         defineDeleteUser();
 
         defineAllowedFromHeaders();
+
+        defineUserExist();
     }
 
     @DELETE
@@ -124,6 +128,23 @@ public class UserRoutes implements Routes {
         service.delete(USERS + SEPARATOR + USER_NAME, this::removeUser);
     }
 
+    @HEAD
+    @Path("/{username}")
+    @ApiOperation(value = "Testing an user existence")
+    @ApiImplicitParams({
+        @ApiImplicitParam(required = true, dataType = "string", name = "username", paramType = "path")
+    })
+    @ApiResponses(value = {
+        @ApiResponse(code = HttpStatus.OK_200, message = "OK. User exists."),
+        @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Invalid input user."),
+        @ApiResponse(code = HttpStatus.NOT_FOUND_404, message = "User does not exist."),
+        @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500,
+            message = "Internal server error - Something went bad on the server side.")
+    })
+    public void defineUserExist() {
+        service.head(USERS + SEPARATOR + USER_NAME, this::userExist);
+    }
+
     @PUT
     @Path("/{username}")
     @ApiOperation(value = "Creating an user")
@@ -188,6 +209,16 @@ public class UserRoutes implements Routes {
         }
     }
 
+    private String userExist(Request request, Response response) throws UsersRepositoryException {
+        Username username = extractUsername(request);
+        if (userService.userExists(username)) {
+            response.status(HttpStatus.OK_200);
+        } else {
+            response.status(HttpStatus.NOT_FOUND_404);
+        }
+        return Constants.EMPTY_BODY;
+    }
+
     private HaltException upsertUser(Request request, Response response) throws JsonExtractException {
         Username username = extractUsername(request);
         try {
diff --git a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/service/UserService.java b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/service/UserService.java
index a263234..35e5cb6 100644
--- a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/service/UserService.java
+++ b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/service/UserService.java
@@ -24,7 +24,6 @@ import java.util.Optional;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
-import javax.mail.internet.AddressException;
 
 import org.apache.james.core.Username;
 import org.apache.james.user.api.UsersRepository;
@@ -67,12 +66,7 @@ public class UserService {
     }
 
     public boolean userExists(Username username) throws UsersRepositoryException {
-        try {
-            return usersRepository.contains(usersRepository.getUser(username.asMailAddress()));
-        } catch (AddressException e) {
-            LOGGER.info("Unable to parse address '%s'", username.asString(), e);
-            return false;
-        }
+        return usersRepository.contains(username);
     }
 
     private void upsert(User user, Username username, char[] password) throws UsersRepositoryException {
diff --git a/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/UserRoutesTest.java b/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/UserRoutesTest.java
index 38b1ada..2354290 100644
--- a/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/UserRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/UserRoutesTest.java
@@ -246,6 +246,14 @@ class UserRoutesTest {
             }
 
             @Test
+            default void headShouldReturnBadRequestWhenEmptyUserName() {
+                when()
+                    .head("/")
+                .then()
+                    .statusCode(HttpStatus.NOT_FOUND_404);
+            }
+
+            @Test
             default void deleteShouldReturnBadRequestWhenUsernameIsTooLong() {
                 when()
                     .delete(USERNAME_WITH_DOMAIN.asString() + "0123456789.0123456789.0123456789.0123456789.0123456789.0123456789.0123456789.0123456789.0123456789.0123456789." +
@@ -514,6 +522,26 @@ class UserRoutesTest {
         }
 
         @Test
+        void headShouldReturnOKWhenUserExists() {
+            with()
+                .body("{\"password\":\"password\"}")
+                .put(USERNAME_WITH_DOMAIN.asString());
+
+            when()
+                .head(USERNAME_WITH_DOMAIN.asString())
+            .then()
+                .statusCode(HttpStatus.OK_200);
+        }
+
+        @Test
+        void headShouldReturnNotFoundWhenUserDoesNotExist() {
+            when()
+                .head(USERNAME_WITH_DOMAIN.asString())
+            .then()
+                .statusCode(HttpStatus.NOT_FOUND_404);
+        }
+
+        @Test
         void puttingWithDomainPartInUsernameTwoTimesShouldBeAllowed() {
             // Given
             with()
@@ -683,6 +711,26 @@ class UserRoutesTest {
         }
 
         @Test
+        void headShouldReturnOKWhenUserExists() {
+            with()
+                .body("{\"password\":\"password\"}")
+                .put(USERNAME_WITHOUT_DOMAIN.asString());
+
+            when()
+                .head(USERNAME_WITHOUT_DOMAIN.asString())
+            .then()
+                .statusCode(HttpStatus.OK_200);
+        }
+
+        @Test
+        void headShouldReturnNotFoundWhenUserDoesNotExist() {
+            when()
+                .head(USERNAME_WITHOUT_DOMAIN.asString())
+            .then()
+                .statusCode(HttpStatus.NOT_FOUND_404);
+        }
+
+        @Test
         void puttingWithoutDomainPartInUsernameTwoTimesShouldBeAllowed() {
             // Given
             with()
diff --git a/src/site/markdown/server/manage-webadmin.md b/src/site/markdown/server/manage-webadmin.md
index 33d7a16..0b5675f 100644
--- a/src/site/markdown/server/manage-webadmin.md
+++ b/src/site/markdown/server/manage-webadmin.md
@@ -297,6 +297,7 @@ Response codes:
 ## Administrating users
 
    - [Create a user](#Create_a_user)
+   - [Testing a user existence](#Testing_a_user_existence)
    - [Updating a user password](#Updating_a_user_password)
    - [Deleting a domain](#Deleting_a_user)
    - [Retrieving the user list](#Retrieving_the_user_list)
@@ -320,6 +321,21 @@ Response codes:
 
 Note: if the user exists already, its password will be updated.
 
+### Testing a user existence
+
+```
+curl -XHEAD http://ip:port/users/usernameToBeUsed
+```
+
+Resource name "usernameToBeUsed" represents a valid user,
+hence it should match the criteria at [User Repositories documentation](/server/config-users.html).
+
+Response codes:
+
+ - 200: The user exists
+ - 400: The user name is invalid
+ - 404: The user does not exist
+
 ### Updating a user password
 
 Same than Create, but a user need to exist.


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 10/11: JAMES-3044 Upgrade JSOUP to 1.13.1

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 45f0ce455c1929eb704d5cce9877141aad90766b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Mar 6 11:22:28 2020 +0700

    JAMES-3044 Upgrade JSOUP to 1.13.1
    
     - Many performance enhancements are shipped in 1.13.1
    
    ```
    memory optimizations, reducing the retained size of a Document by ~ 39%,
    and allocations by ~ 9%
    ```
    
    Ships:
    
      * Bugfix: in CharacterReader when parsing an input stream, could throw
        a Mark Invalid exception if the reader was marked, a bufferUp
        occurred, and then the reader was rewound.
        <https://github.com/jhy/jsoup/issues/1324>
    
    Which we noticed in production, even after a 1.12.2 upgrade.
    
    Other bug fixes and enhancements are documented.
    
    CF https://github.com/jhy/jsoup/blob/master/CHANGES
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 7bee095..f83608b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2543,7 +2543,7 @@
             <dependency>
                 <groupId>org.jsoup</groupId>
                 <artifactId>jsoup</artifactId>
-                <version>1.12.2</version>
+                <version>1.13.1</version>
             </dependency>
             <dependency>
                 <groupId>org.mockito</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/11: JAMES-3104 Eventually publish blob store metrics

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit aa978061feea5c777d96b2293bc2e5b54399ef10
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat Mar 7 04:45:15 2020 +0100

    JAMES-3104 Eventually publish blob store metrics
    
    This enhancement improve build stability
---
 server/blob/blob-api/pom.xml                       |  5 +++
 .../blob/api/MetricableBlobStoreContract.java      | 38 ++++++++++++++--------
 server/blob/blob-cassandra/pom.xml                 |  5 +++
 server/blob/blob-memory/pom.xml                    |  5 +++
 server/blob/blob-objectstorage/pom.xml             |  5 +++
 5 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/server/blob/blob-api/pom.xml b/server/blob/blob-api/pom.xml
index cbe5c0e..8c9182b 100644
--- a/server/blob/blob-api/pom.xml
+++ b/server/blob/blob-api/pom.xml
@@ -70,6 +70,11 @@
             <artifactId>commons-lang3</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
index c11b117..ca346db 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
@@ -27,11 +27,14 @@ import static org.apache.james.blob.api.MetricableBlobStore.READ_TIMER_NAME;
 import static org.apache.james.blob.api.MetricableBlobStore.SAVE_BYTES_TIMER_NAME;
 import static org.apache.james.blob.api.MetricableBlobStore.SAVE_INPUT_STREAM_TIMER_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
 
 import java.io.ByteArrayInputStream;
 import java.nio.charset.StandardCharsets;
 
 import org.apache.james.metrics.tests.RecordingMetricFactory;
+import org.awaitility.Duration;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.BeforeEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
@@ -67,8 +70,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
         Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
         Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
 
-        assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_BYTES_TIMER_NAME))
-            .hasSize(2);
+        await().atMost(Duration.FIVE_SECONDS)
+            .untilAsserted(() ->  assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_BYTES_TIMER_NAME))
+                .hasSize(2));
     }
 
     @Test
@@ -78,8 +82,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
         Mono.from(store.save(store.getDefaultBucketName(), STRING_CONTENT, LOW_COST)).block();
         Mono.from(store.save(store.getDefaultBucketName(), STRING_CONTENT, LOW_COST)).block();
 
-        assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_BYTES_TIMER_NAME))
-            .hasSize(2);
+        await().atMost(Duration.FIVE_SECONDS)
+            .untilAsserted(() ->  assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_BYTES_TIMER_NAME))
+                .hasSize(2));
     }
 
     @Test
@@ -89,8 +94,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
         Mono.from(store.save(store.getDefaultBucketName(), new ByteArrayInputStream(BYTES_CONTENT), LOW_COST)).block();
         Mono.from(store.save(store.getDefaultBucketName(), new ByteArrayInputStream(BYTES_CONTENT), LOW_COST)).block();
 
-        assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_INPUT_STREAM_TIMER_NAME))
-            .hasSize(2);
+        await().atMost(Duration.FIVE_SECONDS)
+            .untilAsserted(() ->  assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_INPUT_STREAM_TIMER_NAME))
+                .hasSize(2));
     }
 
     @Test
@@ -101,8 +107,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
         Mono.from(store.readBytes(store.getDefaultBucketName(), blobId)).block();
         Mono.from(store.readBytes(store.getDefaultBucketName(), blobId)).block();
 
-        assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(READ_BYTES_TIMER_NAME))
-            .hasSize(2);
+        await().atMost(Duration.FIVE_SECONDS)
+            .untilAsserted(() ->  assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(READ_BYTES_TIMER_NAME))
+                .hasSize(2));
     }
 
     @Test
@@ -113,8 +120,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
         store.read(store.getDefaultBucketName(), blobId);
         store.read(store.getDefaultBucketName(), blobId);
 
-        assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(READ_TIMER_NAME))
-            .hasSize(2);
+        await().atMost(Duration.FIVE_SECONDS)
+            .untilAsserted(() ->  assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(READ_TIMER_NAME))
+                .hasSize(2));
     }
 
     @Test
@@ -127,8 +135,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
 
         Mono.from(store.deleteBucket(bucketName)).block();
 
-        assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(DELETE_BUCKET_TIMER_NAME))
-            .hasSize(1);
+        await().atMost(Duration.FIVE_SECONDS)
+            .untilAsserted(() ->  assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(DELETE_BUCKET_TIMER_NAME))
+                .hasSize(1));
     }
 
     @Test
@@ -141,7 +150,8 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
         Mono.from(store.delete(BucketName.DEFAULT, blobId1)).block();
         Mono.from(store.delete(BucketName.DEFAULT, blobId2)).block();
 
-        assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(DELETE_TIMER_NAME))
-            .hasSize(2);
+        await().atMost(Duration.FIVE_SECONDS)
+            .untilAsserted(() ->  assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(DELETE_TIMER_NAME))
+                .hasSize(2));
     }
 }
\ No newline at end of file
diff --git a/server/blob/blob-cassandra/pom.xml b/server/blob/blob-cassandra/pom.xml
index dcaf57c..8894ca6 100644
--- a/server/blob/blob-cassandra/pom.xml
+++ b/server/blob/blob-cassandra/pom.xml
@@ -72,6 +72,11 @@
             <artifactId>guava</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
             <scope>test</scope>
diff --git a/server/blob/blob-memory/pom.xml b/server/blob/blob-memory/pom.xml
index 6a6888c..76f67e3 100644
--- a/server/blob/blob-memory/pom.xml
+++ b/server/blob/blob-memory/pom.xml
@@ -66,6 +66,11 @@
             <artifactId>reactor-core</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
diff --git a/server/blob/blob-objectstorage/pom.xml b/server/blob/blob-objectstorage/pom.xml
index 9f66fae..287b193 100644
--- a/server/blob/blob-objectstorage/pom.xml
+++ b/server/blob/blob-objectstorage/pom.xml
@@ -112,6 +112,11 @@
             <version>${jclouds.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
             <scope>test</scope>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 09/11: JAMES-3103 Add a test showing messages are dispatched to registered keys after outage

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit da860b57d58451b7098a9a23d9e4fda69061e426
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Mar 6 21:34:44 2020 +0700

    JAMES-3103 Add a test showing messages are dispatched to registered keys after outage
---
 .../james/mailbox/events/RabbitMQEventBusTest.java   | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index f47a63b..0425320 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -76,6 +76,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.ImmutableSet;
+
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ExchangeSpecification;
@@ -353,6 +355,24 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
                     rabbitMQEventBusWithNetWorkIssue.dispatch(EVENT, NO_KEYS).block();
                     assertThatListenerReceiveOneEvent(listener);
                 }
+
+                @Test
+                void dispatchShouldWorkAfterNetworkIssuesForOldRegistrationAndKey() {
+                    rabbitMQEventBusWithNetWorkIssue.start();
+                    MailboxListener listener = newListener();
+                    rabbitMQEventBusWithNetWorkIssue.register(listener, KEY_1);
+
+                    rabbitMQNetWorkIssueExtension.getRabbitMQ().pause();
+
+                    assertThatThrownBy(() -> rabbitMQEventBusWithNetWorkIssue.dispatch(EVENT, NO_KEYS).block())
+                        .isInstanceOf(IllegalStateException.class)
+                        .hasMessageContaining("Retries exhausted");
+
+                    rabbitMQNetWorkIssueExtension.getRabbitMQ().unpause();
+
+                    rabbitMQEventBusWithNetWorkIssue.dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+                    assertThatListenerReceiveOneEvent(listener);
+                }
             }
 
             @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 04/11: JAMES-3082 create test to demonstrate that event bus messages are persisted on rabbitMQ

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 257e917892433442a64ce094388041010e93a091
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Tue Mar 3 17:09:29 2020 +0100

    JAMES-3082 create test to demonstrate that event bus messages are persisted on rabbitMQ
---
 .../mailbox/events/ErrorHandlingContract.java      | 57 ++++++++++++++++++----
 .../james/mailbox/events/EventBusTestFixture.java  |  9 +++-
 .../events/delivery/InVmEventDeliveryTest.java     |  6 +++
 .../mailbox/events/KeyRegistrationHandler.java     | 28 ++++++++---
 .../james/mailbox/events/RabbitMQEventBus.java     | 22 +++++++++
 .../james/mailbox/events/RabbitMQEventBusTest.java | 24 +++++++++
 .../RabbitMQEventDeadLettersIntegrationTest.java   |  4 +-
 7 files changed, 131 insertions(+), 19 deletions(-)

diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
index 219f5e4..35aaea2 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -26,6 +26,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
 import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
+import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION_LONG;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -107,17 +108,22 @@ interface ErrorHandlingContract extends EventBusContract {
     @Test
     default void listenerShouldReceiveWhenFailsEqualsMaxRetries() {
         EventCollector eventCollector = eventCollector();
-
+        //do throw  RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES
         doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
             .doCallRealMethod()
             .when(eventCollector).event(EVENT);
 
         eventBus().register(eventCollector, GROUP_A);
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
-        WAIT_CONDITION
+        WAIT_CONDITION_LONG
             .untilAsserted(() -> assertThat(eventCollector.getEvents()).hasSize(1));
     }
 
@@ -125,10 +131,16 @@ interface ErrorHandlingContract extends EventBusContract {
     default void listenerShouldNotReceiveWhenFailsGreaterThanMaxRetries() throws Exception {
         EventCollector eventCollector = eventCollector();
 
+        //do throw  RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times
         doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
             .doCallRealMethod()
             .when(eventCollector).event(EVENT);
 
@@ -147,9 +159,10 @@ interface ErrorHandlingContract extends EventBusContract {
         eventBus().register(throwingListener, GROUP_A);
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
-        TimeUnit.SECONDS.sleep(5);
+        TimeUnit.MINUTES.sleep(1);
         int numberOfCallsAfterExceedMaxRetries = throwingListener.timeElapsed.size();
-        TimeUnit.SECONDS.sleep(5);
+        TimeUnit.MINUTES.sleep(1);
+
 
         assertThat(throwingListener.timeElapsed.size())
             .isEqualTo(numberOfCallsAfterExceedMaxRetries);
@@ -162,10 +175,10 @@ interface ErrorHandlingContract extends EventBusContract {
         eventBus().register(throwingListener, GROUP_A);
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
-        TimeUnit.SECONDS.sleep(5);
+        TimeUnit.MINUTES.sleep(1);
         SoftAssertions.assertSoftly(softly -> {
             List<Instant> timeElapsed = throwingListener.timeElapsed;
-            softly.assertThat(timeElapsed).hasSize(4);
+            softly.assertThat(timeElapsed).hasSize(RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1);
 
             long minFirstDelayAfter = 100; // first backOff
             long minSecondDelayAfter = 50; // 50 * jitter factor (50 * 0.5)
@@ -207,10 +220,16 @@ interface ErrorHandlingContract extends EventBusContract {
     default void deadLettersIsNotAppliedForKeyRegistrations() throws Exception {
         EventCollector eventCollector = eventCollector();
 
+        //do throw  RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times
         doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
             .doCallRealMethod()
             .when(eventCollector).event(EVENT);
 
@@ -247,18 +266,23 @@ interface ErrorHandlingContract extends EventBusContract {
     @Test
     default void deadLetterShouldStoreWhenDispatchFailsGreaterThanMaxRetries() {
         EventCollector eventCollector = eventCollector();
-
+        //do throw  RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times
         doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
             .doCallRealMethod()
             .when(eventCollector).event(EVENT);
 
         eventBus().register(eventCollector, GROUP_A);
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
-        WAIT_CONDITION.untilAsserted(() -> assertThat(deadLetter().failedIds(GROUP_A)
+        WAIT_CONDITION_LONG.untilAsserted(() -> assertThat(deadLetter().failedIds(GROUP_A)
                 .flatMap(insertionId -> deadLetter().failedEvent(GROUP_A, insertionId))
                 .toIterable())
             .containsOnly(EVENT));
@@ -270,17 +294,23 @@ interface ErrorHandlingContract extends EventBusContract {
     default void deadLetterShouldStoreWhenRedeliverFailsGreaterThanMaxRetries() {
         EventCollector eventCollector = eventCollector();
 
+        //do throw  RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times
         doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
             .doCallRealMethod()
             .when(eventCollector).event(EVENT);
 
         eventBus().register(eventCollector, GROUP_A);
         eventBus().reDeliver(GROUP_A, EVENT).block();
 
-        WAIT_CONDITION.untilAsserted(() ->
+        WAIT_CONDITION_LONG.untilAsserted(() ->
                 assertThat(
                         deadLetter()
                             .failedIds(GROUP_A)
@@ -295,16 +325,23 @@ interface ErrorHandlingContract extends EventBusContract {
     default void retryShouldDeliverAsManyTimesAsInitialDeliveryAttempt() {
         EventCollector eventCollector = eventCollector();
 
+        //do throw  RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times
         doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
             .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
+            .doThrow(new RuntimeException())
             .doCallRealMethod()
             .when(eventCollector).event(EVENT);
 
         eventBus().register(eventCollector, GROUP_A);
         eventBus().reDeliver(GROUP_A, EVENT).block();
 
-        WAIT_CONDITION.untilAsserted(() -> assertThat(eventCollector.getEvents()).isNotEmpty());
+        WAIT_CONDITION_LONG.untilAsserted(() -> assertThat(eventCollector.getEvents()).isNotEmpty());
     }
 
     @Test
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
index a071583..06515e0 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -103,7 +103,6 @@ public interface EventBusTestFixture {
     MailboxListener.MailboxRenamed EVENT_UNSUPPORTED_BY_LISTENER = new MailboxListener.MailboxRenamed(SESSION_ID, USERNAME, MAILBOX_PATH, TEST_ID, MAILBOX_PATH, EVENT_ID_2);
 
     java.time.Duration ONE_SECOND = java.time.Duration.ofSeconds(1);
-    java.time.Duration THIRTY_SECONDS = java.time.Duration.ofSeconds(30);
     java.time.Duration FIVE_HUNDRED_MS = java.time.Duration.ofMillis(500);
     MailboxId ID_1 = TEST_ID;
     MailboxId ID_2 = TestId.of(24);
@@ -118,6 +117,7 @@ public interface EventBusTestFixture {
     List<Group> ALL_GROUPS = ImmutableList.of(GROUP_A, GROUP_B, GROUP_C);
 
     ConditionFactory WAIT_CONDITION = await().timeout(Duration.FIVE_SECONDS);
+    ConditionFactory WAIT_CONDITION_LONG = await().timeout(Duration.ONE_MINUTE);
 
     static MailboxListener newListener() {
         MailboxListener listener = mock(MailboxListener.class);
@@ -125,4 +125,11 @@ public interface EventBusTestFixture {
         when(listener.isHandling(any(MailboxListener.MailboxAdded.class))).thenReturn(true);
         return listener;
     }
+
+    static MailboxListener newAsyncListener() {
+        MailboxListener listener = mock(MailboxListener.class);
+        when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+        when(listener.isHandling(any(MailboxListener.MailboxAdded.class))).thenReturn(true);
+        return listener;
+    }
 }
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
index 6212178..45335e9 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
@@ -215,10 +215,16 @@ class InVmEventDeliveryTest {
         @Test
         void failureHandlerShouldWorkWhenRetryFails() {
             MailboxListenerCountingSuccessfulExecution listener = newListener();
+            //do throw  RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times
             doThrow(new RuntimeException())
                 .doThrow(new RuntimeException())
                 .doThrow(new RuntimeException())
                 .doThrow(new RuntimeException())
+                .doThrow(new RuntimeException())
+                .doThrow(new RuntimeException())
+                .doThrow(new RuntimeException())
+                .doThrow(new RuntimeException())
+                .doThrow(new RuntimeException())
                 .doCallRealMethod()
                 .when(listener).event(EVENT);
 
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index b8789ca..49ea9b9 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -28,6 +28,7 @@ import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.event.json.EventSerializer;
@@ -38,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Delivery;
 
@@ -63,6 +65,7 @@ class KeyRegistrationHandler {
     private final MailboxListenerExecutor mailboxListenerExecutor;
     private final RetryBackoffConfiguration retryBackoff;
     private Optional<Disposable> receiverSubscriber;
+    private AtomicBoolean registrationQueueInitialized = new AtomicBoolean(false);
 
     KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer,
                            Sender sender, ReceiverProvider receiverProvider,
@@ -78,22 +81,35 @@ class KeyRegistrationHandler {
         this.retryBackoff = retryBackoff;
         this.registrationQueue = new RegistrationQueueName();
         this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
+        this.receiverSubscriber = Optional.empty();
+
     }
 
     void start() {
+        declareQueue();
+
+        receiverSubscriber = Optional.of(receiver.consumeAutoAck(registrationQueue.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
+            .subscribeOn(Schedulers.parallel())
+            .flatMap(this::handleDelivery)
+            .subscribe());
+    }
+
+    @VisibleForTesting
+    void declareQueue() {
         sender.declareQueue(QueueSpecification.queue(eventBusId.asString())
             .durable(DURABLE)
             .exclusive(!EXCLUSIVE)
             .autoDelete(!AUTO_DELETE)
             .arguments(NO_ARGUMENTS))
             .map(AMQP.Queue.DeclareOk::getQueue)
-            .doOnSuccess(registrationQueue::initialize)
+            .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor())
+            .doOnSuccess(queueName -> {
+                if (!registrationQueueInitialized.get()) {
+                    registrationQueue.initialize(queueName);
+                    registrationQueueInitialized.set(true);
+                }
+            })
             .block();
-
-        receiverSubscriber = Optional.of(receiver.consumeAutoAck(registrationQueue.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
-            .subscribeOn(Schedulers.parallel())
-            .flatMap(this::handleDelivery)
-            .subscribe());
     }
 
     void stop() {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 01c2b23..7e95dbf 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -29,6 +29,7 @@ import org.apache.james.event.json.EventSerializer;
 import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.metrics.api.MetricFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Mono;
@@ -86,6 +87,27 @@ public class RabbitMQEventBus implements EventBus, Startable {
         }
     }
 
+    @VisibleForTesting
+    void startWithoutStartingKeyRegistrationHandler() {
+        if (!isRunning && !isStopping) {
+
+            LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
+            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, channelPool, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff);
+            groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
+            eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, channelPool.getSender(), localListenerRegistry, mailboxListenerExecutor);
+
+            keyRegistrationHandler.declareQueue();
+
+            eventDispatcher.start();
+            isRunning = true;
+        }
+    }
+
+    @VisibleForTesting
+    void startKeyRegistrationHandler() {
+        keyRegistrationHandler.start();
+    }
+
     @PreDestroy
     public void stop() {
         if (isRunning && !isStopping) {
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index a5a7d48..f47a63b 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -32,6 +32,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
 import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
+import static org.apache.james.mailbox.events.EventBusTestFixture.newAsyncListener;
 import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
 import static org.apache.james.mailbox.events.GroupRegistration.WorkQueueName.MAILBOX_EVENT_WORK_QUEUE_PREFIX;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
@@ -75,6 +76,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.stubbing.Answer;
 
+import reactor.core.publisher.Mono;
 import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.QueueSpecification;
@@ -91,6 +93,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     private RabbitMQEventBus eventBus;
     private RabbitMQEventBus eventBus2;
     private RabbitMQEventBus eventBus3;
+    private RabbitMQEventBus eventBusWithKeyHandlerNotStarted;
     private EventSerializer eventSerializer;
     private RoutingKeyConverter routingKeyConverter;
     private MemoryEventDeadLetters memoryEventDeadLetters;
@@ -106,10 +109,12 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         eventBus = newEventBus();
         eventBus2 = newEventBus();
         eventBus3 = newEventBus();
+        eventBusWithKeyHandlerNotStarted = newEventBus();
 
         eventBus.start();
         eventBus2.start();
         eventBus3.start();
+        eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler();
     }
 
     @AfterEach
@@ -117,6 +122,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         eventBus.stop();
         eventBus2.stop();
         eventBus3.stop();
+        eventBusWithKeyHandlerNotStarted.stop();
         ALL_GROUPS.stream()
             .map(GroupRegistration.WorkQueueName::of)
             .forEach(queueName -> rabbitMQExtension.getSender().delete(QueueSpecification.queue(queueName.asString())).block());
@@ -425,6 +431,21 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             }
 
             @Test
+            void dispatchedMessagesShouldSurviveARabbitMQRestart() throws Exception {
+                eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler();
+                MailboxListener listener = newAsyncListener();
+                eventBusWithKeyHandlerNotStarted.register(listener, KEY_1);
+                Mono<Void> dispatch = eventBusWithKeyHandlerNotStarted.dispatch(EVENT, KEY_1);
+                dispatch.block();
+
+                rabbitMQExtension.getRabbitMQ().restart();
+
+                eventBusWithKeyHandlerNotStarted.startKeyRegistrationHandler();
+
+                assertThatListenerReceiveOneEvent(listener);
+            }
+
+            @Test
             void dispatchShouldWorkAfterRestartForNewKeyRegistration() throws Exception {
                 eventBus.start();
                 MailboxListener listener = newListener();
@@ -606,6 +627,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
                 eventBus.stop();
                 eventBus2.stop();
                 eventBus3.stop();
+                eventBusWithKeyHandlerNotStarted.stop();
 
                 assertThat(rabbitManagementAPI.listExchanges())
                     .anySatisfy(exchange -> assertThat(exchange.getName()).isEqualTo(MAILBOX_EVENT_EXCHANGE_NAME));
@@ -618,6 +640,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
                 eventBus.stop();
                 eventBus2.stop();
                 eventBus3.stop();
+                eventBusWithKeyHandlerNotStarted.stop();
 
                 assertThat(rabbitManagementAPI.listQueues())
                     .anySatisfy(queue -> assertThat(queue.getName()).contains(GroupA.class.getName()));
@@ -628,6 +651,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
                 eventBus.stop();
                 eventBus2.stop();
                 eventBus3.stop();
+                eventBusWithKeyHandlerNotStarted.stop();
 
                 assertThat(rabbitManagementAPI.listQueues())
                     .filteredOn(queue -> !queue.getName().startsWith(MAILBOX_EVENT_WORK_QUEUE_PREFIX))
diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java
index 18060c8..36dd401 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java
@@ -179,7 +179,7 @@ class RabbitMQEventDeadLettersIntegrationTest {
 
     //This value is duplicated from default configuration to ensure we keep the same behavior over time
     //unless we really want to change that default value
-    private static final int MAX_RETRIES = 3;
+    private static final int MAX_RETRIES = 8;
 
     private static final String DOMAIN = "domain.tld";
     private static final String BOB = "bob@" + DOMAIN;
@@ -220,7 +220,7 @@ class RabbitMQEventDeadLettersIntegrationTest {
     }
 
     private String retrieveFirstFailedInsertionId() {
-        calmlyAwait.atMost(TEN_SECONDS)
+        calmlyAwait.atMost(ONE_MINUTE)
             .untilAsserted(() ->
                 when()
                     .get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/11: JAMES-3082 Add retries so the event bus test passes when RabbitMQ restarts

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1ebd654240b4bcc2be1437e19e7f9aab7ed3ccc1
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Mar 2 17:41:30 2020 +0100

    JAMES-3082 Add retries so the event bus test passes when RabbitMQ restarts
---
 .../mailbox/events/RetryBackoffConfiguration.java    |  3 ++-
 .../james/mailbox/events/GroupRegistration.java      |  4 ++++
 .../james/mailbox/events/KeyRegistrationHandler.java | 20 ++++++++++++++++----
 .../james/mailbox/events/RabbitMQEventBus.java       |  2 +-
 4 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
index f802d5e..a674a28 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
@@ -63,8 +63,9 @@ public class RetryBackoffConfiguration {
     }
 
     static final double DEFAULT_JITTER_FACTOR = 0.5;
-    static final int DEFAULT_MAX_RETRIES = 3;
+    static final int DEFAULT_MAX_RETRIES = 8;
     static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100);
+    static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
     public static final RetryBackoffConfiguration DEFAULT = new RetryBackoffConfiguration(
         DEFAULT_MAX_RETRIES,
         DEFAULT_FIRST_BACKOFF,
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index da0fe77..8d1b7aa 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -26,6 +26,7 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Objects;
@@ -81,6 +82,7 @@ class GroupRegistration implements Registration {
     private final GroupConsumerRetry retryHandler;
     private final WaitDelayGenerator delayGenerator;
     private final Group group;
+    private final RetryBackoffConfiguration retryBackoff;
     private final MailboxListenerExecutor mailboxListenerExecutor;
     private Optional<Disposable> receiverSubscriber;
 
@@ -93,6 +95,7 @@ class GroupRegistration implements Registration {
         this.queueName = WorkQueueName.of(group);
         this.sender = sender;
         this.receiver = receiverProvider.createReceiver();
+        this.retryBackoff = retryBackoff;
         this.mailboxListenerExecutor = mailboxListenerExecutor;
         this.receiverSubscriber = Optional.empty();
         this.unregisterGroup = unregisterGroup;
@@ -106,6 +109,7 @@ class GroupRegistration implements Registration {
             .of(createGroupWorkQueue()
                 .then(retryHandler.createRetryExchange(queueName))
                 .then(Mono.fromCallable(() -> this.consumeWorkQueue()))
+                .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
                 .block());
         return this;
     }
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index ae49471..b8789ca 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -24,6 +24,7 @@ import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
@@ -60,9 +61,13 @@ class KeyRegistrationHandler {
     private final RegistrationQueueName registrationQueue;
     private final RegistrationBinder registrationBinder;
     private final MailboxListenerExecutor mailboxListenerExecutor;
+    private final RetryBackoffConfiguration retryBackoff;
     private Optional<Disposable> receiverSubscriber;
 
-    KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
+    KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer,
+                           Sender sender, ReceiverProvider receiverProvider,
+                           RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry,
+                           MailboxListenerExecutor mailboxListenerExecutor, RetryBackoffConfiguration retryBackoff) {
         this.eventBusId = eventBusId;
         this.eventSerializer = eventSerializer;
         this.sender = sender;
@@ -70,6 +75,7 @@ class KeyRegistrationHandler {
         this.localListenerRegistry = localListenerRegistry;
         this.receiver = receiverProvider.createReceiver();
         this.mailboxListenerExecutor = mailboxListenerExecutor;
+        this.retryBackoff = retryBackoff;
         this.registrationQueue = new RegistrationQueueName();
         this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
     }
@@ -94,17 +100,23 @@ class KeyRegistrationHandler {
         receiverSubscriber.filter(subscriber -> !subscriber.isDisposed())
             .ifPresent(Disposable::dispose);
         receiver.close();
-        sender.delete(QueueSpecification.queue(registrationQueue.asString())).block();
+        sender.delete(QueueSpecification.queue(registrationQueue.asString()))
+            .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
+            .block();
     }
 
     Registration register(MailboxListener listener, RegistrationKey key) {
         LocalListenerRegistry.LocalRegistration registration = localListenerRegistry.addListener(key, listener);
         if (registration.isFirstListener()) {
-            registrationBinder.bind(key).block();
+            registrationBinder.bind(key)
+                .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
+                .block();
         }
         return new KeyRegistration(() -> {
             if (registration.unregister().lastListenerRemoved()) {
-                registrationBinder.unbind(key).block();
+                registrationBinder.unbind(key)
+                    .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
+                    .block();
             }
         });
     }
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index ecc1b4f..01c2b23 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -76,7 +76,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
         if (!isRunning && !isStopping) {
 
             LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
-            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
+            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff);
             groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
             eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org