You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/07 11:25:39 UTC
[james-project] 02/11: JAMES-3082 add retry to make event bus test
when rabbitmq restart pass
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1ebd654240b4bcc2be1437e19e7f9aab7ed3ccc1
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Mar 2 17:41:30 2020 +0100
JAMES-3082 add retry to make event bus test when rabbitmq restart pass
---
.../mailbox/events/RetryBackoffConfiguration.java | 3 ++-
.../james/mailbox/events/GroupRegistration.java | 4 ++++
.../james/mailbox/events/KeyRegistrationHandler.java | 20 ++++++++++++++++----
.../james/mailbox/events/RabbitMQEventBus.java | 2 +-
4 files changed, 23 insertions(+), 6 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
index f802d5e..a674a28 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java
@@ -63,8 +63,9 @@ public class RetryBackoffConfiguration {
}
static final double DEFAULT_JITTER_FACTOR = 0.5;
- static final int DEFAULT_MAX_RETRIES = 3;
+ static final int DEFAULT_MAX_RETRIES = 8;
static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100);
+ static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
public static final RetryBackoffConfiguration DEFAULT = new RetryBackoffConfiguration(
DEFAULT_MAX_RETRIES,
DEFAULT_FIRST_BACKOFF,
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index da0fe77..8d1b7aa 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -26,6 +26,7 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@@ -81,6 +82,7 @@ class GroupRegistration implements Registration {
private final GroupConsumerRetry retryHandler;
private final WaitDelayGenerator delayGenerator;
private final Group group;
+ private final RetryBackoffConfiguration retryBackoff;
private final MailboxListenerExecutor mailboxListenerExecutor;
private Optional<Disposable> receiverSubscriber;
@@ -93,6 +95,7 @@ class GroupRegistration implements Registration {
this.queueName = WorkQueueName.of(group);
this.sender = sender;
this.receiver = receiverProvider.createReceiver();
+ this.retryBackoff = retryBackoff;
this.mailboxListenerExecutor = mailboxListenerExecutor;
this.receiverSubscriber = Optional.empty();
this.unregisterGroup = unregisterGroup;
@@ -106,6 +109,7 @@ class GroupRegistration implements Registration {
.of(createGroupWorkQueue()
.then(retryHandler.createRetryExchange(queueName))
.then(Mono.fromCallable(() -> this.consumeWorkQueue()))
+ .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
.block());
return this;
}
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index ae49471..b8789ca 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -24,6 +24,7 @@ import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
+import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
@@ -60,9 +61,13 @@ class KeyRegistrationHandler {
private final RegistrationQueueName registrationQueue;
private final RegistrationBinder registrationBinder;
private final MailboxListenerExecutor mailboxListenerExecutor;
+ private final RetryBackoffConfiguration retryBackoff;
private Optional<Disposable> receiverSubscriber;
- KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
+ KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer,
+ Sender sender, ReceiverProvider receiverProvider,
+ RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry,
+ MailboxListenerExecutor mailboxListenerExecutor, RetryBackoffConfiguration retryBackoff) {
this.eventBusId = eventBusId;
this.eventSerializer = eventSerializer;
this.sender = sender;
@@ -70,6 +75,7 @@ class KeyRegistrationHandler {
this.localListenerRegistry = localListenerRegistry;
this.receiver = receiverProvider.createReceiver();
this.mailboxListenerExecutor = mailboxListenerExecutor;
+ this.retryBackoff = retryBackoff;
this.registrationQueue = new RegistrationQueueName();
this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
}
@@ -94,17 +100,23 @@ class KeyRegistrationHandler {
receiverSubscriber.filter(subscriber -> !subscriber.isDisposed())
.ifPresent(Disposable::dispose);
receiver.close();
- sender.delete(QueueSpecification.queue(registrationQueue.asString())).block();
+ sender.delete(QueueSpecification.queue(registrationQueue.asString()))
+ .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
+ .block();
}
Registration register(MailboxListener listener, RegistrationKey key) {
LocalListenerRegistry.LocalRegistration registration = localListenerRegistry.addListener(key, listener);
if (registration.isFirstListener()) {
- registrationBinder.bind(key).block();
+ registrationBinder.bind(key)
+ .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
+ .block();
}
return new KeyRegistration(() -> {
if (registration.unregister().lastListenerRemoved()) {
- registrationBinder.unbind(key).block();
+ registrationBinder.unbind(key)
+ .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
+ .block();
}
});
}
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index ecc1b4f..01c2b23 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -76,7 +76,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
if (!isRunning && !isStopping) {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
- keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
+ keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff);
groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor);
eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor);
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org