You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2020/07/07 16:02:41 UTC

[james-project] 03/05: JAMES-3299 RabbitMQ EventBus key registration should not hang

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 147aded8fac341ecdcd86026eb8654dc85b56561
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jul 7 08:30:41 2020 +0700

    JAMES-3299 RabbitMQ EventBus key registration should not hang
    
    Debugging proved bind / unbind operation can hang. Upon such hangs,
    timing out will allow to retry the given operation.
---
 .../james/mailbox/events/KeyRegistrationHandler.java    | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 5add858..8b79d06 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -58,6 +58,8 @@ class KeyRegistrationHandler {
     private static final Duration EXPIRATION_TIMEOUT = Duration.ofMinutes(30);
     private static final Map<String, Object> QUEUE_ARGUMENTS = ImmutableMap.of("x-expires", EXPIRATION_TIMEOUT.toMillis());
 
+    private static final Duration TOPOLOGY_CHANGES_TIMEOUT = Duration.ofMinutes(1);
+
     private final EventBusId eventBusId;
     private final LocalListenerRegistry localListenerRegistry;
     private final EventSerializer eventSerializer;
@@ -100,11 +102,13 @@ class KeyRegistrationHandler {
 
     @VisibleForTesting
     void declareQueue() {
-        sender.declareQueue(QueueSpecification.queue(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString())
-            .durable(DURABLE)
-            .exclusive(!EXCLUSIVE)
-            .autoDelete(AUTO_DELETE)
-            .arguments(QUEUE_ARGUMENTS))
+        sender.declareQueue(
+            QueueSpecification.queue(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString())
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(AUTO_DELETE)
+                .arguments(QUEUE_ARGUMENTS))
+            .timeout(TOPOLOGY_CHANGES_TIMEOUT)
             .map(AMQP.Queue.DeclareOk::getQueue)
             .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()))
             .doOnSuccess(queueName -> {
@@ -121,6 +125,7 @@ class KeyRegistrationHandler {
             .ifPresent(Disposable::dispose);
         receiver.close();
         sender.delete(QueueSpecification.queue(registrationQueue.asString()))
+            .timeout(TOPOLOGY_CHANGES_TIMEOUT)
             .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
             .block();
     }
@@ -132,6 +137,7 @@ class KeyRegistrationHandler {
             .thenReturn(new KeyRegistration(() -> {
                 if (registration.unregister().lastListenerRemoved()) {
                     registrationBinder.unbind(key)
+                        .timeout(TOPOLOGY_CHANGES_TIMEOUT)
                         .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
                         .subscribeOn(Schedulers.elastic())
                         .block();
@@ -142,6 +148,7 @@ class KeyRegistrationHandler {
     private Mono<Void> registerIfNeeded(RegistrationKey key, LocalListenerRegistry.LocalRegistration registration) {
         if (registration.isFirstListener()) {
             return registrationBinder.bind(key)
+                .timeout(TOPOLOGY_CHANGES_TIMEOUT)
                 .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()));
         }
         return Mono.empty();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org