You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/01/31 00:13:08 UTC

[james-project] 02/02: JAMES-3694 Apply queue expiracy for the task manager

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0e376b9ca76f53548447fd4fa246bb20eb3fcb30
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jan 27 14:06:56 2023 +0700

    JAMES-3694 Apply queue expiracy for the task manager
    
    Safer than auto-deletes
---
 .../org/apache/james/task/TaskManagerContract.java | 52 +++++++++++-----------
 .../distributed/RabbitMQTerminationSubscriber.java | 10 ++++-
 .../distributed/RabbitMQWorkQueue.java             | 17 +++++--
 .../RabbitMQWorkQueueReconnectionHandler.java      | 11 +++--
 .../TerminationReconnectionHandler.java            | 11 +++--
 .../distributed/RabbitMQWorkQueueSupplier.scala    |  7 +--
 .../distributed/DistributedTaskManagerTest.java    | 43 +++++++++---------
 .../RabbitMQTerminationSubscriberTest.java         | 10 ++---
 .../RabbitMQWorkQueuePersistenceTest.java          |  8 ++--
 .../distributed/RabbitMQWorkQueueTest.java         | 12 ++---
 .../TerminationSubscriberContract.java             | 16 +++----
 11 files changed, 113 insertions(+), 84 deletions(-)

