You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2020/07/07 16:02:41 UTC
[james-project] 03/05: JAMES-3299 RabbitMQ EventBus key registration should not hang
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 147aded8fac341ecdcd86026eb8654dc85b56561
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jul 7 08:30:41 2020 +0700
JAMES-3299 RabbitMQ EventBus key registration should not hang
Debugging proved that bind / unbind operations can hang. When such a hang
occurs, timing out allows the given operation to be retried.
---
.../james/mailbox/events/KeyRegistrationHandler.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 5add858..8b79d06 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -58,6 +58,8 @@ class KeyRegistrationHandler {
private static final Duration EXPIRATION_TIMEOUT = Duration.ofMinutes(30);
private static final Map<String, Object> QUEUE_ARGUMENTS = ImmutableMap.of("x-expires", EXPIRATION_TIMEOUT.toMillis());
+ private static final Duration TOPOLOGY_CHANGES_TIMEOUT = Duration.ofMinutes(1);
+
private final EventBusId eventBusId;
private final LocalListenerRegistry localListenerRegistry;
private final EventSerializer eventSerializer;
@@ -100,11 +102,13 @@ class KeyRegistrationHandler {
@VisibleForTesting
void declareQueue() {
- sender.declareQueue(QueueSpecification.queue(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString())
- .durable(DURABLE)
- .exclusive(!EXCLUSIVE)
- .autoDelete(AUTO_DELETE)
- .arguments(QUEUE_ARGUMENTS))
+ sender.declareQueue(
+ QueueSpecification.queue(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString())
+ .durable(DURABLE)
+ .exclusive(!EXCLUSIVE)
+ .autoDelete(AUTO_DELETE)
+ .arguments(QUEUE_ARGUMENTS))
+ .timeout(TOPOLOGY_CHANGES_TIMEOUT)
.map(AMQP.Queue.DeclareOk::getQueue)
.retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()))
.doOnSuccess(queueName -> {
@@ -121,6 +125,7 @@ class KeyRegistrationHandler {
.ifPresent(Disposable::dispose);
receiver.close();
sender.delete(QueueSpecification.queue(registrationQueue.asString()))
+ .timeout(TOPOLOGY_CHANGES_TIMEOUT)
.retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
.block();
}
@@ -132,6 +137,7 @@ class KeyRegistrationHandler {
.thenReturn(new KeyRegistration(() -> {
if (registration.unregister().lastListenerRemoved()) {
registrationBinder.unbind(key)
+ .timeout(TOPOLOGY_CHANGES_TIMEOUT)
.retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
.subscribeOn(Schedulers.elastic())
.block();
@@ -142,6 +148,7 @@ class KeyRegistrationHandler {
private Mono<Void> registerIfNeeded(RegistrationKey key, LocalListenerRegistry.LocalRegistration registration) {
if (registration.isFirstListener()) {
return registrationBinder.bind(key)
+ .timeout(TOPOLOGY_CHANGES_TIMEOUT)
.retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()));
}
return Mono.empty();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org