You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/07 11:25:37 UTC
[james-project] branch master updated (4b3ebab -> eaad70c)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.
from 4b3ebab JAMES-3067 Fixes related to make RecipientRewriteTable configuration immutable
new aa97806 JAMES-3104 Eventually publish blob store metrics
new 1ebd654 JAMES-3082 add retry to make event bus test when rabbitmq restart pass
new d143e5e JAMES-3082 re-enable rabbitMQ event bust test when restart rabbitMQ
new 257e917 JAMES-3082 create test to demonstrate that event bus messages are persisted on rabbitMQ
new f39bea9 JAMES-3082 set eventbus rabbitmq messages persitent
new 7cace28 JAMES-3082 add retry to make event bus test when rabbitmq restart pass
new cf71fdd JAMES-3103 Set EventBus' queue to autoDelete in order to avoid leak
new 0dff06e JAMES-3103 Add a prefix to EventBus' queue name
new da860b5 JAMES-3103 Add a test showing message are dispatched to registered keys qfter outage
new 45f0ce4 JAMES-3044 Upgrade JSOUP to 1.13.1
new eaad70c JAMES-1759 WebAdmin route to test user existence
The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../mailbox/events/RetryBackoffConfiguration.java | 3 +-
.../mailbox/events/ErrorHandlingContract.java | 57 +++++++++++++++++----
.../james/mailbox/events/EventBusTestFixture.java | 9 +++-
.../events/delivery/InVmEventDeliveryTest.java | 6 +++
.../james/mailbox/events/EventDispatcher.java | 4 ++
.../james/mailbox/events/GroupConsumerRetry.java | 4 ++
.../james/mailbox/events/GroupRegistration.java | 4 ++
.../mailbox/events/KeyRegistrationHandler.java | 59 +++++++++++++++++-----
.../james/mailbox/events/RabbitMQEventBus.java | 24 ++++++++-
.../james/mailbox/events/RabbitMQEventBusTest.java | 50 +++++++++++++++---
pom.xml | 2 +-
server/blob/blob-api/pom.xml | 5 ++
.../blob/api/MetricableBlobStoreContract.java | 38 +++++++++-----
server/blob/blob-cassandra/pom.xml | 5 ++
server/blob/blob-memory/pom.xml | 5 ++
server/blob/blob-objectstorage/pom.xml | 5 ++
.../RabbitMQEventDeadLettersIntegrationTest.java | 4 +-
.../apache/james/webadmin/routes/UserRoutes.java | 31 ++++++++++++
.../apache/james/webadmin/service/UserService.java | 8 +--
.../james/webadmin/routes/UserRoutesTest.java | 48 ++++++++++++++++++
src/site/markdown/server/manage-webadmin.md | 16 ++++++
21 files changed, 331 insertions(+), 56 deletions(-)
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 05/11: JAMES-3082 set eventbus rabbitmq messages
persitent
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f39bea91362791401898b8579b718fe72421afee
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Mar 2 11:19:03 2020 +0100
JAMES-3082 set eventbus rabbitmq messages persitent
---
.../main/java/org/apache/james/mailbox/events/EventDispatcher.java | 4 ++++
.../main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java | 4 ++++
2 files changed, 8 insertions(+)
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 93b1028..239d716 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -19,6 +19,7 @@
package org.apache.james.mailbox.events;
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
@@ -64,6 +65,9 @@ class EventDispatcher {
this.localListenerRegistry = localListenerRegistry;
this.basicProperties = new AMQP.BasicProperties.Builder()
.headers(ImmutableMap.of(EVENT_BUS_ID, eventBusId.asString()))
+ .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+ .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+ .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
.build();
this.mailboxListenerExecutor = mailboxListenerExecutor;
}
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
index f1933ad..8286d30 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
@@ -19,6 +19,7 @@
package org.apache.james.mailbox.events;
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
@@ -118,6 +119,9 @@ class GroupConsumerRetry {
EMPTY_ROUTING_KEY,
new AMQP.BasicProperties.Builder()
.headers(ImmutableMap.of(RETRY_COUNT, currentRetryCount + 1))
+ .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+ .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+ .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
.build(),
eventAsBytes));
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 03/11: JAMES-3082 re-enable rabbitMQ event bust
test when restart rabbitMQ
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d143e5e1d50ee58738e609b34a2eec5fb2d6705d
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Mar 2 17:42:31 2020 +0100
JAMES-3082 re-enable rabbitMQ event bust test when restart rabbitMQ
---
.../java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java | 6 ------
1 file changed, 6 deletions(-)
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 35d3581..a5a7d48 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -361,7 +361,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
@Test
- @Disabled("To fix in JAMES-3082 make message persistent in event bus")
void dispatchShouldWorkAfterRestartForOldRegistration() throws Exception {
eventBus.start();
MailboxListener listener = newListener();
@@ -374,7 +373,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
@Test
- @Disabled("To fix in JAMES-3082 make message persistent in event bus")
void dispatchShouldWorkAfterRestartForNewRegistration() throws Exception {
eventBus.start();
MailboxListener listener = newListener();
@@ -390,7 +388,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
@Test
- @Disabled("To fix in JAMES-3082 make message persistent in event bus")
void redeliverShouldWorkAfterRestartForOldRegistration() throws Exception {
eventBus.start();
MailboxListener listener = newListener();
@@ -403,7 +400,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
@Test
- @Disabled("To fix in JAMES-3082 make message persistent in event bus")
void redeliverShouldWorkAfterRestartForNewRegistration() throws Exception {
eventBus.start();
MailboxListener listener = newListener();
@@ -417,7 +413,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
@Test
- @Disabled("To fix in JAMES-3082 make message persistent in event bus")
void dispatchShouldWorkAfterRestartForOldKeyRegistration() throws Exception {
eventBus.start();
MailboxListener listener = newListener();
@@ -430,7 +425,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
@Test
- @Disabled("To fix in JAMES-3082 make message persistent in event bus")
void dispatchShouldWorkAfterRestartForNewKeyRegistration() throws Exception {
eventBus.start();
MailboxListener listener = newListener();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 06/11: JAMES-3082 add retry to make event bus test
when rabbitmq restart pass
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7cace280ba2ef586c58423f561c0f16e5030e99d
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Fri Mar 6 10:56:50 2020 +0100
JAMES-3082 add retry to make event bus test when rabbitmq restart pass
---
.../main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 7e95dbf..f823ff1 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -92,9 +92,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
if (!isRunning && !isStopping) {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
- keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, channelPool, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff);
- groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
- eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, channelPool.getSender(), localListenerRegistry, mailboxListenerExecutor);
+ keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff);
+ groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
+ eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor);
keyRegistrationHandler.declareQueue();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 07/11: JAMES-3103 Set EventBus' queue to autoDelete
in order to avoid leak
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit cf71fddc2b972bd0b7ec227db940afe7d2f42708
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Fri Mar 6 13:40:40 2020 +0100
JAMES-3103 Set EventBus' queue to autoDelete in order to avoid leak
---
.../java/org/apache/james/mailbox/events/KeyRegistrationHandler.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 49ea9b9..20c4b4a 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -99,7 +99,7 @@ class KeyRegistrationHandler {
sender.declareQueue(QueueSpecification.queue(eventBusId.asString())
.durable(DURABLE)
.exclusive(!EXCLUSIVE)
- .autoDelete(!AUTO_DELETE)
+ .autoDelete(AUTO_DELETE)
.arguments(NO_ARGUMENTS))
.map(AMQP.Queue.DeclareOk::getQueue)
.retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor())
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 08/11: JAMES-3103 Add a prefix to EventBus' queue
name
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0dff06e98530bc9e8b59d93107f4f4c9b5689593
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Fri Mar 6 13:51:30 2020 +0100
JAMES-3103 Add a prefix to EventBus' queue name
---
.../apache/james/mailbox/events/KeyRegistrationHandler.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 20c4b4a..c9767b8 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -22,11 +22,12 @@ package org.apache.james.mailbox.events;
import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;
@@ -53,6 +55,9 @@ import reactor.rabbitmq.Sender;
class KeyRegistrationHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(KeyRegistrationHandler.class);
+ private static final String EVENTBUS_QUEUE_NAME_PREFIX = "eventbus-";
+ private static final Duration EXPIRATION_TIMEOUT = Duration.ofMinutes(30);
+ private static final Map<String, Object> QUEUE_ARGUMENTS = ImmutableMap.of("x-expires", EXPIRATION_TIMEOUT.toMillis());
private final EventBusId eventBusId;
private final LocalListenerRegistry localListenerRegistry;
@@ -96,11 +101,11 @@ class KeyRegistrationHandler {
@VisibleForTesting
void declareQueue() {
- sender.declareQueue(QueueSpecification.queue(eventBusId.asString())
+ sender.declareQueue(QueueSpecification.queue(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString())
.durable(DURABLE)
.exclusive(!EXCLUSIVE)
.autoDelete(AUTO_DELETE)
- .arguments(NO_ARGUMENTS))
+ .arguments(QUEUE_ARGUMENTS))
.map(AMQP.Queue.DeclareOk::getQueue)
.retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor())
.doOnSuccess(queueName -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 11/11: JAMES-1759 WebAdmin route to test user
existence
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit eaad70cb24c77524051b58875422e4ef28a9a997
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Mar 5 11:08:52 2020 +0700
JAMES-1759 WebAdmin route to test user existence
Convenient when the input of user list is super large
---
.../apache/james/webadmin/routes/UserRoutes.java | 31 ++++++++++++++
.../apache/james/webadmin/service/UserService.java | 8 +---
.../james/webadmin/routes/UserRoutesTest.java | 48 ++++++++++++++++++++++
src/site/markdown/server/manage-webadmin.md | 16 ++++++++
4 files changed, 96 insertions(+), 7 deletions(-)
diff --git a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java
index cd8699d..6cfcbc8 100644
--- a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java
+++ b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/UserRoutes.java
@@ -27,6 +27,7 @@ import java.util.List;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
+import javax.ws.rs.HEAD;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
@@ -38,6 +39,7 @@ import org.apache.james.rrt.api.RecipientRewriteTable;
import org.apache.james.rrt.api.RecipientRewriteTableException;
import org.apache.james.user.api.InvalidUsernameException;
import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.webadmin.Constants;
import org.apache.james.webadmin.Routes;
import org.apache.james.webadmin.dto.AddUserRequest;
import org.apache.james.webadmin.dto.UserResponse;
@@ -106,6 +108,8 @@ public class UserRoutes implements Routes {
defineDeleteUser();
defineAllowedFromHeaders();
+
+ defineUserExist();
}
@DELETE
@@ -124,6 +128,23 @@ public class UserRoutes implements Routes {
service.delete(USERS + SEPARATOR + USER_NAME, this::removeUser);
}
+ @HEAD
+ @Path("/{username}")
+ @ApiOperation(value = "Testing an user existence")
+ @ApiImplicitParams({
+ @ApiImplicitParam(required = true, dataType = "string", name = "username", paramType = "path")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = HttpStatus.OK_200, message = "OK. User exists."),
+ @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Invalid input user."),
+ @ApiResponse(code = HttpStatus.NOT_FOUND_404, message = "User does not exist."),
+ @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500,
+ message = "Internal server error - Something went bad on the server side.")
+ })
+ public void defineUserExist() {
+ service.head(USERS + SEPARATOR + USER_NAME, this::userExist);
+ }
+
@PUT
@Path("/{username}")
@ApiOperation(value = "Creating an user")
@@ -188,6 +209,16 @@ public class UserRoutes implements Routes {
}
}
+ private String userExist(Request request, Response response) throws UsersRepositoryException {
+ Username username = extractUsername(request);
+ if (userService.userExists(username)) {
+ response.status(HttpStatus.OK_200);
+ } else {
+ response.status(HttpStatus.NOT_FOUND_404);
+ }
+ return Constants.EMPTY_BODY;
+ }
+
private HaltException upsertUser(Request request, Response response) throws JsonExtractException {
Username username = extractUsername(request);
try {
diff --git a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/service/UserService.java b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/service/UserService.java
index a263234..35e5cb6 100644
--- a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/service/UserService.java
+++ b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/service/UserService.java
@@ -24,7 +24,6 @@ import java.util.Optional;
import java.util.stream.Stream;
import javax.inject.Inject;
-import javax.mail.internet.AddressException;
import org.apache.james.core.Username;
import org.apache.james.user.api.UsersRepository;
@@ -67,12 +66,7 @@ public class UserService {
}
public boolean userExists(Username username) throws UsersRepositoryException {
- try {
- return usersRepository.contains(usersRepository.getUser(username.asMailAddress()));
- } catch (AddressException e) {
- LOGGER.info("Unable to parse address '%s'", username.asString(), e);
- return false;
- }
+ return usersRepository.contains(username);
}
private void upsert(User user, Username username, char[] password) throws UsersRepositoryException {
diff --git a/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/UserRoutesTest.java b/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/UserRoutesTest.java
index 38b1ada..2354290 100644
--- a/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/UserRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/UserRoutesTest.java
@@ -246,6 +246,14 @@ class UserRoutesTest {
}
@Test
+ default void headShouldReturnBadRequestWhenEmptyUserName() {
+ when()
+ .head("/")
+ .then()
+ .statusCode(HttpStatus.NOT_FOUND_404);
+ }
+
+ @Test
default void deleteShouldReturnBadRequestWhenUsernameIsTooLong() {
when()
.delete(USERNAME_WITH_DOMAIN.asString() + "0123456789.0123456789.0123456789.0123456789.0123456789.0123456789.0123456789.0123456789.0123456789.0123456789." +
@@ -514,6 +522,26 @@ class UserRoutesTest {
}
@Test
+ void headShouldReturnOKWhenUserExists() {
+ with()
+ .body("{\"password\":\"password\"}")
+ .put(USERNAME_WITH_DOMAIN.asString());
+
+ when()
+ .head(USERNAME_WITH_DOMAIN.asString())
+ .then()
+ .statusCode(HttpStatus.OK_200);
+ }
+
+ @Test
+ void headShouldReturnNotFoundWhenUserDoesNotExist() {
+ when()
+ .head(USERNAME_WITH_DOMAIN.asString())
+ .then()
+ .statusCode(HttpStatus.NOT_FOUND_404);
+ }
+
+ @Test
void puttingWithDomainPartInUsernameTwoTimesShouldBeAllowed() {
// Given
with()
@@ -683,6 +711,26 @@ class UserRoutesTest {
}
@Test
+ void headShouldReturnOKWhenUserExists() {
+ with()
+ .body("{\"password\":\"password\"}")
+ .put(USERNAME_WITHOUT_DOMAIN.asString());
+
+ when()
+ .head(USERNAME_WITHOUT_DOMAIN.asString())
+ .then()
+ .statusCode(HttpStatus.OK_200);
+ }
+
+ @Test
+ void headShouldReturnNotFoundWhenUserDoesNotExist() {
+ when()
+ .head(USERNAME_WITHOUT_DOMAIN.asString())
+ .then()
+ .statusCode(HttpStatus.NOT_FOUND_404);
+ }
+
+ @Test
void puttingWithoutDomainPartInUsernameTwoTimesShouldBeAllowed() {
// Given
with()
diff --git a/src/site/markdown/server/manage-webadmin.md b/src/site/markdown/server/manage-webadmin.md
index 33d7a16..0b5675f 100644
--- a/src/site/markdown/server/manage-webadmin.md
+++ b/src/site/markdown/server/manage-webadmin.md
@@ -297,6 +297,7 @@ Response codes:
## Administrating users
- [Create a user](#Create_a_user)
+ - [Testing a user existence](#Testing_a_user_existence)
- [Updating a user password](#Updating_a_user_password)
- [Deleting a domain](#Deleting_a_user)
- [Retrieving the user list](#Retrieving_the_user_list)
@@ -320,6 +321,21 @@ Response codes:
Note: if the user exists already, its password will be updated.
+###Testing a user existence
+
+```
+curl -XHEAD http://ip:port/users/usernameToBeUsed
+```
+
+Resource name "usernameToBeUsed" represents a valid user,
+hence it should match the criteria at [User Repositories documentation](/server/config-users.html)
+
+Response codes:
+
+ - 200: The user exists
+ - 400: The user name is invalid
+ - 404: The user does not exist
+
### Updating a user password
Same than Create, but a user need to exist.
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 10/11: JAMES-3044 Upgrade JSOUP to 1.13.1
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 45f0ce455c1929eb704d5cce9877141aad90766b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Mar 6 11:22:28 2020 +0700
JAMES-3044 Upgrade JSOUP to 1.13.1
- Many performance enhancement are shipped in 1.13.1
```
memory optimizations, reducing the retained size of a Document by ~ 39%,
and allocations by ~ 9%
```
Ships:
* Bugfix: in CharacterReader when parsing an input stream, could throw
a Mark Invalid exception if the reader was marked, a bufferUp
occurred, and then the reader was rewound.
<https://github.com/jhy/jsoup/issues/1324>
Which we noticed in production, even after a 1.12.2 upgrade.
Other bug fixes and enhancements are documented.
CF https://github.com/jhy/jsoup/blob/master/CHANGES
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 7bee095..f83608b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2543,7 +2543,7 @@
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
- <version>1.12.2</version>
+ <version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 01/11: JAMES-3104 Eventually publish blob store
metrics
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit aa978061feea5c777d96b2293bc2e5b54399ef10
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat Mar 7 04:45:15 2020 +0100
JAMES-3104 Eventually publish blob store metrics
This enhancement improve build stability
---
server/blob/blob-api/pom.xml | 5 +++
.../blob/api/MetricableBlobStoreContract.java | 38 ++++++++++++++--------
server/blob/blob-cassandra/pom.xml | 5 +++
server/blob/blob-memory/pom.xml | 5 +++
server/blob/blob-objectstorage/pom.xml | 5 +++
5 files changed, 44 insertions(+), 14 deletions(-)
diff --git a/server/blob/blob-api/pom.xml b/server/blob/blob-api/pom.xml
index cbe5c0e..8c9182b 100644
--- a/server/blob/blob-api/pom.xml
+++ b/server/blob/blob-api/pom.xml
@@ -70,6 +70,11 @@
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
index c11b117..ca346db 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
@@ -27,11 +27,14 @@ import static org.apache.james.blob.api.MetricableBlobStore.READ_TIMER_NAME;
import static org.apache.james.blob.api.MetricableBlobStore.SAVE_BYTES_TIMER_NAME;
import static org.apache.james.blob.api.MetricableBlobStore.SAVE_INPUT_STREAM_TIMER_NAME;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import org.apache.james.metrics.tests.RecordingMetricFactory;
+import org.awaitility.Duration;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -67,8 +70,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
- assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_BYTES_TIMER_NAME))
- .hasSize(2);
+ await().atMost(Duration.FIVE_SECONDS)
+ .untilAsserted(() -> assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_BYTES_TIMER_NAME))
+ .hasSize(2));
}
@Test
@@ -78,8 +82,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
Mono.from(store.save(store.getDefaultBucketName(), STRING_CONTENT, LOW_COST)).block();
Mono.from(store.save(store.getDefaultBucketName(), STRING_CONTENT, LOW_COST)).block();
- assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_BYTES_TIMER_NAME))
- .hasSize(2);
+ await().atMost(Duration.FIVE_SECONDS)
+ .untilAsserted(() -> assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_BYTES_TIMER_NAME))
+ .hasSize(2));
}
@Test
@@ -89,8 +94,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
Mono.from(store.save(store.getDefaultBucketName(), new ByteArrayInputStream(BYTES_CONTENT), LOW_COST)).block();
Mono.from(store.save(store.getDefaultBucketName(), new ByteArrayInputStream(BYTES_CONTENT), LOW_COST)).block();
- assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_INPUT_STREAM_TIMER_NAME))
- .hasSize(2);
+ await().atMost(Duration.FIVE_SECONDS)
+ .untilAsserted(() -> assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_INPUT_STREAM_TIMER_NAME))
+ .hasSize(2));
}
@Test
@@ -101,8 +107,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
Mono.from(store.readBytes(store.getDefaultBucketName(), blobId)).block();
Mono.from(store.readBytes(store.getDefaultBucketName(), blobId)).block();
- assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(READ_BYTES_TIMER_NAME))
- .hasSize(2);
+ await().atMost(Duration.FIVE_SECONDS)
+ .untilAsserted(() -> assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(READ_BYTES_TIMER_NAME))
+ .hasSize(2));
}
@Test
@@ -113,8 +120,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
store.read(store.getDefaultBucketName(), blobId);
store.read(store.getDefaultBucketName(), blobId);
- assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(READ_TIMER_NAME))
- .hasSize(2);
+ await().atMost(Duration.FIVE_SECONDS)
+ .untilAsserted(() -> assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(READ_TIMER_NAME))
+ .hasSize(2));
}
@Test
@@ -127,8 +135,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
Mono.from(store.deleteBucket(bucketName)).block();
- assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(DELETE_BUCKET_TIMER_NAME))
- .hasSize(1);
+ await().atMost(Duration.FIVE_SECONDS)
+ .untilAsserted(() -> assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(DELETE_BUCKET_TIMER_NAME))
+ .hasSize(1));
}
@Test
@@ -141,7 +150,8 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
Mono.from(store.delete(BucketName.DEFAULT, blobId1)).block();
Mono.from(store.delete(BucketName.DEFAULT, blobId2)).block();
- assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(DELETE_TIMER_NAME))
- .hasSize(2);
+ await().atMost(Duration.FIVE_SECONDS)
+ .untilAsserted(() -> assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(DELETE_TIMER_NAME))
+ .hasSize(2));
}
}
\ No newline at end of file
diff --git a/server/blob/blob-cassandra/pom.xml b/server/blob/blob-cassandra/pom.xml
index dcaf57c..8894ca6 100644
--- a/server/blob/blob-cassandra/pom.xml
+++ b/server/blob/blob-cassandra/pom.xml
@@ -72,6 +72,11 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
diff --git a/server/blob/blob-memory/pom.xml b/server/blob/blob-memory/pom.xml
index 6a6888c..76f67e3 100644
--- a/server/blob/blob-memory/pom.xml
+++ b/server/blob/blob-memory/pom.xml
@@ -66,6 +66,11 @@
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
diff --git a/server/blob/blob-objectstorage/pom.xml b/server/blob/blob-objectstorage/pom.xml
index 9f66fae..287b193 100644
--- a/server/blob/blob-objectstorage/pom.xml
+++ b/server/blob/blob-objectstorage/pom.xml
@@ -112,6 +112,11 @@
<version>${jclouds.version}</version>
</dependency>
<dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 09/11: JAMES-3103 Add a test showing message are
dispatched to registered keys qfter outage
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit da860b57d58451b7098a9a23d9e4fda69061e426
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Mar 6 21:34:44 2020 +0700
JAMES-3103 Add a test showing message are dispatched to registered keys qfter outage
---
.../james/mailbox/events/RabbitMQEventBusTest.java | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index f47a63b..0425320 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -76,6 +76,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.stubbing.Answer;
+import com.google.common.collect.ImmutableSet;
+
import reactor.core.publisher.Mono;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
@@ -353,6 +355,24 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
rabbitMQEventBusWithNetWorkIssue.dispatch(EVENT, NO_KEYS).block();
assertThatListenerReceiveOneEvent(listener);
}
+
+ @Test
+ void dispatchShouldWorkAfterNetworkIssuesForOldRegistrationAndKey() {
+ rabbitMQEventBusWithNetWorkIssue.start();
+ MailboxListener listener = newListener();
+ rabbitMQEventBusWithNetWorkIssue.register(listener, KEY_1);
+
+ rabbitMQNetWorkIssueExtension.getRabbitMQ().pause();
+
+ assertThatThrownBy(() -> rabbitMQEventBusWithNetWorkIssue.dispatch(EVENT, NO_KEYS).block())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Retries exhausted");
+
+ rabbitMQNetWorkIssueExtension.getRabbitMQ().unpause();
+
+ rabbitMQEventBusWithNetWorkIssue.dispatch(EVENT, ImmutableSet.of(KEY_1)).block();
+ assertThatListenerReceiveOneEvent(listener);
+ }
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 04/11: JAMES-3082 create test to demonstrate that
event bus messages are persisted on rabbitMQ
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 257e917892433442a64ce094388041010e93a091
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Tue Mar 3 17:09:29 2020 +0100
JAMES-3082 create test to demonstrate that event bus messages are persisted on rabbitMQ
---
.../mailbox/events/ErrorHandlingContract.java | 57 ++++++++++++++++++----
.../james/mailbox/events/EventBusTestFixture.java | 9 +++-
.../events/delivery/InVmEventDeliveryTest.java | 6 +++
.../mailbox/events/KeyRegistrationHandler.java | 28 ++++++++---
.../james/mailbox/events/RabbitMQEventBus.java | 22 +++++++++
.../james/mailbox/events/RabbitMQEventBusTest.java | 24 +++++++++
.../RabbitMQEventDeadLettersIntegrationTest.java | 4 +-
7 files changed, 131 insertions(+), 19 deletions(-)
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
index 219f5e4..35aaea2 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -26,6 +26,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
+import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION_LONG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
@@ -107,17 +108,22 @@ interface ErrorHandlingContract extends EventBusContract {
@Test
default void listenerShouldReceiveWhenFailsEqualsMaxRetries() {
EventCollector eventCollector = eventCollector();
-
+ //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES
doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
.doCallRealMethod()
.when(eventCollector).event(EVENT);
eventBus().register(eventCollector, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
- WAIT_CONDITION
+ WAIT_CONDITION_LONG
.untilAsserted(() -> assertThat(eventCollector.getEvents()).hasSize(1));
}
@@ -125,10 +131,16 @@ interface ErrorHandlingContract extends EventBusContract {
default void listenerShouldNotReceiveWhenFailsGreaterThanMaxRetries() throws Exception {
EventCollector eventCollector = eventCollector();
+ //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times
doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
.doCallRealMethod()
.when(eventCollector).event(EVENT);
@@ -147,9 +159,10 @@ interface ErrorHandlingContract extends EventBusContract {
eventBus().register(throwingListener, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
- TimeUnit.SECONDS.sleep(5);
+ TimeUnit.MINUTES.sleep(1);
int numberOfCallsAfterExceedMaxRetries = throwingListener.timeElapsed.size();
- TimeUnit.SECONDS.sleep(5);
+ TimeUnit.MINUTES.sleep(1);
+
assertThat(throwingListener.timeElapsed.size())
.isEqualTo(numberOfCallsAfterExceedMaxRetries);
@@ -162,10 +175,10 @@ interface ErrorHandlingContract extends EventBusContract {
eventBus().register(throwingListener, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
- TimeUnit.SECONDS.sleep(5);
+ TimeUnit.MINUTES.sleep(1);
SoftAssertions.assertSoftly(softly -> {
List<Instant> timeElapsed = throwingListener.timeElapsed;
- softly.assertThat(timeElapsed).hasSize(4);
+ softly.assertThat(timeElapsed).hasSize(RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1);
long minFirstDelayAfter = 100; // first backOff
long minSecondDelayAfter = 50; // 50 * jitter factor (50 * 0.5)
@@ -207,10 +220,16 @@ interface ErrorHandlingContract extends EventBusContract {
default void deadLettersIsNotAppliedForKeyRegistrations() throws Exception {
EventCollector eventCollector = eventCollector();
+ //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times
doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
.doCallRealMethod()
.when(eventCollector).event(EVENT);
@@ -247,18 +266,23 @@ interface ErrorHandlingContract extends EventBusContract {
@Test
default void deadLetterShouldStoreWhenDispatchFailsGreaterThanMaxRetries() {
EventCollector eventCollector = eventCollector();
-
+ //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times
doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
.doCallRealMethod()
.when(eventCollector).event(EVENT);
eventBus().register(eventCollector, GROUP_A);
eventBus().dispatch(EVENT, NO_KEYS).block();
- WAIT_CONDITION.untilAsserted(() -> assertThat(deadLetter().failedIds(GROUP_A)
+ WAIT_CONDITION_LONG.untilAsserted(() -> assertThat(deadLetter().failedIds(GROUP_A)
.flatMap(insertionId -> deadLetter().failedEvent(GROUP_A, insertionId))
.toIterable())
.containsOnly(EVENT));
@@ -270,17 +294,23 @@ interface ErrorHandlingContract extends EventBusContract {
default void deadLetterShouldStoreWhenRedeliverFailsGreaterThanMaxRetries() {
EventCollector eventCollector = eventCollector();
+ //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times
doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
.doCallRealMethod()
.when(eventCollector).event(EVENT);
eventBus().register(eventCollector, GROUP_A);
eventBus().reDeliver(GROUP_A, EVENT).block();
- WAIT_CONDITION.untilAsserted(() ->
+ WAIT_CONDITION_LONG.untilAsserted(() ->
assertThat(
deadLetter()
.failedIds(GROUP_A)
@@ -295,16 +325,23 @@ interface ErrorHandlingContract extends EventBusContract {
default void retryShouldDeliverAsManyTimesAsInitialDeliveryAttempt() {
EventCollector eventCollector = eventCollector();
+ //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times
doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
.doCallRealMethod()
.when(eventCollector).event(EVENT);
eventBus().register(eventCollector, GROUP_A);
eventBus().reDeliver(GROUP_A, EVENT).block();
- WAIT_CONDITION.untilAsserted(() -> assertThat(eventCollector.getEvents()).isNotEmpty());
+ WAIT_CONDITION_LONG.untilAsserted(() -> assertThat(eventCollector.getEvents()).isNotEmpty());
}
@Test
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
index a071583..06515e0 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java
@@ -103,7 +103,6 @@ public interface EventBusTestFixture {
MailboxListener.MailboxRenamed EVENT_UNSUPPORTED_BY_LISTENER = new MailboxListener.MailboxRenamed(SESSION_ID, USERNAME, MAILBOX_PATH, TEST_ID, MAILBOX_PATH, EVENT_ID_2);
java.time.Duration ONE_SECOND = java.time.Duration.ofSeconds(1);
- java.time.Duration THIRTY_SECONDS = java.time.Duration.ofSeconds(30);
java.time.Duration FIVE_HUNDRED_MS = java.time.Duration.ofMillis(500);
MailboxId ID_1 = TEST_ID;
MailboxId ID_2 = TestId.of(24);
@@ -118,6 +117,7 @@ public interface EventBusTestFixture {
List<Group> ALL_GROUPS = ImmutableList.of(GROUP_A, GROUP_B, GROUP_C);
ConditionFactory WAIT_CONDITION = await().timeout(Duration.FIVE_SECONDS);
+ ConditionFactory WAIT_CONDITION_LONG = await().timeout(Duration.ONE_MINUTE);
static MailboxListener newListener() {
MailboxListener listener = mock(MailboxListener.class);
@@ -125,4 +125,11 @@ public interface EventBusTestFixture {
when(listener.isHandling(any(MailboxListener.MailboxAdded.class))).thenReturn(true);
return listener;
}
+
+ static MailboxListener newAsyncListener() {
+ MailboxListener listener = mock(MailboxListener.class);
+ when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+ when(listener.isHandling(any(MailboxListener.MailboxAdded.class))).thenReturn(true);
+ return listener;
+ }
}
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
index 6212178..45335e9 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
@@ -215,10 +215,16 @@ class InVmEventDeliveryTest {
@Test
void failureHandlerShouldWorkWhenRetryFails() {
MailboxListenerCountingSuccessfulExecution listener = newListener();
+ //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times
doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
.doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
+ .doThrow(new RuntimeException())
.doCallRealMethod()
.when(listener).event(EVENT);
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index b8789ca..49ea9b9 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -28,6 +28,7 @@ import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.event.json.EventSerializer;
@@ -38,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;
@@ -63,6 +65,7 @@ class KeyRegistrationHandler {
private final MailboxListenerExecutor mailboxListenerExecutor;
private final RetryBackoffConfiguration retryBackoff;
private Optional<Disposable> receiverSubscriber;
+ private AtomicBoolean registrationQueueInitialized = new AtomicBoolean(false);
KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer,
Sender sender, ReceiverProvider receiverProvider,
@@ -78,22 +81,35 @@ class KeyRegistrationHandler {
this.retryBackoff = retryBackoff;
this.registrationQueue = new RegistrationQueueName();
this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
+ this.receiverSubscriber = Optional.empty();
+
}
void start() {
+ declareQueue();
+
+ receiverSubscriber = Optional.of(receiver.consumeAutoAck(registrationQueue.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
+ .subscribeOn(Schedulers.parallel())
+ .flatMap(this::handleDelivery)
+ .subscribe());
+ }
+
+ @VisibleForTesting
+ void declareQueue() {
sender.declareQueue(QueueSpecification.queue(eventBusId.asString())
.durable(DURABLE)
.exclusive(!EXCLUSIVE)
.autoDelete(!AUTO_DELETE)
.arguments(NO_ARGUMENTS))
.map(AMQP.Queue.DeclareOk::getQueue)
- .doOnSuccess(registrationQueue::initialize)
+ .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor())
+ .doOnSuccess(queueName -> {
+ if (!registrationQueueInitialized.get()) {
+ registrationQueue.initialize(queueName);
+ registrationQueueInitialized.set(true);
+ }
+ })
.block();
-
- receiverSubscriber = Optional.of(receiver.consumeAutoAck(registrationQueue.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
- .subscribeOn(Schedulers.parallel())
- .flatMap(this::handleDelivery)
- .subscribe());
}
void stop() {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 01c2b23..7e95dbf 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -29,6 +29,7 @@ import org.apache.james.event.json.EventSerializer;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.metrics.api.MetricFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import reactor.core.publisher.Mono;
@@ -86,6 +87,27 @@ public class RabbitMQEventBus implements EventBus, Startable {
}
}
+ @VisibleForTesting
+ void startWithoutStartingKeyRegistrationHandler() {
+ if (!isRunning && !isStopping) {
+
+ LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
+ keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, channelPool, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff);
+ groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
+ eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, channelPool.getSender(), localListenerRegistry, mailboxListenerExecutor);
+
+ keyRegistrationHandler.declareQueue();
+
+ eventDispatcher.start();
+ isRunning = true;
+ }
+ }
+
+ @VisibleForTesting
+ void startKeyRegistrationHandler() {
+ keyRegistrationHandler.start();
+ }
+
@PreDestroy
public void stop() {
if (isRunning && !isStopping) {
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index a5a7d48..f47a63b 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -32,6 +32,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1;
import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
+import static org.apache.james.mailbox.events.EventBusTestFixture.newAsyncListener;
import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
import static org.apache.james.mailbox.events.GroupRegistration.WorkQueueName.MAILBOX_EVENT_WORK_QUEUE_PREFIX;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
@@ -75,6 +76,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.stubbing.Answer;
+import reactor.core.publisher.Mono;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.QueueSpecification;
@@ -91,6 +93,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
private RabbitMQEventBus eventBus;
private RabbitMQEventBus eventBus2;
private RabbitMQEventBus eventBus3;
+ private RabbitMQEventBus eventBusWithKeyHandlerNotStarted;
private EventSerializer eventSerializer;
private RoutingKeyConverter routingKeyConverter;
private MemoryEventDeadLetters memoryEventDeadLetters;
@@ -106,10 +109,12 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBus = newEventBus();
eventBus2 = newEventBus();
eventBus3 = newEventBus();
+ eventBusWithKeyHandlerNotStarted = newEventBus();
eventBus.start();
eventBus2.start();
eventBus3.start();
+ eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler();
}
@AfterEach
@@ -117,6 +122,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBus.stop();
eventBus2.stop();
eventBus3.stop();
+ eventBusWithKeyHandlerNotStarted.stop();
ALL_GROUPS.stream()
.map(GroupRegistration.WorkQueueName::of)
.forEach(queueName -> rabbitMQExtension.getSender().delete(QueueSpecification.queue(queueName.asString())).block());
@@ -425,6 +431,21 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
@Test
+ void dispatchedMessagesShouldSurviveARabbitMQRestart() throws Exception {
+ eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler();
+ MailboxListener listener = newAsyncListener();
+ eventBusWithKeyHandlerNotStarted.register(listener, KEY_1);
+ Mono<Void> dispatch = eventBusWithKeyHandlerNotStarted.dispatch(EVENT, KEY_1);
+ dispatch.block();
+
+ rabbitMQExtension.getRabbitMQ().restart();
+
+ eventBusWithKeyHandlerNotStarted.startKeyRegistrationHandler();
+
+ assertThatListenerReceiveOneEvent(listener);
+ }
+
+ @Test
void dispatchShouldWorkAfterRestartForNewKeyRegistration() throws Exception {
eventBus.start();
MailboxListener listener = newListener();
@@ -606,6 +627,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBus.stop();
eventBus2.stop();
eventBus3.stop();
+ eventBusWithKeyHandlerNotStarted.stop();
assertThat(rabbitManagementAPI.listExchanges())
.anySatisfy(exchange -> assertThat(exchange.getName()).isEqualTo(MAILBOX_EVENT_EXCHANGE_NAME));
@@ -618,6 +640,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBus.stop();
eventBus2.stop();
eventBus3.stop();
+ eventBusWithKeyHandlerNotStarted.stop();
assertThat(rabbitManagementAPI.listQueues())
.anySatisfy(queue -> assertThat(queue.getName()).contains(GroupA.class.getName()));
@@ -628,6 +651,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBus.stop();
eventBus2.stop();
eventBus3.stop();
+ eventBusWithKeyHandlerNotStarted.stop();
assertThat(rabbitManagementAPI.listQueues())
.filteredOn(queue -> !queue.getName().startsWith(MAILBOX_EVENT_WORK_QUEUE_PREFIX))
diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java
index 18060c8..36dd401 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java
@@ -179,7 +179,7 @@ class RabbitMQEventDeadLettersIntegrationTest {
//This value is duplicated from default configuration to ensure we keep the same behavior over time
//unless we really want to change that default value
- private static final int MAX_RETRIES = 3;
+ private static final int MAX_RETRIES = 8;
private static final String DOMAIN = "domain.tld";
private static final String BOB = "bob@" + DOMAIN;
@@ -220,7 +220,7 @@ class RabbitMQEventDeadLettersIntegrationTest {
}
private String retrieveFirstFailedInsertionId() {
- calmlyAwait.atMost(TEN_SECONDS)
+ calmlyAwait.atMost(ONE_MINUTE)
.untilAsserted(() ->
when()
.get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 02/11: JAMES-3082 add retry to make event bus test
when rabbitmq restart pass
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1ebd654240b4bcc2be1437e19e7f9aab7ed3ccc1
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Mar 2 17:41:30 2020 +0100
JAMES-3082 add retry to make event bus test when rabbitmq restart pass
---
.../mailbox/events/RetryBackoffConfiguration.java | 3 ++-
.../james/mailbox/events/GroupRegistration.java | 4 ++++
.../james/mailbox/events/KeyRegistrationHandler.java | 20 ++++++++++++++++----
.../james/mailbox/events/RabbitMQEventBus.java | 2 +-
4 files changed, 23 insertions(+), 6 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
index f802d5e..a674a28 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
@@ -63,8 +63,9 @@ public class RetryBackoffConfiguration {
}
static final double DEFAULT_JITTER_FACTOR = 0.5;
- static final int DEFAULT_MAX_RETRIES = 3;
+ static final int DEFAULT_MAX_RETRIES = 8;
static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100);
+ static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
public static final RetryBackoffConfiguration DEFAULT = new RetryBackoffConfiguration(
DEFAULT_MAX_RETRIES,
DEFAULT_FIRST_BACKOFF,
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index da0fe77..8d1b7aa 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -26,6 +26,7 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@@ -81,6 +82,7 @@ class GroupRegistration implements Registration {
private final GroupConsumerRetry retryHandler;
private final WaitDelayGenerator delayGenerator;
private final Group group;
+ private final RetryBackoffConfiguration retryBackoff;
private final MailboxListenerExecutor mailboxListenerExecutor;
private Optional<Disposable> receiverSubscriber;
@@ -93,6 +95,7 @@ class GroupRegistration implements Registration {
this.queueName = WorkQueueName.of(group);
this.sender = sender;
this.receiver = receiverProvider.createReceiver();
+ this.retryBackoff = retryBackoff;
this.mailboxListenerExecutor = mailboxListenerExecutor;
this.receiverSubscriber = Optional.empty();
this.unregisterGroup = unregisterGroup;
@@ -106,6 +109,7 @@ class GroupRegistration implements Registration {
.of(createGroupWorkQueue()
.then(retryHandler.createRetryExchange(queueName))
.then(Mono.fromCallable(() -> this.consumeWorkQueue()))
+ .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
.block());
return this;
}
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index ae49471..b8789ca 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -24,6 +24,7 @@ import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
@@ -60,9 +61,13 @@ class KeyRegistrationHandler {
private final RegistrationQueueName registrationQueue;
private final RegistrationBinder registrationBinder;
private final MailboxListenerExecutor mailboxListenerExecutor;
+ private final RetryBackoffConfiguration retryBackoff;
private Optional<Disposable> receiverSubscriber;
- KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
+ KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer,
+ Sender sender, ReceiverProvider receiverProvider,
+ RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry,
+ MailboxListenerExecutor mailboxListenerExecutor, RetryBackoffConfiguration retryBackoff) {
this.eventBusId = eventBusId;
this.eventSerializer = eventSerializer;
this.sender = sender;
@@ -70,6 +75,7 @@ class KeyRegistrationHandler {
this.localListenerRegistry = localListenerRegistry;
this.receiver = receiverProvider.createReceiver();
this.mailboxListenerExecutor = mailboxListenerExecutor;
+ this.retryBackoff = retryBackoff;
this.registrationQueue = new RegistrationQueueName();
this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
}
@@ -94,17 +100,23 @@ class KeyRegistrationHandler {
receiverSubscriber.filter(subscriber -> !subscriber.isDisposed())
.ifPresent(Disposable::dispose);
receiver.close();
- sender.delete(QueueSpecification.queue(registrationQueue.asString())).block();
+ sender.delete(QueueSpecification.queue(registrationQueue.asString()))
+ .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
+ .block();
}
Registration register(MailboxListener listener, RegistrationKey key) {
LocalListenerRegistry.LocalRegistration registration = localListenerRegistry.addListener(key, listener);
if (registration.isFirstListener()) {
- registrationBinder.bind(key).block();
+ registrationBinder.bind(key)
+ .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
+ .block();
}
return new KeyRegistration(() -> {
if (registration.unregister().lastListenerRemoved()) {
- registrationBinder.unbind(key).block();
+ registrationBinder.unbind(key)
+ .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
+ .block();
}
});
}
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index ecc1b4f..01c2b23 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -76,7 +76,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
if (!isRunning && !isStopping) {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
- keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
+ keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff);
groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor);
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org