You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/01/31 00:13:06 UTC

[james-project] branch master updated (216315e784 -> 0e376b9ca7)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 216315e784 [FIX] DelegationStoreAuthorizator::isAdministrator should fallback to false
     new b0cd610c55 JAMES-3694 Apply queue expiracy only for per-node queues
     new 0e376b9ca7 JAMES-3694 Apply queue expiracy for the task manager

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../backends/rabbitmq/RabbitMQConfiguration.java   |  1 -
 .../james/events/KeyReconnectionHandler.java       |  6 ++-
 .../james/events/KeyRegistrationHandler.java       |  5 ++-
 .../org/apache/james/task/TaskManagerContract.java | 52 +++++++++++-----------
 .../distributed/RabbitMQTerminationSubscriber.java | 10 ++++-
 .../distributed/RabbitMQWorkQueue.java             | 17 +++++--
 .../RabbitMQWorkQueueReconnectionHandler.java      | 11 +++--
 .../TerminationReconnectionHandler.java            | 11 +++--
 .../distributed/RabbitMQWorkQueueSupplier.scala    |  7 +--
 .../distributed/DistributedTaskManagerTest.java    | 43 +++++++++---------
 .../RabbitMQTerminationSubscriberTest.java         | 10 ++---
 .../RabbitMQWorkQueuePersistenceTest.java          |  8 ++--
 .../distributed/RabbitMQWorkQueueTest.java         | 12 ++---
 .../TerminationSubscriberContract.java             | 16 +++----
 14 files changed, 121 insertions(+), 88 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/02: JAMES-3694 Apply queue expiracy for the task manager

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0e376b9ca76f53548447fd4fa246bb20eb3fcb30
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jan 27 14:06:56 2023 +0700

    JAMES-3694 Apply queue expiracy for the task manager
    
    Safer than auto-deletes
---
 .../org/apache/james/task/TaskManagerContract.java | 52 +++++++++++-----------
 .../distributed/RabbitMQTerminationSubscriber.java | 10 ++++-
 .../distributed/RabbitMQWorkQueue.java             | 17 +++++--
 .../RabbitMQWorkQueueReconnectionHandler.java      | 11 +++--
 .../TerminationReconnectionHandler.java            | 11 +++--
 .../distributed/RabbitMQWorkQueueSupplier.scala    |  7 +--
 .../distributed/DistributedTaskManagerTest.java    | 43 +++++++++---------
 .../RabbitMQTerminationSubscriberTest.java         | 10 ++---
 .../RabbitMQWorkQueuePersistenceTest.java          |  8 ++--
 .../distributed/RabbitMQWorkQueueTest.java         | 12 ++---
 .../TerminationSubscriberContract.java             | 16 +++----
 11 files changed, 113 insertions(+), 84 deletions(-)

