You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/07 11:25:39 UTC

[james-project] 02/11: JAMES-3082 add retry to make event bus test when rabbitmq restart pass

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1ebd654240b4bcc2be1437e19e7f9aab7ed3ccc1
Author: RĂ©mi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Mar 2 17:41:30 2020 +0100

    JAMES-3082 add retry to make event bus test when rabbitmq restart pass
---
 .../mailbox/events/RetryBackoffConfiguration.java    |  3 ++-
 .../james/mailbox/events/GroupRegistration.java      |  4 ++++
 .../james/mailbox/events/KeyRegistrationHandler.java | 20 ++++++++++++++++----
 .../james/mailbox/events/RabbitMQEventBus.java       |  2 +-
 4 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
index f802d5e..a674a28 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
@@ -63,8 +63,9 @@ public class RetryBackoffConfiguration {
     }
 
     static final double DEFAULT_JITTER_FACTOR = 0.5;
-    static final int DEFAULT_MAX_RETRIES = 3;
+    static final int DEFAULT_MAX_RETRIES = 8;
     static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100);
+    static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
     public static final RetryBackoffConfiguration DEFAULT = new RetryBackoffConfiguration(
         DEFAULT_MAX_RETRIES,
         DEFAULT_FIRST_BACKOFF,
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index da0fe77..8d1b7aa 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -26,6 +26,7 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Objects;
@@ -81,6 +82,7 @@ class GroupRegistration implements Registration {
     private final GroupConsumerRetry retryHandler;
     private final WaitDelayGenerator delayGenerator;
     private final Group group;
+    private final RetryBackoffConfiguration retryBackoff;
     private final MailboxListenerExecutor mailboxListenerExecutor;
     private Optional<Disposable> receiverSubscriber;
 
@@ -93,6 +95,7 @@ class GroupRegistration implements Registration {
         this.queueName = WorkQueueName.of(group);
         this.sender = sender;
         this.receiver = receiverProvider.createReceiver();
+        this.retryBackoff = retryBackoff;
         this.mailboxListenerExecutor = mailboxListenerExecutor;
         this.receiverSubscriber = Optional.empty();
         this.unregisterGroup = unregisterGroup;
@@ -106,6 +109,7 @@ class GroupRegistration implements Registration {
             .of(createGroupWorkQueue()
                 .then(retryHandler.createRetryExchange(queueName))
                 .then(Mono.fromCallable(() -> this.consumeWorkQueue()))
+                .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
                 .block());
         return this;
     }
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index ae49471..b8789ca 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -24,6 +24,7 @@ import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
@@ -60,9 +61,13 @@ class KeyRegistrationHandler {
     private final RegistrationQueueName registrationQueue;
     private final RegistrationBinder registrationBinder;
     private final MailboxListenerExecutor mailboxListenerExecutor;
+    private final RetryBackoffConfiguration retryBackoff;
     private Optional<Disposable> receiverSubscriber;
 
-    KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
+    KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer,
+                           Sender sender, ReceiverProvider receiverProvider,
+                           RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry,
+                           MailboxListenerExecutor mailboxListenerExecutor, RetryBackoffConfiguration retryBackoff) {
         this.eventBusId = eventBusId;
         this.eventSerializer = eventSerializer;
         this.sender = sender;
@@ -70,6 +75,7 @@ class KeyRegistrationHandler {
         this.localListenerRegistry = localListenerRegistry;
         this.receiver = receiverProvider.createReceiver();
         this.mailboxListenerExecutor = mailboxListenerExecutor;
+        this.retryBackoff = retryBackoff;
         this.registrationQueue = new RegistrationQueueName();
         this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
     }
@@ -94,17 +100,23 @@ class KeyRegistrationHandler {
         receiverSubscriber.filter(subscriber -> !subscriber.isDisposed())
             .ifPresent(Disposable::dispose);
         receiver.close();
-        sender.delete(QueueSpecification.queue(registrationQueue.asString())).block();
+        sender.delete(QueueSpecification.queue(registrationQueue.asString()))
+            .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
+            .block();
     }
 
     Registration register(MailboxListener listener, RegistrationKey key) {
         LocalListenerRegistry.LocalRegistration registration = localListenerRegistry.addListener(key, listener);
         if (registration.isFirstListener()) {
-            registrationBinder.bind(key).block();
+            registrationBinder.bind(key)
+                .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
+                .block();
         }
         return new KeyRegistration(() -> {
             if (registration.unregister().lastListenerRemoved()) {
-                registrationBinder.unbind(key).block();
+                registrationBinder.unbind(key)
+                    .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
+                    .block();
             }
         });
     }
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index ecc1b4f..01c2b23 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -76,7 +76,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
         if (!isRunning && !isStopping) {
 
             LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
-            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
+            keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff);
             groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
             eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org