diff --git a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
index 3e5b4a18ad..2a092c735a 100644
--- a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -47,10 +47,10 @@ public interface TaskManagerContract {
     ConditionFactory awaitAtMostTwoSeconds = calmlyAwait.atMost(Duration.ofSeconds(2));
     java.time.Duration TIMEOUT = java.time.Duration.ofMinutes(15);
 
-    TaskManager taskManager();
+    TaskManager taskManager() throws Exception;
 
     @Test
-    default void submitShouldReturnATaskId() {
+    default void submitShouldReturnATaskId() throws Exception {
         TaskId taskId = taskManager().submit(new CompletedTask());
         assertThat(taskId).isNotNull();
     }
@@ -63,7 +63,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldReturnWaitingWhenNotYetProcessed(CountDownLatch waitingForResultLatch) {
+    default void getStatusShouldReturnWaitingWhenNotYetProcessed(CountDownLatch waitingForResultLatch) throws Exception {
         TaskManager taskManager = taskManager();
         taskManager.submit(new MemoryReferenceTask(() -> {
             waitingForResultLatch.await();
@@ -77,7 +77,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void taskCodeAfterCancelIsNotRun(CountDownLatch waitingForResultLatch) throws InterruptedException {
+    default void taskCodeAfterCancelIsNotRun(CountDownLatch waitingForResultLatch) throws Exception {
         TaskManager taskManager = taskManager();
         CountDownLatch waitForTaskToBeLaunched = new CountDownLatch(1);
         AtomicInteger count = new AtomicInteger(0);
@@ -98,7 +98,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void completedTaskShouldNotBeCancelled() {
+    default void completedTaskShouldNotBeCancelled() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new CompletedTask());
 
@@ -114,7 +114,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void failedTaskShouldNotBeCancelled() {
+    default void failedTaskShouldNotBeCancelled() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new FailedTask());
 
@@ -130,7 +130,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) {
+    default void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             countDownLatch.await();
@@ -152,7 +152,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) {
+    default void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             countDownLatch.await();
@@ -176,7 +176,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void cancelShouldBeIdempotent(CountDownLatch waitingForResultLatch) {
+    default void cancelShouldBeIdempotent(CountDownLatch waitingForResultLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             waitingForResultLatch.await();
@@ -189,7 +189,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldReturnInProgressWhenProcessingIsInProgress(CountDownLatch waitingForResultLatch) {
+    default void getStatusShouldReturnInProgressWhenProcessingIsInProgress(CountDownLatch waitingForResultLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> {
             waitingForResultLatch.await();
@@ -201,7 +201,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldReturnCompletedWhenRunSuccessfully() {
+    default void getStatusShouldReturnCompletedWhenRunSuccessfully() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(
             new CompletedTask());
@@ -212,7 +212,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void additionalInformationShouldBeUpdatedWhenRunSuccessfully() {
+    default void additionalInformationShouldBeUpdatedWhenRunSuccessfully() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new MemoryReferenceWithCounterTask(counter -> {
             counter.incrementAndGet();
@@ -230,7 +230,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void additionalInformationShouldBeUpdatedWhenFailed() {
+    default void additionalInformationShouldBeUpdatedWhenFailed() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new MemoryReferenceWithCounterTask(counter -> {
             counter.incrementAndGet();
@@ -248,7 +248,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void additionalInformationShouldBeUpdatedWhenCancelled(CountDownLatch countDownLatch) {
+    default void additionalInformationShouldBeUpdatedWhenCancelled(CountDownLatch countDownLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new MemoryReferenceWithCounterTask((counter) -> {
             counter.incrementAndGet();
@@ -273,7 +273,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void additionalInformationShouldBeUpdatedDuringExecution(CountDownLatch countDownLatch) {
+    default void additionalInformationShouldBeUpdatedDuringExecution(CountDownLatch countDownLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(new MemoryReferenceWithCounterTask((counter) -> {
             counter.incrementAndGet();
@@ -288,7 +288,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void additionalInformationShouldBeAvailableOnAnyTaskManagerDuringExecution(CountDownLatch countDownLatch) {
+    default void additionalInformationShouldBeAvailableOnAnyTaskManagerDuringExecution(CountDownLatch countDownLatch) throws Exception {
         TaskManager taskManager = taskManager();
         TaskManager otherTaskManager = taskManager();
         TaskId id = taskManager.submit(new MemoryReferenceWithCounterTask((counter) -> {
@@ -312,7 +312,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldReturnFailedWhenRunPartially() {
+    default void getStatusShouldReturnFailedWhenRunPartially() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(
             new FailedTask());
@@ -493,22 +493,22 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void listShouldBeEmptyWhenNoTasks() {
+    default void listShouldBeEmptyWhenNoTasks() throws Exception {
         assertThat(taskManager().list()).isEmpty();
     }
 
     @Test
-    default void listCancelledShouldBeEmptyWhenNoTasks() {
+    default void listCancelledShouldBeEmptyWhenNoTasks() throws Exception {
         assertThat(taskManager().list(TaskManager.Status.CANCELLED)).isEmpty();
     }
 
     @Test
-    default void listCancelRequestedShouldBeEmptyWhenNoTasks() {
+    default void listCancelRequestedShouldBeEmptyWhenNoTasks() throws Exception {
         assertThat(taskManager().list(TaskManager.Status.CANCEL_REQUESTED)).isEmpty();
     }
 
     @Test
-    default void awaitShouldNotThrowWhenCompletedTask() throws TaskManager.ReachedTimeoutException {
+    default void awaitShouldNotThrowWhenCompletedTask() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new CompletedTask());
         taskManager.await(taskId, TIMEOUT);
@@ -516,7 +516,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void awaitShouldAwaitWaitingTask() throws TaskManager.ReachedTimeoutException, InterruptedException {
+    default void awaitShouldAwaitWaitingTask() throws Exception {
         TaskManager taskManager = taskManager();
         CountDownLatch latch = new CountDownLatch(1);
         taskManager.submit(new MemoryReferenceTask(
@@ -537,7 +537,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void awaitWithATooShortTimeoutShouldReturnATimeoutAwaitedTaskExecutionDetailsAndThrow() {
+    default void awaitWithATooShortTimeoutShouldReturnATimeoutAwaitedTaskExecutionDetailsAndThrow() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new MemoryReferenceTask(
             () -> {
@@ -550,7 +550,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void submittedTaskShouldExecuteSequentially() {
+    default void submittedTaskShouldExecuteSequentially() throws Exception {
         TaskManager taskManager = taskManager();
         ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
 
@@ -580,7 +580,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void awaitShouldReturnFailedWhenExceptionThrown() {
+    default void awaitShouldReturnFailedWhenExceptionThrown() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new ThrowingTask());
         awaitUntilTaskHasStatus(taskId, TaskManager.Status.FAILED, taskManager);
@@ -589,7 +589,7 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldReturnFailedWhenExceptionThrown() {
+    default void getStatusShouldReturnFailedWhenExceptionThrown() throws Exception {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(new ThrowingTask());
         awaitUntilTaskHasStatus(taskId, TaskManager.Status.FAILED, taskManager);
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 212dce28c5..fdc24021cd 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -32,6 +32,8 @@ import java.util.Optional;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
+import org.apache.james.backends.rabbitmq.QueueArguments;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
@@ -66,22 +68,26 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
     private final JsonEventSerializer serializer;
     private final Sender sender;
     private final ReceiverProvider receiverProvider;
+    private final RabbitMQConfiguration rabbitMQConfiguration;
     private Sinks.Many<OutboundMessage> sendQueue;
     private Sinks.Many<Event> listener;
     private Disposable sendQueueHandle;
     private Disposable listenQueueHandle;
 
     @Inject
-    RabbitMQTerminationSubscriber(TerminationQueueName queueName, Sender sender, ReceiverProvider receiverProvider, JsonEventSerializer serializer) {
+    RabbitMQTerminationSubscriber(TerminationQueueName queueName, Sender sender, ReceiverProvider receiverProvider, JsonEventSerializer serializer, RabbitMQConfiguration rabbitMQConfiguration) {
         this.queueName = queueName;
         this.sender = sender;
         this.receiverProvider = receiverProvider;
         this.serializer = serializer;
+        this.rabbitMQConfiguration = rabbitMQConfiguration;
     }
 
     public void start() {
         sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
-        sender.declare(QueueSpecification.queue(queueName.asString()).durable(!DURABLE).autoDelete(AUTO_DELETE)).block();
+        QueueArguments.Builder builder = QueueArguments.builder();
+        rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL);
+        sender.declare(QueueSpecification.queue(queueName.asString()).durable(!DURABLE).autoDelete(!AUTO_DELETE).arguments(builder.build())).block();
         sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, queueName.asString())).block();
         sendQueue = Sinks.many().unicast().onBackpressureBuffer();
         sendQueueHandle = sender
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index d49645d0c2..b315e9bec5 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -32,6 +32,8 @@ import java.time.Duration;
 import java.util.Optional;
 
 import org.apache.james.backends.rabbitmq.Constants;
+import org.apache.james.backends.rabbitmq.QueueArguments;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.task.Task;
@@ -81,6 +83,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
     private final TaskManagerWorker worker;
     private final JsonTaskSerializer taskSerializer;
     private final RabbitMQWorkQueueConfiguration configuration;
+    private final RabbitMQConfiguration rabbitMQConfiguration;
     private final Sender sender;
     private final ReceiverProvider receiverProvider;
     private final CancelRequestQueueName cancelRequestQueueName;
@@ -91,13 +94,15 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     public RabbitMQWorkQueue(TaskManagerWorker worker, Sender sender,
                              ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer,
-                             RabbitMQWorkQueueConfiguration configuration, CancelRequestQueueName cancelRequestQueueName) {
+                             RabbitMQWorkQueueConfiguration configuration, CancelRequestQueueName cancelRequestQueueName,
+                             RabbitMQConfiguration rabbitMQConfiguration) {
         this.cancelRequestQueueName = cancelRequestQueueName;
         this.worker = worker;
         this.receiverProvider = receiverProvider;
         this.sender = sender;
         this.taskSerializer = taskSerializer;
         this.configuration = configuration;
+        this.rabbitMQConfiguration = rabbitMQConfiguration;
     }
 
     @Override
@@ -193,11 +198,17 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     private void listenToCancelRequests() {
         sender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
-        sender.declare(QueueSpecification.queue(cancelRequestQueueName.asString()).durable(!DURABLE).autoDelete(AUTO_DELETE)).block();
+        QueueArguments.Builder builder = QueueArguments.builder();
+        rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL);
+        QueueSpecification specification = QueueSpecification.queue(cancelRequestQueueName.asString())
+            .durable(!DURABLE)
+            .autoDelete(AUTO_DELETE)
+            .arguments(builder.build());
+        sender.declare(specification).block();
         sender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, cancelRequestQueueName.asString())).block();
         registerCancelRequestsListener(cancelRequestQueueName.asString());
 
-        sendCancelRequestsQueue = Sinks.many().unicast().onBackpressureBuffer();
+        sendCancelRequestsQueue = Sinks.many().multicast().onBackpressureBuffer();
         sendCancelRequestsQueueHandle = sender
             .send(sendCancelRequestsQueue.asFlux().map(this::makeCancelRequestMessage))
             .subscribeOn(Schedulers.boundedElastic())
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java
index 95633fb89a..6ad3f1def9 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java
@@ -26,13 +26,14 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.rabbitmq.QueueArguments;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 
@@ -42,11 +43,13 @@ public class RabbitMQWorkQueueReconnectionHandler implements SimpleConnectionPoo
     private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueueReconnectionHandler.class);
     private final CancelRequestQueueName cancelRequestQueueName;
     private final EventSourcingTaskManager taskManager;
+    private final RabbitMQConfiguration configuration;
 
     @Inject
-    public RabbitMQWorkQueueReconnectionHandler(CancelRequestQueueName cancelRequestQueueName, EventSourcingTaskManager taskManager) {
+    public RabbitMQWorkQueueReconnectionHandler(CancelRequestQueueName cancelRequestQueueName, EventSourcingTaskManager taskManager, RabbitMQConfiguration configuration) {
         this.cancelRequestQueueName = cancelRequestQueueName;
         this.taskManager = taskManager;
+        this.configuration = configuration;
     }
 
     @Override
@@ -57,7 +60,9 @@ public class RabbitMQWorkQueueReconnectionHandler implements SimpleConnectionPoo
 
     private void createCancelQueue(Connection connection) {
         try (Channel channel = connection.createChannel()) {
-            channel.queueDeclare(cancelRequestQueueName.asString(), !DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
+            QueueArguments.Builder builder = QueueArguments.builder();
+            configuration.getQueueTTL().ifPresent(builder::queueTTL);
+            channel.queueDeclare(cancelRequestQueueName.asString(), !DURABLE, !EXCLUSIVE, !AUTO_DELETE, builder.build());
         } catch (Exception e) {
             LOGGER.error("Error recovering connection", e);
         }
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
index 224d2a7f68..f1fc733057 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
@@ -26,12 +26,13 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.rabbitmq.QueueArguments;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 
@@ -42,11 +43,13 @@ public class TerminationReconnectionHandler implements SimpleConnectionPool.Reco
 
     private final TerminationQueueName queueName;
     private final RabbitMQTerminationSubscriber terminationSubscriber;
+    private final RabbitMQConfiguration configuration;
 
     @Inject
-    public TerminationReconnectionHandler(TerminationQueueName queueName, RabbitMQTerminationSubscriber terminationSubscriber) {
+    public TerminationReconnectionHandler(TerminationQueueName queueName, RabbitMQTerminationSubscriber terminationSubscriber, RabbitMQConfiguration configuration) {
         this.queueName = queueName;
         this.terminationSubscriber = terminationSubscriber;
+        this.configuration = configuration;
     }
 
     @Override
@@ -57,7 +60,9 @@ public class TerminationReconnectionHandler implements SimpleConnectionPool.Reco
 
     private void createTerminationQueue(Connection connection) {
         try (Channel channel = connection.createChannel()) {
-            channel.queueDeclare(queueName.asString(), !DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
+            QueueArguments.Builder builder = QueueArguments.builder();
+            configuration.getQueueTTL().ifPresent(builder::queueTTL);
+            channel.queueDeclare(queueName.asString(), !DURABLE, !EXCLUSIVE, !AUTO_DELETE, builder.build());
         } catch (Exception e) {
             LOGGER.error("Error recovering connection", e);
         }
diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
index 92dd8a4634..7c5d542295 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
@@ -22,7 +22,7 @@ import java.time.Duration
 
 import com.google.common.annotations.VisibleForTesting
 import javax.inject.Inject
-import org.apache.james.backends.rabbitmq.ReceiverProvider
+import org.apache.james.backends.rabbitmq.{RabbitMQConfiguration, ReceiverProvider}
 import org.apache.james.eventsourcing.EventSourcingSystem
 import org.apache.james.server.task.json.JsonTaskSerializer
 import org.apache.james.task.SerialTaskManagerWorker
@@ -33,7 +33,8 @@ class RabbitMQWorkQueueSupplier @Inject()(private val sender: Sender,
                                           private val receiverProvider: ReceiverProvider,
                                           private val jsonTaskSerializer: JsonTaskSerializer,
                                           private val cancelRequestName: CancelRequestQueueName,
-                                          private val configuration: RabbitMQWorkQueueConfiguration) extends WorkQueueSupplier {
+                                          private val configuration: RabbitMQWorkQueueConfiguration,
+                                          private val rabbitMQConfiguration: RabbitMQConfiguration) extends WorkQueueSupplier {
 
   val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL =  Duration.ofSeconds(30)
   override def apply(eventSourcingSystem: EventSourcingSystem): RabbitMQWorkQueue = {
@@ -44,7 +45,7 @@ class RabbitMQWorkQueueSupplier @Inject()(private val sender: Sender,
   def apply(eventSourcingSystem: EventSourcingSystem, additionalInformationPollingInterval: Duration): RabbitMQWorkQueue = {
     val listener = WorkerStatusListener(eventSourcingSystem)
     val worker = new SerialTaskManagerWorker(listener, additionalInformationPollingInterval)
-    val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, sender, receiverProvider, jsonTaskSerializer, configuration, cancelRequestName)
+    val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, sender, receiverProvider, jsonTaskSerializer, configuration, cancelRequestName, rabbitMQConfiguration)
     rabbitMQWorkQueue
   }
 }
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 3d99530eb6..f7ac3131b5 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -31,6 +31,7 @@ import static org.awaitility.Awaitility.await;
 import static org.awaitility.Durations.FIVE_SECONDS;
 import static org.awaitility.Durations.ONE_SECOND;
 
+import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -118,9 +119,9 @@ class DistributedTaskManagerTest implements TaskManagerContract {
         private final List<RabbitMQWorkQueue> workQueues;
         private final RabbitMQWorkQueueSupplier supplier;
 
-        TrackedRabbitMQWorkQueueSupplier(Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer) {
+        TrackedRabbitMQWorkQueueSupplier(Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer) throws Exception {
             workQueues = new ArrayList<>();
-            supplier = new RabbitMQWorkQueueSupplier(sender, receiverProvider, taskSerializer, CancelRequestQueueName.generate(), RabbitMQWorkQueueConfiguration$.MODULE$.enabled());
+            supplier = new RabbitMQWorkQueueSupplier(sender, receiverProvider, taskSerializer, CancelRequestQueueName.generate(), RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), rabbitMQExtension.getRabbitMQ().getConfiguration());
         }
 
         @Override
@@ -198,7 +199,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     JsonEventSerializer eventSerializer;
 
     @BeforeEach
-    void setUp(EventStore eventStore) {
+    void setUp(EventStore eventStore) throws Exception {
         memoryReferenceTaskStore = new MemoryReferenceTaskStore();
         memoryReferenceWithCounterTaskStore = new MemoryReferenceWithCounterTaskStore();
         CassandraCluster cassandra = CASSANDRA_CLUSTER.getCassandraCluster();
@@ -223,21 +224,21 @@ class DistributedTaskManagerTest implements TaskManagerContract {
             .forEach(queue -> managementAPI.deleteQueue("/", queue.getName()));
     }
 
-    public EventSourcingTaskManager taskManager() {
+    public EventSourcingTaskManager taskManager() throws Exception {
         return taskManager(HOSTNAME);
     }
 
-    EventSourcingTaskManager taskManager(Hostname hostname) {
+    EventSourcingTaskManager taskManager(Hostname hostname) throws Exception {
         RabbitMQTerminationSubscriber terminationSubscriber = new RabbitMQTerminationSubscriber(TerminationQueueName.generate(), rabbitMQExtension.getSender(),
             rabbitMQExtension.getReceiverProvider(),
-            eventSerializer);
+            eventSerializer, rabbitMQExtension.getRabbitMQ().getConfiguration());
         terminationSubscribers.add(terminationSubscriber);
         terminationSubscriber.start();
         return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, terminationSubscriber);
     }
 
     @Test
-    void badPayloadShouldNotAffectTaskManagerOnCancelTask() throws TaskManager.ReachedTimeoutException {
+    void badPayloadShouldNotAffectTaskManagerOnCancelTask() throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             Thread.sleep(250);
@@ -255,7 +256,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void badPayloadsShouldNotAffectTaskManagerOnCancelTask() throws TaskManager.ReachedTimeoutException {
+    void badPayloadsShouldNotAffectTaskManagerOnCancelTask() throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             Thread.sleep(250);
@@ -274,7 +275,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void badPayloadShouldNotAffectTaskManagerOnCompleteTask() throws TaskManager.ReachedTimeoutException {
+    void badPayloadShouldNotAffectTaskManagerOnCompleteTask() throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             Thread.sleep(250);
@@ -291,7 +292,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void badPayloadsShouldNotAffectTaskManagerOnCompleteTask() throws TaskManager.ReachedTimeoutException {
+    void badPayloadsShouldNotAffectTaskManagerOnCompleteTask() throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
         TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
             Thread.sleep(250);
@@ -309,7 +310,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() {
+    void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() throws Exception {
         try (EventSourcingTaskManager taskManager1 = taskManager();
              EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
             TaskId taskId = taskManager1.submit(new CompletedTask());
@@ -325,7 +326,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagersAndTwoTaskOnlyOneTaskShouldRunAtTheSameTime() {
+    void givenTwoTaskManagersAndTwoTaskOnlyOneTaskShouldRunAtTheSameTime() throws Exception {
         CountDownLatch waitingForFirstTaskLatch = new CountDownLatch(1);
 
         try (EventSourcingTaskManager taskManager1 = taskManager();
@@ -348,7 +349,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws InterruptedException {
+    void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws Exception {
         try (EventSourcingTaskManager taskManager1 = taskManager()) {
             Thread.sleep(100);
             try (EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
@@ -368,7 +369,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagerATaskRunningOnOneShouldBeCancellableFromTheOtherOne(CountDownLatch countDownLatch) {
+    void givenTwoTaskManagerATaskRunningOnOneShouldBeCancellableFromTheOtherOne(CountDownLatch countDownLatch) throws Exception {
         TaskManager taskManager1 = taskManager(HOSTNAME);
         TaskManager taskManager2 = taskManager(HOSTNAME_2);
         TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> {
@@ -397,7 +398,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagersATaskRunningOnOneShouldBeWaitableFromTheOtherOne() throws TaskManager.ReachedTimeoutException {
+    void givenTwoTaskManagersATaskRunningOnOneShouldBeWaitableFromTheOtherOne() throws Exception {
         TaskManager taskManager1 = taskManager(HOSTNAME);
         TaskManager taskManager2 = taskManager(HOSTNAME_2);
         TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> {
@@ -424,7 +425,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagerAndATaskRanPerTaskManagerListingThemOnEachShouldShowBothTasks() {
+    void givenTwoTaskManagerAndATaskRanPerTaskManagerListingThemOnEachShouldShowBothTasks() throws Exception {
         try (EventSourcingTaskManager taskManager1 = taskManager();
              EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
 
@@ -449,7 +450,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) {
+    void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) throws Exception {
         try (EventSourcingTaskManager taskManager1 = taskManager();
              EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
             ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager = ImmutableBiMap.of(taskManager1, HOSTNAME, taskManager2, HOSTNAME_2);
@@ -477,7 +478,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void shouldNotCrashWhenBadMessage() {
+    void shouldNotCrashWhenBadMessage() throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
 
         taskManager.submit(new FailsDeserializationTask());
@@ -488,7 +489,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void shouldNotCrashWhenBadMessages() {
+    void shouldNotCrashWhenBadMessages() throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
 
         IntStream.range(0, 100).forEach(i -> taskManager.submit(new FailsDeserializationTask()));
@@ -586,7 +587,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void cassandraTasksShouldBeCancealable(CassandraCluster cassandra) {
+    void cassandraTasksShouldBeCancealable(CassandraCluster cassandra) throws Exception {
         TaskManager taskManager = taskManager(HOSTNAME);
 
         TaskId taskId = taskManager.submit(new CassandraExecutingTask(cassandra.getConf(), true));
@@ -599,7 +600,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void inProgressTaskShouldBeCanceledWhenCloseTaskManager() {
+    void inProgressTaskShouldBeCanceledWhenCloseTaskManager() throws Exception {
         try (EventSourcingTaskManager taskManager = taskManager()) {
             TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> {
                 TimeUnit.SECONDS.sleep(5);
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index 2f53672adb..f1a868f97f 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -65,15 +65,15 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
         .isolationPolicy(WEAK);
 
     @Override
-    public TerminationSubscriber subscriber() {
+    public TerminationSubscriber subscriber() throws Exception {
         RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(TerminationQueueName.generate(), rabbitMQExtension.getSender(),
-            rabbitMQExtension.getReceiverProvider(), SERIALIZER);
+            rabbitMQExtension.getReceiverProvider(), SERIALIZER, rabbitMQExtension.getRabbitMQ().getConfiguration());
         subscriber.start();
         return subscriber;
     }
 
     @Test
-    void givenTwoTerminationSubscribersWhenAnEventIsSentItShouldBeReceivedByBoth() {
+    void givenTwoTerminationSubscribersWhenAnEventIsSentItShouldBeReceivedByBoth() throws Exception {
         TerminationSubscriber subscriber1 = subscriber();
         TerminationSubscriber subscriber2 = subscriber();
 
@@ -94,7 +94,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
     }
 
     @Test
-    void eventProcessingShouldNotCrashOnInvalidMessage() {
+    void eventProcessingShouldNotCrashOnInvalidMessage() throws Exception {
         TerminationSubscriber subscriber1 = subscriber();
         Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
 
@@ -113,7 +113,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
     }
 
     @Test
-    void eventProcessingShouldNotCrashOnInvalidMessages() {
+    void eventProcessingShouldNotCrashOnInvalidMessages() throws Exception {
         TerminationSubscriber subscriber1 = subscriber();
         Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
 
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
index 6a964af193..81b56e5bdb 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
@@ -50,10 +50,10 @@ class RabbitMQWorkQueuePersistenceTest {
     private JsonTaskSerializer serializer;
 
     @BeforeEach
-    void setUp() {
+    void setUp() throws Exception {
         worker = spy(new ImmediateWorker());
         serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
-        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate());
+        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
         //declare the queue but do not start consuming from it
         testee.declareQueue();
     }
@@ -90,9 +90,9 @@ class RabbitMQWorkQueuePersistenceTest {
         assertThat(worker.results).containsExactly(Task.Result.COMPLETED);
     }
 
-    private void startNewConsumingWorkqueue() {
+    private void startNewConsumingWorkqueue() throws Exception {
         worker = spy(new ImmediateWorker());
-        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate());
+        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
         testee.start();
     }
 }
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 8f83aa5a95..45265689b0 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -66,10 +66,10 @@ class RabbitMQWorkQueueTest {
 
 
     @BeforeEach
-    void setUp() {
+    void setUp() throws Exception {
         worker = new ImmediateWorker();
         serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
-        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate());
+        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
         testee.start();
     }
 
@@ -96,11 +96,11 @@ class RabbitMQWorkQueueTest {
     }
 
     @Test
-    void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() {
+    void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() throws Exception {
         testee.submit(TASK_WITH_ID);
 
         ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
-        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate())) {
+        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration())) {
             otherWorkQueue.start();
 
             IntStream.range(0, 9)
@@ -112,7 +112,7 @@ class RabbitMQWorkQueueTest {
     }
 
     @Test
-    void givenANonDeserializableTaskItShouldBeFlaggedAsFailedAndItDoesNotPreventFollowingTasks() throws InterruptedException {
+    void givenANonDeserializableTaskItShouldBeFlaggedAsFailedAndItDoesNotPreventFollowingTasks() throws Exception {
         Task task = new TestTask(42);
         TaskId taskId = TaskId.fromString("4bf6d081-aa30-11e9-bf6c-2d3b9e84aafd");
         TaskWithId taskWithId = new TaskWithId(taskId, task);
@@ -120,7 +120,7 @@ class RabbitMQWorkQueueTest {
         ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
         JsonTaskSerializer otherTaskSerializer = JsonTaskSerializer.of(TestTaskDTOModules.TEST_TYPE);
         try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), otherTaskSerializer,
-            RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate())) {
+            RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration())) {
             //wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue.
             Thread.sleep(200);
             otherWorkQueue.start();
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
index 63d9625c8e..1a2c801292 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
@@ -53,10 +53,10 @@ public interface TerminationSubscriberContract {
     Duration DELAY_BEFORE_PUBLISHING = Duration.ofMillis(50);
     ExecutorService EXECUTOR = Executors.newCachedThreadPool();
 
-    TerminationSubscriber subscriber();
+    TerminationSubscriber subscriber() throws Exception;
 
     @Test
-    default void handlingCompletedShouldBeListed() {
+    default void handlingCompletedShouldBeListed() throws Exception {
         TerminationSubscriber subscriber = subscriber();
 
         sendEvents(subscriber, COMPLETED_EVENT);
@@ -65,7 +65,7 @@ public interface TerminationSubscriberContract {
     }
 
     @Test
-    default void handlingFailedShouldBeListed() {
+    default void handlingFailedShouldBeListed() throws Exception {
         TerminationSubscriber subscriber = subscriber();
 
         sendEvents(subscriber, FAILED_EVENT);
@@ -74,7 +74,7 @@ public interface TerminationSubscriberContract {
     }
 
     @Test
-    default void handlingCancelledShouldBeListed() {
+    default void handlingCancelledShouldBeListed() throws Exception {
         TerminationSubscriber subscriber = subscriber();
 
         sendEvents(subscriber, CANCELLED_EVENT);
@@ -83,7 +83,7 @@ public interface TerminationSubscriberContract {
     }
 
     @Test
-    default void handlingNonTerminalEventShouldNotBeListed() {
+    default void handlingNonTerminalEventShouldNotBeListed() throws Exception {
         TerminationSubscriber subscriber = subscriber();
         TaskEvent event = new Started(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42), new Hostname("foo"));
 
@@ -93,7 +93,7 @@ public interface TerminationSubscriberContract {
     }
 
     @Test
-    default void handlingMultipleEventsShouldBeListed() {
+    default void handlingMultipleEventsShouldBeListed() throws Exception {
         TerminationSubscriber subscriber = subscriber();
 
         sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
@@ -102,7 +102,7 @@ public interface TerminationSubscriberContract {
     }
 
     @Test
-    default void multipleListeningEventsShouldShareEvents() {
+    default void multipleListeningEventsShouldShareEvents() throws Exception {
         TerminationSubscriber subscriber = subscriber();
 
         Flux<Event> firstListener = Flux.from(subscriber.listenEvents());
@@ -122,7 +122,7 @@ public interface TerminationSubscriberContract {
     }
 
     @Test
-    default void dynamicListeningEventsShouldGetOnlyNewEvents() {
+    default void dynamicListeningEventsShouldGetOnlyNewEvents() throws Exception {
         TerminationSubscriber subscriber = subscriber();
 
         sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org