diff --git a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
index 3e5b4a18ad..2a092c735a 100644
--- a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -47,10 +47,10 @@ public interface TaskManagerContract {
     ConditionFactory awaitAtMostTwoSeconds = calmlyAwait.atMost(Duration.ofSeconds(2));
     java.time.Duration TIMEOUT = java.time.Duration.ofMinutes(15);
 
-    TaskManager taskManager();
+    TaskManager taskManager() throws Exception;
 
     @Test
-    default void submitShouldReturnATaskId() {
+    default void submitShouldReturnATaskId() throws Exception {
         TaskId taskId = taskManager().submit(new CompletedTask());
         assertThat(taskId).isNotNull();
     }
@@ -63,7 +63,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldReturnWaitingWhenNotYetProcessed(CountDownLatch waitingForResultLatch) {
+    default void getStatusShouldReturnWaitingWhenNotYetProcessed(CountDownLatch waitingForResultLatch) throws Exception {
         TaskManager taskManager = taskManager();
         taskManager.submit(new MemoryReferenceTask(() -> {
             waitingForResultLatch.await();
@@ -77,7 +77,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void taskCodeAfterCancelIsNotRun(CountDownLatch waitingForResultLatch) throws InterruptedException {
+    default void taskCodeAfterCancelIsNotRun(CountDownLatch waitingForResultLatch) throws Exception {
         TaskManager taskManager = taskManager();
         CountDownLatch waitForTaskToBeLaunched = new CountDownLatch(1);
         AtomicInteger count = new AtomicInteger(0);
@@ -98,7 +98,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void completedTaskShouldNotBeCancelled() {
+    default void completedTaskShouldNotBeCancelled() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new CompletedTask());
 
@@ -114,7 +114,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void failedTaskShouldNotBeCancelled() {
+    default void failedTaskShouldNotBeCancelled() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new FailedTask());
 
@@ -130,7 +130,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) {
+    default void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             countDownLatch.await();
@@ -152,7 +152,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) {
+    default void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             countDownLatch.await();
@@ -176,7 +176,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void cancelShouldBeIdempotent(CountDownLatch waitingForResultLatch) {
+    default void cancelShouldBeIdempotent(CountDownLatch waitingForResultLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             waitingForResultLatch.await();
@@ -189,7 +189,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldReturnInProgressWhenProcessingIsInProgress(CountDownLatch waitingForResultLatch) {
+    default void getStatusShouldReturnInProgressWhenProcessingIsInProgress(CountDownLatch waitingForResultLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> {
             waitingForResultLatch.await();
@@ -201,7 +201,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldReturnCompletedWhenRunSuccessfully() {
+    default void getStatusShouldReturnCompletedWhenRunSuccessfully() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(
             new CompletedTask());
@@ -212,7 +212,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void additionalInformationShouldBeUpdatedWhenRunSuccessfully() {
+    default void additionalInformationShouldBeUpdatedWhenRunSuccessfully() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new MemoryReferenceWithCounterTask(counter -> {
             counter.incrementAndGet();
@@ -230,7 +230,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void additionalInformationShouldBeUpdatedWhenFailed() {
+    default void additionalInformationShouldBeUpdatedWhenFailed() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new MemoryReferenceWithCounterTask(counter -> {
             counter.incrementAndGet();
@@ -248,7 +248,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void additionalInformationShouldBeUpdatedWhenCancelled(CountDownLatch countDownLatch) {
+    default void additionalInformationShouldBeUpdatedWhenCancelled(CountDownLatch countDownLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new MemoryReferenceWithCounterTask((counter) -> {
             counter.incrementAndGet();
@@ -273,7 +273,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void additionalInformationShouldBeUpdatedDuringExecution(CountDownLatch countDownLatch) {
+    default void additionalInformationShouldBeUpdatedDuringExecution(CountDownLatch countDownLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new MemoryReferenceWithCounterTask((counter) -> {
             counter.incrementAndGet();
@@ -288,7 +288,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void additionalInformationShouldBeAvailableOnAnyTaskManagerDuringExecution(CountDownLatch countDownLatch) {
+    default void additionalInformationShouldBeAvailableOnAnyTaskManagerDuringExecution(CountDownLatch countDownLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskManager otherTaskManager = taskManager();
         TaskId id = taskManager.submit(new MemoryReferenceWithCounterTask((counter) -> {
@@ -312,7 +312,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldReturnFailedWhenRunPartially() {
+    default void getStatusShouldReturnFailedWhenRunPartially() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(
             new FailedTask());
@@ -493,22 +493,22 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void listShouldBeEmptyWhenNoTasks() {
+    default void listShouldBeEmptyWhenNoTasks() throws Exception {
         assertThat(taskManager().list()).isEmpty();
     }
 
     @Test
-    default void listCancelledShouldBeEmptyWhenNoTasks() {
+    default void listCancelledShouldBeEmptyWhenNoTasks() throws Exception {
         assertThat(taskManager().list(TaskManager.Status.CANCELLED)).isEmpty();
     }
 
     @Test
-    default void listCancelRequestedShouldBeEmptyWhenNoTasks() {
+    default void listCancelRequestedShouldBeEmptyWhenNoTasks() throws Exception {
         assertThat(taskManager().list(TaskManager.Status.CANCEL_REQUESTED)).isEmpty();
     }
 
     @Test
-    default void awaitShouldNotThrowWhenCompletedTask() throws TaskManager.ReachedTimeoutException {
+    default void awaitShouldNotThrowWhenCompletedTask() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new CompletedTask());
         taskManager.await(taskId, TIMEOUT);
@@ -516,7 +516,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void awaitShouldAwaitWaitingTask() throws TaskManager.ReachedTimeoutException, InterruptedException {
+    default void awaitShouldAwaitWaitingTask() throws Exception {
         TaskManager taskManager = taskManager();
         CountDownLatch latch = new CountDownLatch(1);
         taskManager.submit(new MemoryReferenceTask(
@@ -537,7 +537,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void awaitWithATooShortTimeoutShouldReturnATimeoutAwaitedTaskExecutionDetailsAndThrow() {
+    default void awaitWithATooShortTimeoutShouldReturnATimeoutAwaitedTaskExecutionDetailsAndThrow() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new MemoryReferenceTask(
             () -> {
@@ -550,7 +550,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void submittedTaskShouldExecuteSequentially() {
+    default void submittedTaskShouldExecuteSequentially() throws Exception {
         TaskManager taskManager = taskManager();
         ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
 
@@ -580,7 +580,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void awaitShouldReturnFailedWhenExceptionThrown() {
+    default void awaitShouldReturnFailedWhenExceptionThrown() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new ThrowingTask());
         awaitUntilTaskHasStatus(taskId, TaskManager.Status.FAILED, taskManager);
@@ -589,7 +589,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldReturnFailedWhenExceptionThrown() {
+    default void getStatusShouldReturnFailedWhenExceptionThrown() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new ThrowingTask());
         awaitUntilTaskHasStatus(taskId, TaskManager.Status.FAILED, taskManager);
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 212dce28c5..fdc24021cd 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -32,6 +32,8 @@ import java.util.Optional;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
+import org.apache.james.backends.rabbitmq.QueueArguments;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
@@ -66,22 +68,26 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
     private final JsonEventSerializer serializer;
     private final Sender sender;
     private final ReceiverProvider receiverProvider;
+    private final RabbitMQConfiguration rabbitMQConfiguration;
     private Sinks.Many<OutboundMessage> sendQueue;
     private Sinks.Many<Event> listener;
     private Disposable sendQueueHandle;
     private Disposable listenQueueHandle;
 
     @Inject
-    RabbitMQTerminationSubscriber(TerminationQueueName queueName, Sender sender, ReceiverProvider receiverProvider, JsonEventSerializer serializer) {
+    RabbitMQTerminationSubscriber(TerminationQueueName queueName, Sender sender, ReceiverProvider receiverProvider, JsonEventSerializer serializer, RabbitMQConfiguration rabbitMQConfiguration) {
         this.queueName = queueName;
         this.sender = sender;
         this.receiverProvider = receiverProvider;
         this.serializer = serializer;
+        this.rabbitMQConfiguration = rabbitMQConfiguration;
     }
 
     public void start() {
         sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
-        sender.declare(QueueSpecification.queue(queueName.asString()).durable(!DURABLE).autoDelete(AUTO_DELETE)).block();
+        QueueArguments.Builder builder = QueueArguments.builder();
+        rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL);
+        sender.declare(QueueSpecification.queue(queueName.asString()).durable(!DURABLE).autoDelete(!AUTO_DELETE).arguments(builder.build())).block();
         sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, queueName.asString())).block();
         sendQueue = Sinks.many().unicast().onBackpressureBuffer();
         sendQueueHandle = sender
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index d49645d0c2..b315e9bec5 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -32,6 +32,8 @@ import java.time.Duration;
 import java.util.Optional;
 
 import org.apache.james.backends.rabbitmq.Constants;
+import org.apache.james.backends.rabbitmq.QueueArguments;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.task.Task;
@@ -81,6 +83,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
     private final TaskManagerWorker worker;
     private final JsonTaskSerializer taskSerializer;
     private final RabbitMQWorkQueueConfiguration configuration;
+    private final RabbitMQConfiguration rabbitMQConfiguration;
     private final Sender sender;
     private final ReceiverProvider receiverProvider;
     private final CancelRequestQueueName cancelRequestQueueName;
@@ -91,13 +94,15 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     public RabbitMQWorkQueue(TaskManagerWorker worker, Sender sender,
                              ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer,
-                             RabbitMQWorkQueueConfiguration configuration, CancelRequestQueueName cancelRequestQueueName) {
+                             RabbitMQWorkQueueConfiguration configuration, CancelRequestQueueName cancelRequestQueueName,
+                             RabbitMQConfiguration rabbitMQConfiguration) {
         this.cancelRequestQueueName = cancelRequestQueueName;
         this.worker = worker;
         this.receiverProvider = receiverProvider;
         this.sender = sender;
         this.taskSerializer = taskSerializer;
         this.configuration = configuration;
+        this.rabbitMQConfiguration = rabbitMQConfiguration;
     }
 
     @Override
@@ -193,11 +198,17 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     private void listenToCancelRequests() {
         sender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
-        sender.declare(QueueSpecification.queue(cancelRequestQueueName.asString()).durable(!DURABLE).autoDelete(AUTO_DELETE)).block();
+        QueueArguments.Builder builder = QueueArguments.builder();
+        rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL);
+        QueueSpecification specification = QueueSpecification.queue(cancelRequestQueueName.asString())
+            .durable(!DURABLE)
+            .autoDelete(!AUTO_DELETE)
+            .arguments(builder.build());
+        sender.declare(specification).block();
         sender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, cancelRequestQueueName.asString())).block();
         registerCancelRequestsListener(cancelRequestQueueName.asString());
 
-        sendCancelRequestsQueue = Sinks.many().unicast().onBackpressureBuffer();
+        sendCancelRequestsQueue = Sinks.many().multicast().onBackpressureBuffer();
         sendCancelRequestsQueueHandle = sender
             .send(sendCancelRequestsQueue.asFlux().map(this::makeCancelRequestMessage))
             .subscribeOn(Schedulers.boundedElastic())
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java
index 95633fb89a..6ad3f1def9 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java
@@ -26,13 +26,14 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.rabbitmq.QueueArguments;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 
@@ -42,11 +43,13 @@ public class RabbitMQWorkQueueReconnectionHandler implements SimpleConnectionPoo
     private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueueReconnectionHandler.class);
     private final CancelRequestQueueName cancelRequestQueueName;
     private final EventSourcingTaskManager taskManager;
+    private final RabbitMQConfiguration configuration;
 
     @Inject
-    public RabbitMQWorkQueueReconnectionHandler(CancelRequestQueueName cancelRequestQueueName, EventSourcingTaskManager taskManager) {
+    public RabbitMQWorkQueueReconnectionHandler(CancelRequestQueueName cancelRequestQueueName, EventSourcingTaskManager taskManager, RabbitMQConfiguration configuration) {
         this.cancelRequestQueueName = cancelRequestQueueName;
         this.taskManager = taskManager;
+        this.configuration = configuration;
     }
 
     @Override
@@ -57,7 +60,9 @@ public class RabbitMQWorkQueueReconnectionHandler implements SimpleConnectionPoo
 
     private void createCancelQueue(Connection connection) {
         try (Channel channel = connection.createChannel()) {
-            channel.queueDeclare(cancelRequestQueueName.asString(), !DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
+            QueueArguments.Builder builder = QueueArguments.builder();
+            configuration.getQueueTTL().ifPresent(builder::queueTTL);
+            channel.queueDeclare(cancelRequestQueueName.asString(), !DURABLE, !EXCLUSIVE, !AUTO_DELETE, builder.build());
         } catch (Exception e) {
             LOGGER.error("Error recovering connection", e);
         }
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
index 224d2a7f68..f1fc733057 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
@@ -26,12 +26,13 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.rabbitmq.QueueArguments;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 
@@ -42,11 +43,13 @@ public class TerminationReconnectionHandler implements SimpleConnectionPool.Reco
 
     private final TerminationQueueName queueName;
     private final RabbitMQTerminationSubscriber terminationSubscriber;
+    private final RabbitMQConfiguration configuration;
 
     @Inject
-    public TerminationReconnectionHandler(TerminationQueueName queueName, RabbitMQTerminationSubscriber terminationSubscriber) {
+    public TerminationReconnectionHandler(TerminationQueueName queueName, RabbitMQTerminationSubscriber terminationSubscriber, RabbitMQConfiguration configuration) {
         this.queueName = queueName;
         this.terminationSubscriber = terminationSubscriber;
+        this.configuration = configuration;
     }
 
     @Override
@@ -57,7 +60,9 @@ public class TerminationReconnectionHandler implements SimpleConnectionPool.Reco
 
     private void createTerminationQueue(Connection connection) {
         try (Channel channel = connection.createChannel()) {
-            channel.queueDeclare(queueName.asString(), !DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
+            QueueArguments.Builder builder = QueueArguments.builder();
+            configuration.getQueueTTL().ifPresent(builder::queueTTL);
+            channel.queueDeclare(queueName.asString(), !DURABLE, !EXCLUSIVE, !AUTO_DELETE, builder.build());
         } catch (Exception e) {
             LOGGER.error("Error recovering connection", e);
         }
diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
index 92dd8a4634..7c5d542295 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
@@ -22,7 +22,7 @@ import java.time.Duration
 
 import com.google.common.annotations.VisibleForTesting
 import javax.inject.Inject
-import org.apache.james.backends.rabbitmq.ReceiverProvider
+import org.apache.james.backends.rabbitmq.{RabbitMQConfiguration, ReceiverProvider}
 import org.apache.james.eventsourcing.EventSourcingSystem
 import org.apache.james.server.task.json.JsonTaskSerializer
 import org.apache.james.task.SerialTaskManagerWorker
@@ -33,7 +33,8 @@ class RabbitMQWorkQueueSupplier @Inject()(private val sender: Sender,
                                           private val receiverProvider: ReceiverProvider,
                                           private val jsonTaskSerializer: JsonTaskSerializer,
                                           private val cancelRequestName: CancelRequestQueueName,
-                                          private val configuration: RabbitMQWorkQueueConfiguration) extends WorkQueueSupplier {
+                                          private val configuration: RabbitMQWorkQueueConfiguration,
+                                          private val rabbitMQConfiguration: RabbitMQConfiguration) extends WorkQueueSupplier {
 
   val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL =  Duration.ofSeconds(30)
   override def apply(eventSourcingSystem: EventSourcingSystem): RabbitMQWorkQueue = {
@@ -44,7 +45,7 @@ class RabbitMQWorkQueueSupplier @Inject()(private val sender: Sender,
   def apply(eventSourcingSystem: EventSourcingSystem, additionalInformationPollingInterval: Duration): RabbitMQWorkQueue = {
     val listener = WorkerStatusListener(eventSourcingSystem)
     val worker = new SerialTaskManagerWorker(listener, additionalInformationPollingInterval)
-    val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, sender, receiverProvider, jsonTaskSerializer, configuration, cancelRequestName)
+    val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, sender, receiverProvider, jsonTaskSerializer, configuration, cancelRequestName, rabbitMQConfiguration)
     rabbitMQWorkQueue
   }
 }
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 3d99530eb6..f7ac3131b5 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -31,6 +31,7 @@ import static org.awaitility.Awaitility.await;
 import static org.awaitility.Durations.FIVE_SECONDS;
 import static org.awaitility.Durations.ONE_SECOND;
 
+import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -118,9 +119,9 @@ class DistributedTaskManagerTest implements TaskManagerContract {
         private final List<RabbitMQWorkQueue> workQueues;
         private final RabbitMQWorkQueueSupplier supplier;
 
-        TrackedRabbitMQWorkQueueSupplier(Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer) {
+        TrackedRabbitMQWorkQueueSupplier(Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer) throws Exception {
             workQueues = new ArrayList<>();
-            supplier = new RabbitMQWorkQueueSupplier(sender, receiverProvider, taskSerializer, CancelRequestQueueName.generate(), RabbitMQWorkQueueConfiguration$.MODULE$.enabled());
+            supplier = new RabbitMQWorkQueueSupplier(sender, receiverProvider, taskSerializer, CancelRequestQueueName.generate(), RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), rabbitMQExtension.getRabbitMQ().getConfiguration());
         }
 
         @Override
@@ -198,7 +199,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     JsonEventSerializer eventSerializer;
 
     @BeforeEach
-    void setUp(EventStore eventStore) {
+    void setUp(EventStore eventStore) throws Exception {
         memoryReferenceTaskStore = new MemoryReferenceTaskStore();
         memoryReferenceWithCounterTaskStore = new MemoryReferenceWithCounterTaskStore();
         CassandraCluster cassandra = CASSANDRA_CLUSTER.getCassandraCluster();
@@ -223,21 +224,21 @@ class DistributedTaskManagerTest implements TaskManagerContract {
             .forEach(queue -> managementAPI.deleteQueue("/", queue.getName()));
     }
 
-    public EventSourcingTaskManager taskManager() {
+    public EventSourcingTaskManager taskManager() throws Exception {
         return taskManager(HOSTNAME);
     }
 
-    EventSourcingTaskManager taskManager(Hostname hostname) {
+    EventSourcingTaskManager taskManager(Hostname hostname) throws Exception {
         RabbitMQTerminationSubscriber terminationSubscriber = new RabbitMQTerminationSubscriber(TerminationQueueName.generate(), rabbitMQExtension.getSender(),
             rabbitMQExtension.getReceiverProvider(),
-            eventSerializer);
+            eventSerializer, rabbitMQExtension.getRabbitMQ().getConfiguration());
         terminationSubscribers.add(terminationSubscriber);
         terminationSubscriber.start();
         return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, terminationSubscriber);
     }
 
     @Test
-    void badPayloadShouldNotAffectTaskManagerOnCancelTask() throws TaskManager.ReachedTimeoutException {
+    void badPayloadShouldNotAffectTaskManagerOnCancelTask() throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             Thread.sleep(250);
@@ -255,7 +256,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void badPayloadsShouldNotAffectTaskManagerOnCancelTask() throws TaskManager.ReachedTimeoutException {
+    void badPayloadsShouldNotAffectTaskManagerOnCancelTask() throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             Thread.sleep(250);
@@ -274,7 +275,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void badPayloadShouldNotAffectTaskManagerOnCompleteTask() throws TaskManager.ReachedTimeoutException {
+    void badPayloadShouldNotAffectTaskManagerOnCompleteTask() throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             Thread.sleep(250);
@@ -291,7 +292,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void badPayloadsShouldNotAffectTaskManagerOnCompleteTask() throws TaskManager.ReachedTimeoutException {
+    void badPayloadsShouldNotAffectTaskManagerOnCompleteTask() throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             Thread.sleep(250);
@@ -309,7 +310,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() {
+    void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() throws Exception {
         try (EventSourcingTaskManager taskManager1 = taskManager();
              EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
             TaskId taskId = taskManager1.submit(new CompletedTask());
@@ -325,7 +326,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagersAndTwoTaskOnlyOneTaskShouldRunAtTheSameTime() {
+    void givenTwoTaskManagersAndTwoTaskOnlyOneTaskShouldRunAtTheSameTime() throws Exception {
         CountDownLatch waitingForFirstTaskLatch = new CountDownLatch(1);
 
         try (EventSourcingTaskManager taskManager1 = taskManager();
@@ -348,7 +349,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws InterruptedException {
+    void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws Exception {
         try (EventSourcingTaskManager taskManager1 = taskManager()) {
             Thread.sleep(100);
             try (EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
@@ -368,7 +369,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagerATaskRunningOnOneShouldBeCancellableFromTheOtherOne(CountDownLatch countDownLatch) {
+    void givenTwoTaskManagerATaskRunningOnOneShouldBeCancellableFromTheOtherOne(CountDownLatch countDownLatch) throws Exception {
         TaskManager taskManager1 = taskManager(HOSTNAME);
         TaskManager taskManager2 = taskManager(HOSTNAME_2);
         TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> {
@@ -397,7 +398,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagersATaskRunningOnOneShouldBeWaitableFromTheOtherOne() throws TaskManager.ReachedTimeoutException {
+    void givenTwoTaskManagersATaskRunningOnOneShouldBeWaitableFromTheOtherOne() throws Exception {
         TaskManager taskManager1 = taskManager(HOSTNAME);
         TaskManager taskManager2 = taskManager(HOSTNAME_2);
         TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> {
@@ -424,7 +425,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagerAndATaskRanPerTaskManagerListingThemOnEachShouldShowBothTasks() {
+    void givenTwoTaskManagerAndATaskRanPerTaskManagerListingThemOnEachShouldShowBothTasks() throws Exception {
         try (EventSourcingTaskManager taskManager1 = taskManager();
              EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
 
@@ -449,7 +450,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) {
+    void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) throws Exception {
         try (EventSourcingTaskManager taskManager1 = taskManager();
              EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
             ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager = ImmutableBiMap.of(taskManager1, HOSTNAME, taskManager2, HOSTNAME_2);
@@ -477,7 +478,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void shouldNotCrashWhenBadMessage() {
+    void shouldNotCrashWhenBadMessage() throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
 
         taskManager.submit(new FailsDeserializationTask());
@@ -488,7 +489,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void shouldNotCrashWhenBadMessages() {
+    void shouldNotCrashWhenBadMessages() throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
 
         IntStream.range(0, 100).forEach(i -> taskManager.submit(new FailsDeserializationTask()));
@@ -586,7 +587,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void cassandraTasksShouldBeCancealable(CassandraCluster cassandra) {
+    void cassandraTasksShouldBeCancealable(CassandraCluster cassandra) throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
 
         TaskId taskId = taskManager.submit(new CassandraExecutingTask(cassandra.getConf(), true));
@@ -599,7 +600,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void inProgressTaskShouldBeCanceledWhenCloseTaskManager() {
+    void inProgressTaskShouldBeCanceledWhenCloseTaskManager() throws Exception {
         try (EventSourcingTaskManager taskManager = taskManager()) {
             TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> {
                 TimeUnit.SECONDS.sleep(5);
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index 2f53672adb..f1a868f97f 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -65,15 +65,15 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
         .isolationPolicy(WEAK);
 
     @Override
-    public TerminationSubscriber subscriber() {
+    public TerminationSubscriber subscriber() throws Exception {
         RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(TerminationQueueName.generate(), rabbitMQExtension.getSender(),
-            rabbitMQExtension.getReceiverProvider(), SERIALIZER);
+            rabbitMQExtension.getReceiverProvider(), SERIALIZER, rabbitMQExtension.getRabbitMQ().getConfiguration());
         subscriber.start();
         return subscriber;
     }
 
     @Test
-    void givenTwoTerminationSubscribersWhenAnEventIsSentItShouldBeReceivedByBoth() {
+    void givenTwoTerminationSubscribersWhenAnEventIsSentItShouldBeReceivedByBoth() throws Exception {
         TerminationSubscriber subscriber1 = subscriber();
         TerminationSubscriber subscriber2 = subscriber();
 
@@ -94,7 +94,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
     }
 
     @Test
-    void eventProcessingShouldNotCrashOnInvalidMessage() {
+    void eventProcessingShouldNotCrashOnInvalidMessage() throws Exception {
         TerminationSubscriber subscriber1 = subscriber();
         Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
 
@@ -113,7 +113,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
     }
 
     @Test
-    void eventProcessingShouldNotCrashOnInvalidMessages() {
+    void eventProcessingShouldNotCrashOnInvalidMessages() throws Exception {
         TerminationSubscriber subscriber1 = subscriber();
         Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
 
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
index 6a964af193..81b56e5bdb 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
@@ -50,10 +50,10 @@ class RabbitMQWorkQueuePersistenceTest {
     private JsonTaskSerializer serializer;
 
     @BeforeEach
-    void setUp() {
+    void setUp() throws Exception {
         worker = spy(new ImmediateWorker());
         serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
-        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate());
+        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
         //declare the queue but do not start consuming from it
         testee.declareQueue();
     }
@@ -90,9 +90,9 @@ class RabbitMQWorkQueuePersistenceTest {
         assertThat(worker.results).containsExactly(Task.Result.COMPLETED);
     }
 
-    private void startNewConsumingWorkqueue() {
+    private void startNewConsumingWorkqueue() throws Exception {
         worker = spy(new ImmediateWorker());
-        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate());
+        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
         testee.start();
     }
 }
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 8f83aa5a95..45265689b0 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -66,10 +66,10 @@ class RabbitMQWorkQueueTest {
 
 
     @BeforeEach
-    void setUp() {
+    void setUp() throws Exception {
         worker = new ImmediateWorker();
         serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
-        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate());
+        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
         testee.start();
     }
 
@@ -96,11 +96,11 @@ class RabbitMQWorkQueueTest {
     }
 
     @Test
-    void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() {
+    void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() throws Exception {
         testee.submit(TASK_WITH_ID);
 
         ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
-        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate())) {
+        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration())) {
             otherWorkQueue.start();
 
             IntStream.range(0, 9)
@@ -112,7 +112,7 @@ class RabbitMQWorkQueueTest {
     }
 
     @Test
-    void givenANonDeserializableTaskItShouldBeFlaggedAsFailedAndItDoesNotPreventFollowingTasks() throws InterruptedException {
+    void givenANonDeserializableTaskItShouldBeFlaggedAsFailedAndItDoesNotPreventFollowingTasks() throws Exception {
         Task task = new TestTask(42);
         TaskId taskId = TaskId.fromString("4bf6d081-aa30-11e9-bf6c-2d3b9e84aafd");
         TaskWithId taskWithId = new TaskWithId(taskId, task);
@@ -120,7 +120,7 @@ class RabbitMQWorkQueueTest {
         ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
         JsonTaskSerializer otherTaskSerializer = JsonTaskSerializer.of(TestTaskDTOModules.TEST_TYPE);
         try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), otherTaskSerializer,
-            RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate())) {
+            RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration())) {
             //wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue.
             Thread.sleep(200);
             otherWorkQueue.start();
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
index 63d9625c8e..1a2c801292 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
@@ -53,10 +53,10 @@ public interface TerminationSubscriberContract {
     Duration DELAY_BEFORE_PUBLISHING = Duration.ofMillis(50);
     ExecutorService EXECUTOR = Executors.newCachedThreadPool();
 
-    TerminationSubscriber subscriber();
+    TerminationSubscriber subscriber() throws Exception;
 
     @Test
-    default void handlingCompletedShouldBeListed() {
+    default void handlingCompletedShouldBeListed() throws Exception {
         TerminationSubscriber subscriber = subscriber();
 
         sendEvents(subscriber, COMPLETED_EVENT);
@@ -65,7 +65,7 @@ public interface TerminationSubscriberContract {
     }
 
     @Test
-    default void handlingFailedShouldBeListed() {
+    default void handlingFailedShouldBeListed() throws Exception {
         TerminationSubscriber subscriber = subscriber();
 
         sendEvents(subscriber, FAILED_EVENT);
@@ -74,7 +74,7 @@ public interface TerminationSubscriberContract {
     }
 
     @Test
-    default void handlingCancelledShouldBeListed() {
+    default void handlingCancelledShouldBeListed() throws Exception {
         TerminationSubscriber subscriber = subscriber();
 
         sendEvents(subscriber, CANCELLED_EVENT);
@@ -83,7 +83,7 @@ public interface TerminationSubscriberContract {
     }
 
     @Test
-    default void handlingNonTerminalEventShouldNotBeListed() {
+    default void handlingNonTerminalEventShouldNotBeListed() throws Exception {
         TerminationSubscriber subscriber = subscriber();
         TaskEvent event = new Started(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42), new Hostname("foo"));
 
@@ -93,7 +93,7 @@ public interface TerminationSubscriberContract {
     }
 
     @Test
-    default void handlingMultipleEventsShouldBeListed() {
+    default void handlingMultipleEventsShouldBeListed() throws Exception {
         TerminationSubscriber subscriber = subscriber();
 
         sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
@@ -102,7 +102,7 @@ public interface TerminationSubscriberContract {
     }
 
     @Test
-    default void multipleListeningEventsShouldShareEvents() {
+    default void multipleListeningEventsShouldShareEvents() throws Exception {
         TerminationSubscriber subscriber = subscriber();
 
         Flux<Event> firstListener = Flux.from(subscriber.listenEvents());
@@ -122,7 +122,7 @@ public interface TerminationSubscriberContract {
     }
 
     @Test
-    default void dynamicListeningEventsShouldGetOnlyNewEvents() {
+    default void dynamicListeningEventsShouldGetOnlyNewEvents() throws Exception {
         TerminationSubscriber subscriber = subscriber();
 
         sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/02: JAMES-3694 Apply queue expiracy only for per-node queues

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b0cd610c5593c85301add2dd58f114fd94291553
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jan 27 14:06:30 2023 +0700

    JAMES-3694 Apply queue expiracy only for per-node queues
    
    Per-node queues are temporary, RPC-like response queues tied
    to a specific James instance, so they need cleanup. Other,
    permanent queues should not be deleted.
---
 .../org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java   | 1 -
 .../main/java/org/apache/james/events/KeyReconnectionHandler.java   | 6 ++++--
 .../main/java/org/apache/james/events/KeyRegistrationHandler.java   | 5 ++++-
 3 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
index 9a242ebe51..a47e17bece 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
@@ -767,7 +767,6 @@ public class RabbitMQConfiguration {
         if (allowQuorum && useQuorumQueues) {
             builder.quorumQueue().replicationFactor(quorumQueueReplicationFactor);
         }
-        queueTTL.ifPresent(builder::queueTTL);
         return builder;
     }
 
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
index 1c2ba21d4e..e9c5a728fb 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
@@ -26,6 +26,7 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.rabbitmq.QueueArguments;
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.reactivestreams.Publisher;
@@ -55,8 +56,9 @@ public class KeyReconnectionHandler implements SimpleConnectionPool.Reconnection
     public Publisher<Void> handleReconnection(Connection connection) {
         return Mono.fromRunnable(() -> {
             try (Channel channel = connection.createChannel()) {
-                channel.queueDeclare(namingStrategy.queueName(eventBusId).asString(), DURABLE, !EXCLUSIVE, AUTO_DELETE,
-                    configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM).build());
+                QueueArguments.Builder builder = configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM);
+                configuration.getQueueTTL().ifPresent(builder::queueTTL);
+                channel.queueDeclare(namingStrategy.queueName(eventBusId).asString(), DURABLE, !EXCLUSIVE, AUTO_DELETE, builder.build());
             } catch (Exception e) {
                 LOGGER.error("Error recovering connection", e);
             }
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
index 5fc2a5ec84..8b32e11db1 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.function.Predicate;
 
+import org.apache.james.backends.rabbitmq.QueueArguments;
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.metrics.api.MetricFactory;
@@ -122,12 +123,14 @@ class KeyRegistrationHandler {
     }
 
     private void declareQueue(Sender sender) {
+        QueueArguments.Builder builder = configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM);
+        configuration.getQueueTTL().ifPresent(builder::queueTTL);
         sender.declareQueue(
             QueueSpecification.queue(registrationQueue.asString())
                 .durable(configuration.isEventBusNotificationDurabilityEnabled())
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(AUTO_DELETE)
-                .arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM).build()))
+                .arguments(builder.build()))
             .timeout(TOPOLOGY_CHANGES_TIMEOUT)
             .map(AMQP.Queue.DeclareOk::getQueue)
             .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()))


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org