You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/01/31 00:13:08 UTC
[james-project] 02/02: JAMES-3694 Apply queue expiracy for the task manager
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0e376b9ca76f53548447fd4fa246bb20eb3fcb30
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jan 27 14:06:56 2023 +0700
JAMES-3694 Apply queue expiracy for the task manager
Safer than auto-deletes
---
.../org/apache/james/task/TaskManagerContract.java | 52 +++++++++++-----------
.../distributed/RabbitMQTerminationSubscriber.java | 10 ++++-
.../distributed/RabbitMQWorkQueue.java | 17 +++++--
.../RabbitMQWorkQueueReconnectionHandler.java | 11 +++--
.../TerminationReconnectionHandler.java | 11 +++--
.../distributed/RabbitMQWorkQueueSupplier.scala | 7 +--
.../distributed/DistributedTaskManagerTest.java | 43 +++++++++---------
.../RabbitMQTerminationSubscriberTest.java | 10 ++---
.../RabbitMQWorkQueuePersistenceTest.java | 8 ++--
.../distributed/RabbitMQWorkQueueTest.java | 12 ++---
.../TerminationSubscriberContract.java | 16 +++----
11 files changed, 113 insertions(+), 84 deletions(-)
diff --git a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
index 3e5b4a18ad..2a092c735a 100644
--- a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -47,10 +47,10 @@ public interface TaskManagerContract {
ConditionFactory awaitAtMostTwoSeconds = calmlyAwait.atMost(Duration.ofSeconds(2));
java.time.Duration TIMEOUT = java.time.Duration.ofMinutes(15);
- TaskManager taskManager();
+ TaskManager taskManager() throws Exception;
@Test
- default void submitShouldReturnATaskId() {
+ default void submitShouldReturnATaskId() throws Exception {
TaskId taskId = taskManager().submit(new CompletedTask());
assertThat(taskId).isNotNull();
}
@@ -63,7 +63,7 @@ public interface TaskManagerContract {
}
@Test
- default void getStatusShouldReturnWaitingWhenNotYetProcessed(CountDownLatch waitingForResultLatch) {
+ default void getStatusShouldReturnWaitingWhenNotYetProcessed(CountDownLatch waitingForResultLatch) throws Exception {
TaskManager taskManager = taskManager();
taskManager.submit(new MemoryReferenceTask(() -> {
waitingForResultLatch.await();
@@ -77,7 +77,7 @@ public interface TaskManagerContract {
}
@Test
- default void taskCodeAfterCancelIsNotRun(CountDownLatch waitingForResultLatch) throws InterruptedException {
+ default void taskCodeAfterCancelIsNotRun(CountDownLatch waitingForResultLatch) throws Exception {
TaskManager taskManager = taskManager();
CountDownLatch waitForTaskToBeLaunched = new CountDownLatch(1);
AtomicInteger count = new AtomicInteger(0);
@@ -98,7 +98,7 @@ public interface TaskManagerContract {
}
@Test
- default void completedTaskShouldNotBeCancelled() {
+ default void completedTaskShouldNotBeCancelled() throws Exception {
TaskManager taskManager = taskManager();
TaskId id = taskManager.submit(new CompletedTask());
@@ -114,7 +114,7 @@ public interface TaskManagerContract {
}
@Test
- default void failedTaskShouldNotBeCancelled() {
+ default void failedTaskShouldNotBeCancelled() throws Exception {
TaskManager taskManager = taskManager();
TaskId id = taskManager.submit(new FailedTask());
@@ -130,7 +130,7 @@ public interface TaskManagerContract {
}
@Test
- default void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) {
+ default void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) throws Exception {
TaskManager taskManager = taskManager();
TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
countDownLatch.await();
@@ -152,7 +152,7 @@ public interface TaskManagerContract {
}
@Test
- default void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) {
+ default void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) throws Exception {
TaskManager taskManager = taskManager();
TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
countDownLatch.await();
@@ -176,7 +176,7 @@ public interface TaskManagerContract {
}
@Test
- default void cancelShouldBeIdempotent(CountDownLatch waitingForResultLatch) {
+ default void cancelShouldBeIdempotent(CountDownLatch waitingForResultLatch) throws Exception {
TaskManager taskManager = taskManager();
TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
waitingForResultLatch.await();
@@ -189,7 +189,7 @@ public interface TaskManagerContract {
}
@Test
- default void getStatusShouldReturnInProgressWhenProcessingIsInProgress(CountDownLatch waitingForResultLatch) {
+ default void getStatusShouldReturnInProgressWhenProcessingIsInProgress(CountDownLatch waitingForResultLatch) throws Exception {
TaskManager taskManager = taskManager();
TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> {
waitingForResultLatch.await();
@@ -201,7 +201,7 @@ public interface TaskManagerContract {
}
@Test
- default void getStatusShouldReturnCompletedWhenRunSuccessfully() {
+ default void getStatusShouldReturnCompletedWhenRunSuccessfully() throws Exception {
TaskManager taskManager = taskManager();
TaskId taskId = taskManager.submit(
new CompletedTask());
@@ -212,7 +212,7 @@ public interface TaskManagerContract {
}
@Test
- default void additionalInformationShouldBeUpdatedWhenRunSuccessfully() {
+ default void additionalInformationShouldBeUpdatedWhenRunSuccessfully() throws Exception {
TaskManager taskManager = taskManager();
TaskId taskId = taskManager.submit(new MemoryReferenceWithCounterTask(counter -> {
counter.incrementAndGet();
@@ -230,7 +230,7 @@ public interface TaskManagerContract {
}
@Test
- default void additionalInformationShouldBeUpdatedWhenFailed() {
+ default void additionalInformationShouldBeUpdatedWhenFailed() throws Exception {
TaskManager taskManager = taskManager();
TaskId taskId = taskManager.submit(new MemoryReferenceWithCounterTask(counter -> {
counter.incrementAndGet();
@@ -248,7 +248,7 @@ public interface TaskManagerContract {
}
@Test
- default void additionalInformationShouldBeUpdatedWhenCancelled(CountDownLatch countDownLatch) {
+ default void additionalInformationShouldBeUpdatedWhenCancelled(CountDownLatch countDownLatch) throws Exception {
TaskManager taskManager = taskManager();
TaskId id = taskManager.submit(new MemoryReferenceWithCounterTask((counter) -> {
counter.incrementAndGet();
@@ -273,7 +273,7 @@ public interface TaskManagerContract {
}
@Test
- default void additionalInformationShouldBeUpdatedDuringExecution(CountDownLatch countDownLatch) {
+ default void additionalInformationShouldBeUpdatedDuringExecution(CountDownLatch countDownLatch) throws Exception {
TaskManager taskManager = taskManager();
TaskId id = taskManager.submit(new MemoryReferenceWithCounterTask((counter) -> {
counter.incrementAndGet();
@@ -288,7 +288,7 @@ public interface TaskManagerContract {
}
@Test
- default void additionalInformationShouldBeAvailableOnAnyTaskManagerDuringExecution(CountDownLatch countDownLatch) {
+ default void additionalInformationShouldBeAvailableOnAnyTaskManagerDuringExecution(CountDownLatch countDownLatch) throws Exception {
TaskManager taskManager = taskManager();
TaskManager otherTaskManager = taskManager();
TaskId id = taskManager.submit(new MemoryReferenceWithCounterTask((counter) -> {
@@ -312,7 +312,7 @@ public interface TaskManagerContract {
}
@Test
- default void getStatusShouldReturnFailedWhenRunPartially() {
+ default void getStatusShouldReturnFailedWhenRunPartially() throws Exception {
TaskManager taskManager = taskManager();
TaskId taskId = taskManager.submit(
new FailedTask());
@@ -493,22 +493,22 @@ public interface TaskManagerContract {
}
@Test
- default void listShouldBeEmptyWhenNoTasks() {
+ default void listShouldBeEmptyWhenNoTasks() throws Exception {
assertThat(taskManager().list()).isEmpty();
}
@Test
- default void listCancelledShouldBeEmptyWhenNoTasks() {
+ default void listCancelledShouldBeEmptyWhenNoTasks() throws Exception {
assertThat(taskManager().list(TaskManager.Status.CANCELLED)).isEmpty();
}
@Test
- default void listCancelRequestedShouldBeEmptyWhenNoTasks() {
+ default void listCancelRequestedShouldBeEmptyWhenNoTasks() throws Exception {
assertThat(taskManager().list(TaskManager.Status.CANCEL_REQUESTED)).isEmpty();
}
@Test
- default void awaitShouldNotThrowWhenCompletedTask() throws TaskManager.ReachedTimeoutException {
+ default void awaitShouldNotThrowWhenCompletedTask() throws Exception {
TaskManager taskManager = taskManager();
TaskId taskId = taskManager.submit(new CompletedTask());
taskManager.await(taskId, TIMEOUT);
@@ -516,7 +516,7 @@ public interface TaskManagerContract {
}
@Test
- default void awaitShouldAwaitWaitingTask() throws TaskManager.ReachedTimeoutException, InterruptedException {
+ default void awaitShouldAwaitWaitingTask() throws Exception {
TaskManager taskManager = taskManager();
CountDownLatch latch = new CountDownLatch(1);
taskManager.submit(new MemoryReferenceTask(
@@ -537,7 +537,7 @@ public interface TaskManagerContract {
}
@Test
- default void awaitWithATooShortTimeoutShouldReturnATimeoutAwaitedTaskExecutionDetailsAndThrow() {
+ default void awaitWithATooShortTimeoutShouldReturnATimeoutAwaitedTaskExecutionDetailsAndThrow() throws Exception {
TaskManager taskManager = taskManager();
TaskId taskId = taskManager.submit(new MemoryReferenceTask(
() -> {
@@ -550,7 +550,7 @@ public interface TaskManagerContract {
}
@Test
- default void submittedTaskShouldExecuteSequentially() {
+ default void submittedTaskShouldExecuteSequentially() throws Exception {
TaskManager taskManager = taskManager();
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
@@ -580,7 +580,7 @@ public interface TaskManagerContract {
}
@Test
- default void awaitShouldReturnFailedWhenExceptionThrown() {
+ default void awaitShouldReturnFailedWhenExceptionThrown() throws Exception {
TaskManager taskManager = taskManager();
TaskId taskId = taskManager.submit(new ThrowingTask());
awaitUntilTaskHasStatus(taskId, TaskManager.Status.FAILED, taskManager);
@@ -589,7 +589,7 @@ public interface TaskManagerContract {
}
@Test
- default void getStatusShouldReturnFailedWhenExceptionThrown() {
+ default void getStatusShouldReturnFailedWhenExceptionThrown() throws Exception {
TaskManager taskManager = taskManager();
TaskId taskId = taskManager.submit(new ThrowingTask());
awaitUntilTaskHasStatus(taskId, TaskManager.Status.FAILED, taskManager);
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 212dce28c5..fdc24021cd 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -32,6 +32,8 @@ import java.util.Optional;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
+import org.apache.james.backends.rabbitmq.QueueArguments;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
@@ -66,22 +68,26 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
private final JsonEventSerializer serializer;
private final Sender sender;
private final ReceiverProvider receiverProvider;
+ private final RabbitMQConfiguration rabbitMQConfiguration;
private Sinks.Many<OutboundMessage> sendQueue;
private Sinks.Many<Event> listener;
private Disposable sendQueueHandle;
private Disposable listenQueueHandle;
@Inject
- RabbitMQTerminationSubscriber(TerminationQueueName queueName, Sender sender, ReceiverProvider receiverProvider, JsonEventSerializer serializer) {
+ RabbitMQTerminationSubscriber(TerminationQueueName queueName, Sender sender, ReceiverProvider receiverProvider, JsonEventSerializer serializer, RabbitMQConfiguration rabbitMQConfiguration) {
this.queueName = queueName;
this.sender = sender;
this.receiverProvider = receiverProvider;
this.serializer = serializer;
+ this.rabbitMQConfiguration = rabbitMQConfiguration;
}
public void start() {
sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
- sender.declare(QueueSpecification.queue(queueName.asString()).durable(!DURABLE).autoDelete(AUTO_DELETE)).block();
+ QueueArguments.Builder builder = QueueArguments.builder();
+ rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL);
+ sender.declare(QueueSpecification.queue(queueName.asString()).durable(!DURABLE).autoDelete(!AUTO_DELETE).arguments(builder.build())).block();
sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, queueName.asString())).block();
sendQueue = Sinks.many().unicast().onBackpressureBuffer();
sendQueueHandle = sender
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index d49645d0c2..b315e9bec5 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -32,6 +32,8 @@ import java.time.Duration;
import java.util.Optional;
import org.apache.james.backends.rabbitmq.Constants;
+import org.apache.james.backends.rabbitmq.QueueArguments;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.task.Task;
@@ -81,6 +83,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
private final TaskManagerWorker worker;
private final JsonTaskSerializer taskSerializer;
private final RabbitMQWorkQueueConfiguration configuration;
+ private final RabbitMQConfiguration rabbitMQConfiguration;
private final Sender sender;
private final ReceiverProvider receiverProvider;
private final CancelRequestQueueName cancelRequestQueueName;
@@ -91,13 +94,15 @@ public class RabbitMQWorkQueue implements WorkQueue {
public RabbitMQWorkQueue(TaskManagerWorker worker, Sender sender,
ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer,
- RabbitMQWorkQueueConfiguration configuration, CancelRequestQueueName cancelRequestQueueName) {
+ RabbitMQWorkQueueConfiguration configuration, CancelRequestQueueName cancelRequestQueueName,
+ RabbitMQConfiguration rabbitMQConfiguration) {
this.cancelRequestQueueName = cancelRequestQueueName;
this.worker = worker;
this.receiverProvider = receiverProvider;
this.sender = sender;
this.taskSerializer = taskSerializer;
this.configuration = configuration;
+ this.rabbitMQConfiguration = rabbitMQConfiguration;
}
@Override
@@ -193,11 +198,17 @@ public class RabbitMQWorkQueue implements WorkQueue {
private void listenToCancelRequests() {
sender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
- sender.declare(QueueSpecification.queue(cancelRequestQueueName.asString()).durable(!DURABLE).autoDelete(AUTO_DELETE)).block();
+ QueueArguments.Builder builder = QueueArguments.builder();
+ rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL);
+ QueueSpecification specification = QueueSpecification.queue(cancelRequestQueueName.asString())
+ .durable(!DURABLE)
+ .autoDelete(AUTO_DELETE)
+ .arguments(builder.build());
+ sender.declare(specification).block();
sender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, cancelRequestQueueName.asString())).block();
registerCancelRequestsListener(cancelRequestQueueName.asString());
- sendCancelRequestsQueue = Sinks.many().unicast().onBackpressureBuffer();
+ sendCancelRequestsQueue = Sinks.many().multicast().onBackpressureBuffer();
sendCancelRequestsQueueHandle = sender
.send(sendCancelRequestsQueue.asFlux().map(this::makeCancelRequestMessage))
.subscribeOn(Schedulers.boundedElastic())
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java
index 95633fb89a..6ad3f1def9 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java
@@ -26,13 +26,14 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import javax.inject.Inject;
+import org.apache.james.backends.rabbitmq.QueueArguments;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@@ -42,11 +43,13 @@ public class RabbitMQWorkQueueReconnectionHandler implements SimpleConnectionPoo
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueueReconnectionHandler.class);
private final CancelRequestQueueName cancelRequestQueueName;
private final EventSourcingTaskManager taskManager;
+ private final RabbitMQConfiguration configuration;
@Inject
- public RabbitMQWorkQueueReconnectionHandler(CancelRequestQueueName cancelRequestQueueName, EventSourcingTaskManager taskManager) {
+ public RabbitMQWorkQueueReconnectionHandler(CancelRequestQueueName cancelRequestQueueName, EventSourcingTaskManager taskManager, RabbitMQConfiguration configuration) {
this.cancelRequestQueueName = cancelRequestQueueName;
this.taskManager = taskManager;
+ this.configuration = configuration;
}
@Override
@@ -57,7 +60,9 @@ public class RabbitMQWorkQueueReconnectionHandler implements SimpleConnectionPoo
private void createCancelQueue(Connection connection) {
try (Channel channel = connection.createChannel()) {
- channel.queueDeclare(cancelRequestQueueName.asString(), !DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
+ QueueArguments.Builder builder = QueueArguments.builder();
+ configuration.getQueueTTL().ifPresent(builder::queueTTL);
+ channel.queueDeclare(cancelRequestQueueName.asString(), !DURABLE, !EXCLUSIVE, !AUTO_DELETE, builder.build());
} catch (Exception e) {
LOGGER.error("Error recovering connection", e);
}
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
index 224d2a7f68..f1fc733057 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
@@ -26,12 +26,13 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import javax.inject.Inject;
+import org.apache.james.backends.rabbitmq.QueueArguments;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@@ -42,11 +43,13 @@ public class TerminationReconnectionHandler implements SimpleConnectionPool.Reco
private final TerminationQueueName queueName;
private final RabbitMQTerminationSubscriber terminationSubscriber;
+ private final RabbitMQConfiguration configuration;
@Inject
- public TerminationReconnectionHandler(TerminationQueueName queueName, RabbitMQTerminationSubscriber terminationSubscriber) {
+ public TerminationReconnectionHandler(TerminationQueueName queueName, RabbitMQTerminationSubscriber terminationSubscriber, RabbitMQConfiguration configuration) {
this.queueName = queueName;
this.terminationSubscriber = terminationSubscriber;
+ this.configuration = configuration;
}
@Override
@@ -57,7 +60,9 @@ public class TerminationReconnectionHandler implements SimpleConnectionPool.Reco
private void createTerminationQueue(Connection connection) {
try (Channel channel = connection.createChannel()) {
- channel.queueDeclare(queueName.asString(), !DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
+ QueueArguments.Builder builder = QueueArguments.builder();
+ configuration.getQueueTTL().ifPresent(builder::queueTTL);
+ channel.queueDeclare(queueName.asString(), !DURABLE, !EXCLUSIVE, !AUTO_DELETE, builder.build());
} catch (Exception e) {
LOGGER.error("Error recovering connection", e);
}
diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
index 92dd8a4634..7c5d542295 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
@@ -22,7 +22,7 @@ import java.time.Duration
import com.google.common.annotations.VisibleForTesting
import javax.inject.Inject
-import org.apache.james.backends.rabbitmq.ReceiverProvider
+import org.apache.james.backends.rabbitmq.{RabbitMQConfiguration, ReceiverProvider}
import org.apache.james.eventsourcing.EventSourcingSystem
import org.apache.james.server.task.json.JsonTaskSerializer
import org.apache.james.task.SerialTaskManagerWorker
@@ -33,7 +33,8 @@ class RabbitMQWorkQueueSupplier @Inject()(private val sender: Sender,
private val receiverProvider: ReceiverProvider,
private val jsonTaskSerializer: JsonTaskSerializer,
private val cancelRequestName: CancelRequestQueueName,
- private val configuration: RabbitMQWorkQueueConfiguration) extends WorkQueueSupplier {
+ private val configuration: RabbitMQWorkQueueConfiguration,
+ private val rabbitMQConfiguration: RabbitMQConfiguration) extends WorkQueueSupplier {
val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL = Duration.ofSeconds(30)
override def apply(eventSourcingSystem: EventSourcingSystem): RabbitMQWorkQueue = {
@@ -44,7 +45,7 @@ class RabbitMQWorkQueueSupplier @Inject()(private val sender: Sender,
def apply(eventSourcingSystem: EventSourcingSystem, additionalInformationPollingInterval: Duration): RabbitMQWorkQueue = {
val listener = WorkerStatusListener(eventSourcingSystem)
val worker = new SerialTaskManagerWorker(listener, additionalInformationPollingInterval)
- val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, sender, receiverProvider, jsonTaskSerializer, configuration, cancelRequestName)
+ val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, sender, receiverProvider, jsonTaskSerializer, configuration, cancelRequestName, rabbitMQConfiguration)
rabbitMQWorkQueue
}
}
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 3d99530eb6..f7ac3131b5 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -31,6 +31,7 @@ import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.FIVE_SECONDS;
import static org.awaitility.Durations.ONE_SECOND;
+import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
@@ -118,9 +119,9 @@ class DistributedTaskManagerTest implements TaskManagerContract {
private final List<RabbitMQWorkQueue> workQueues;
private final RabbitMQWorkQueueSupplier supplier;
- TrackedRabbitMQWorkQueueSupplier(Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer) {
+ TrackedRabbitMQWorkQueueSupplier(Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer) throws Exception {
workQueues = new ArrayList<>();
- supplier = new RabbitMQWorkQueueSupplier(sender, receiverProvider, taskSerializer, CancelRequestQueueName.generate(), RabbitMQWorkQueueConfiguration$.MODULE$.enabled());
+ supplier = new RabbitMQWorkQueueSupplier(sender, receiverProvider, taskSerializer, CancelRequestQueueName.generate(), RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), rabbitMQExtension.getRabbitMQ().getConfiguration());
}
@Override
@@ -198,7 +199,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
JsonEventSerializer eventSerializer;
@BeforeEach
- void setUp(EventStore eventStore) {
+ void setUp(EventStore eventStore) throws Exception {
memoryReferenceTaskStore = new MemoryReferenceTaskStore();
memoryReferenceWithCounterTaskStore = new MemoryReferenceWithCounterTaskStore();
CassandraCluster cassandra = CASSANDRA_CLUSTER.getCassandraCluster();
@@ -223,21 +224,21 @@ class DistributedTaskManagerTest implements TaskManagerContract {
.forEach(queue -> managementAPI.deleteQueue("/", queue.getName()));
}
- public EventSourcingTaskManager taskManager() {
+ public EventSourcingTaskManager taskManager() throws Exception {
return taskManager(HOSTNAME);
}
- EventSourcingTaskManager taskManager(Hostname hostname) {
+ EventSourcingTaskManager taskManager(Hostname hostname) throws Exception {
RabbitMQTerminationSubscriber terminationSubscriber = new RabbitMQTerminationSubscriber(TerminationQueueName.generate(), rabbitMQExtension.getSender(),
rabbitMQExtension.getReceiverProvider(),
- eventSerializer);
+ eventSerializer, rabbitMQExtension.getRabbitMQ().getConfiguration());
terminationSubscribers.add(terminationSubscriber);
terminationSubscriber.start();
return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, terminationSubscriber);
}
@Test
- void badPayloadShouldNotAffectTaskManagerOnCancelTask() throws TaskManager.ReachedTimeoutException {
+ void badPayloadShouldNotAffectTaskManagerOnCancelTask() throws Exception {
TaskManager taskManager = taskManager(HOSTNAME);
TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
Thread.sleep(250);
@@ -255,7 +256,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void badPayloadsShouldNotAffectTaskManagerOnCancelTask() throws TaskManager.ReachedTimeoutException {
+ void badPayloadsShouldNotAffectTaskManagerOnCancelTask() throws Exception {
TaskManager taskManager = taskManager(HOSTNAME);
TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
Thread.sleep(250);
@@ -274,7 +275,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void badPayloadShouldNotAffectTaskManagerOnCompleteTask() throws TaskManager.ReachedTimeoutException {
+ void badPayloadShouldNotAffectTaskManagerOnCompleteTask() throws Exception {
TaskManager taskManager = taskManager(HOSTNAME);
TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
Thread.sleep(250);
@@ -291,7 +292,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void badPayloadsShouldNotAffectTaskManagerOnCompleteTask() throws TaskManager.ReachedTimeoutException {
+ void badPayloadsShouldNotAffectTaskManagerOnCompleteTask() throws Exception {
TaskManager taskManager = taskManager(HOSTNAME);
TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
Thread.sleep(250);
@@ -309,7 +310,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() {
+ void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() throws Exception {
try (EventSourcingTaskManager taskManager1 = taskManager();
EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
TaskId taskId = taskManager1.submit(new CompletedTask());
@@ -325,7 +326,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void givenTwoTaskManagersAndTwoTaskOnlyOneTaskShouldRunAtTheSameTime() {
+ void givenTwoTaskManagersAndTwoTaskOnlyOneTaskShouldRunAtTheSameTime() throws Exception {
CountDownLatch waitingForFirstTaskLatch = new CountDownLatch(1);
try (EventSourcingTaskManager taskManager1 = taskManager();
@@ -348,7 +349,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws InterruptedException {
+ void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws Exception {
try (EventSourcingTaskManager taskManager1 = taskManager()) {
Thread.sleep(100);
try (EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
@@ -368,7 +369,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void givenTwoTaskManagerATaskRunningOnOneShouldBeCancellableFromTheOtherOne(CountDownLatch countDownLatch) {
+ void givenTwoTaskManagerATaskRunningOnOneShouldBeCancellableFromTheOtherOne(CountDownLatch countDownLatch) throws Exception {
TaskManager taskManager1 = taskManager(HOSTNAME);
TaskManager taskManager2 = taskManager(HOSTNAME_2);
TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> {
@@ -397,7 +398,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void givenTwoTaskManagersATaskRunningOnOneShouldBeWaitableFromTheOtherOne() throws TaskManager.ReachedTimeoutException {
+ void givenTwoTaskManagersATaskRunningOnOneShouldBeWaitableFromTheOtherOne() throws Exception {
TaskManager taskManager1 = taskManager(HOSTNAME);
TaskManager taskManager2 = taskManager(HOSTNAME_2);
TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> {
@@ -424,7 +425,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void givenTwoTaskManagerAndATaskRanPerTaskManagerListingThemOnEachShouldShowBothTasks() {
+ void givenTwoTaskManagerAndATaskRanPerTaskManagerListingThemOnEachShouldShowBothTasks() throws Exception {
try (EventSourcingTaskManager taskManager1 = taskManager();
EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
@@ -449,7 +450,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) {
+ void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) throws Exception {
try (EventSourcingTaskManager taskManager1 = taskManager();
EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager = ImmutableBiMap.of(taskManager1, HOSTNAME, taskManager2, HOSTNAME_2);
@@ -477,7 +478,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void shouldNotCrashWhenBadMessage() {
+ void shouldNotCrashWhenBadMessage() throws Exception {
TaskManager taskManager = taskManager(HOSTNAME);
taskManager.submit(new FailsDeserializationTask());
@@ -488,7 +489,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void shouldNotCrashWhenBadMessages() {
+ void shouldNotCrashWhenBadMessages() throws Exception {
TaskManager taskManager = taskManager(HOSTNAME);
IntStream.range(0, 100).forEach(i -> taskManager.submit(new FailsDeserializationTask()));
@@ -586,7 +587,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void cassandraTasksShouldBeCancealable(CassandraCluster cassandra) {
+ void cassandraTasksShouldBeCancealable(CassandraCluster cassandra) throws Exception {
TaskManager taskManager = taskManager(HOSTNAME);
TaskId taskId = taskManager.submit(new CassandraExecutingTask(cassandra.getConf(), true));
@@ -599,7 +600,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
}
@Test
- void inProgressTaskShouldBeCanceledWhenCloseTaskManager() {
+ void inProgressTaskShouldBeCanceledWhenCloseTaskManager() throws Exception {
try (EventSourcingTaskManager taskManager = taskManager()) {
TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> {
TimeUnit.SECONDS.sleep(5);
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index 2f53672adb..f1a868f97f 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -65,15 +65,15 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
.isolationPolicy(WEAK);
@Override
- public TerminationSubscriber subscriber() {
+ public TerminationSubscriber subscriber() throws Exception {
RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(TerminationQueueName.generate(), rabbitMQExtension.getSender(),
- rabbitMQExtension.getReceiverProvider(), SERIALIZER);
+ rabbitMQExtension.getReceiverProvider(), SERIALIZER, rabbitMQExtension.getRabbitMQ().getConfiguration());
subscriber.start();
return subscriber;
}
@Test
- void givenTwoTerminationSubscribersWhenAnEventIsSentItShouldBeReceivedByBoth() {
+ void givenTwoTerminationSubscribersWhenAnEventIsSentItShouldBeReceivedByBoth() throws Exception {
TerminationSubscriber subscriber1 = subscriber();
TerminationSubscriber subscriber2 = subscriber();
@@ -94,7 +94,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
}
@Test
- void eventProcessingShouldNotCrashOnInvalidMessage() {
+ void eventProcessingShouldNotCrashOnInvalidMessage() throws Exception {
TerminationSubscriber subscriber1 = subscriber();
Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
@@ -113,7 +113,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
}
@Test
- void eventProcessingShouldNotCrashOnInvalidMessages() {
+ void eventProcessingShouldNotCrashOnInvalidMessages() throws Exception {
TerminationSubscriber subscriber1 = subscriber();
Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
index 6a964af193..81b56e5bdb 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
@@ -50,10 +50,10 @@ class RabbitMQWorkQueuePersistenceTest {
private JsonTaskSerializer serializer;
@BeforeEach
- void setUp() {
+ void setUp() throws Exception {
worker = spy(new ImmediateWorker());
serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
- testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate());
+ testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
//declare the queue but do not start consuming from it
testee.declareQueue();
}
@@ -90,9 +90,9 @@ class RabbitMQWorkQueuePersistenceTest {
assertThat(worker.results).containsExactly(Task.Result.COMPLETED);
}
- private void startNewConsumingWorkqueue() {
+ private void startNewConsumingWorkqueue() throws Exception {
worker = spy(new ImmediateWorker());
- testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate());
+ testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
testee.start();
}
}
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 8f83aa5a95..45265689b0 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -66,10 +66,10 @@ class RabbitMQWorkQueueTest {
@BeforeEach
- void setUp() {
+ void setUp() throws Exception {
worker = new ImmediateWorker();
serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
- testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate());
+ testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration());
testee.start();
}
@@ -96,11 +96,11 @@ class RabbitMQWorkQueueTest {
}
@Test
- void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() {
+ void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() throws Exception {
testee.submit(TASK_WITH_ID);
ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
- try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate())) {
+ try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration())) {
otherWorkQueue.start();
IntStream.range(0, 9)
@@ -112,7 +112,7 @@ class RabbitMQWorkQueueTest {
}
@Test
- void givenANonDeserializableTaskItShouldBeFlaggedAsFailedAndItDoesNotPreventFollowingTasks() throws InterruptedException {
+ void givenANonDeserializableTaskItShouldBeFlaggedAsFailedAndItDoesNotPreventFollowingTasks() throws Exception {
Task task = new TestTask(42);
TaskId taskId = TaskId.fromString("4bf6d081-aa30-11e9-bf6c-2d3b9e84aafd");
TaskWithId taskWithId = new TaskWithId(taskId, task);
@@ -120,7 +120,7 @@ class RabbitMQWorkQueueTest {
ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
JsonTaskSerializer otherTaskSerializer = JsonTaskSerializer.of(TestTaskDTOModules.TEST_TYPE);
try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), otherTaskSerializer,
- RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate())) {
+ RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration())) {
//wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue.
Thread.sleep(200);
otherWorkQueue.start();
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
index 63d9625c8e..1a2c801292 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
@@ -53,10 +53,10 @@ public interface TerminationSubscriberContract {
Duration DELAY_BEFORE_PUBLISHING = Duration.ofMillis(50);
ExecutorService EXECUTOR = Executors.newCachedThreadPool();
- TerminationSubscriber subscriber();
+ TerminationSubscriber subscriber() throws Exception;
@Test
- default void handlingCompletedShouldBeListed() {
+ default void handlingCompletedShouldBeListed() throws Exception {
TerminationSubscriber subscriber = subscriber();
sendEvents(subscriber, COMPLETED_EVENT);
@@ -65,7 +65,7 @@ public interface TerminationSubscriberContract {
}
@Test
- default void handlingFailedShouldBeListed() {
+ default void handlingFailedShouldBeListed() throws Exception {
TerminationSubscriber subscriber = subscriber();
sendEvents(subscriber, FAILED_EVENT);
@@ -74,7 +74,7 @@ public interface TerminationSubscriberContract {
}
@Test
- default void handlingCancelledShouldBeListed() {
+ default void handlingCancelledShouldBeListed() throws Exception {
TerminationSubscriber subscriber = subscriber();
sendEvents(subscriber, CANCELLED_EVENT);
@@ -83,7 +83,7 @@ public interface TerminationSubscriberContract {
}
@Test
- default void handlingNonTerminalEventShouldNotBeListed() {
+ default void handlingNonTerminalEventShouldNotBeListed() throws Exception {
TerminationSubscriber subscriber = subscriber();
TaskEvent event = new Started(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42), new Hostname("foo"));
@@ -93,7 +93,7 @@ public interface TerminationSubscriberContract {
}
@Test
- default void handlingMultipleEventsShouldBeListed() {
+ default void handlingMultipleEventsShouldBeListed() throws Exception {
TerminationSubscriber subscriber = subscriber();
sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
@@ -102,7 +102,7 @@ public interface TerminationSubscriberContract {
}
@Test
- default void multipleListeningEventsShouldShareEvents() {
+ default void multipleListeningEventsShouldShareEvents() throws Exception {
TerminationSubscriber subscriber = subscriber();
Flux<Event> firstListener = Flux.from(subscriber.listenEvents());
@@ -122,7 +122,7 @@ public interface TerminationSubscriberContract {
}
@Test
- default void dynamicListeningEventsShouldGetOnlyNewEvents() {
+ default void dynamicListeningEventsShouldGetOnlyNewEvents() throws Exception {
TerminationSubscriber subscriber = subscriber();
sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org