You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/03/06 03:07:05 UTC
[james-project] 04/21: JAMES-3080 use reworked rabbitMQrestart
method in tests and make the code more resilient
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 890f20c323e94a3ee95e2eab455e9e4846d8015a
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Tue Mar 3 14:13:48 2020 +0100
JAMES-3080 use reworked rabbitMQrestart method in tests and make the code more resilient
---
.../distributed/RabbitMQWorkQueue.java | 22 +++++++++++++++++++---
.../RabbitMQWorkQueuePersistenceTest.java | 20 +++++---------------
2 files changed, 24 insertions(+), 18 deletions(-)
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 1efa7cd..005b86d 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -23,6 +23,7 @@ package org.apache.james.task.eventsourcing.distributed;
import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
@@ -69,6 +70,10 @@ public class RabbitMQWorkQueue implements WorkQueue {
private static final String CANCEL_REQUESTS_QUEUE_NAME_PREFIX = "taskManagerCancelRequestsQueue";
public static final String TASK_ID = "taskId";
+ public static final int NUM_RETRIES = 8;
+ public static final Duration FIRST_BACKOFF = Duration.ofMillis(100);
+ public static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
+
private final TaskManagerWorker worker;
private final ReactorRabbitMQChannelPool channelPool;
private final JsonTaskSerializer taskSerializer;
@@ -99,9 +104,20 @@ public class RabbitMQWorkQueue implements WorkQueue {
@VisibleForTesting
void declareQueue() {
- channelPool.getSender().declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
- channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER)).block();
- channelPool.getSender().bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
+ Mono<AMQP.Exchange.DeclareOk> declareExchange = channelPool.getSender()
+ .declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME))
+ .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
+ Mono<AMQP.Queue.DeclareOk> declareQueue = channelPool.getSender()
+ .declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER))
+ .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
+ Mono<AMQP.Queue.BindOk> bindQueueToExchange = channelPool.getSender()
+ .bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME))
+ .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
+
+ declareExchange
+ .then(declareQueue)
+ .then(bindQueueToExchange)
+ .block();
}
private void consumeWorkqueue() {
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
index 7040957..298b45d 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
@@ -21,8 +21,6 @@ package org.apache.james.task.eventsourcing.distributed;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
-import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
-import static org.awaitility.Duration.FIVE_SECONDS;
import static org.mockito.Mockito.spy;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
@@ -33,6 +31,7 @@ import org.apache.james.task.MemoryReferenceTask;
import org.apache.james.task.Task;
import org.apache.james.task.TaskId;
import org.apache.james.task.TaskWithId;
+import org.awaitility.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -72,7 +71,6 @@ class RabbitMQWorkQueuePersistenceTest {
@Test
void submittedMessageShouldSurviveRabbitMQRestart() throws Exception {
Task TASK = new MemoryReferenceTask(() -> Task.Result.COMPLETED);
-
TaskWithId TASK_WITH_ID = new TaskWithId(TASK_ID, TASK);
testee.submit(TASK_WITH_ID);
@@ -81,27 +79,19 @@ class RabbitMQWorkQueuePersistenceTest {
Thread.sleep(500);
testee.close();
- restartRabbitMQ();
+ rabbitMQExtension.getRabbitMQ().restart();
- startNewWorkqueue();
+ startNewConsumingWorkqueue();
- await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> !worker.results.isEmpty());
+ await().atMost(Duration.ONE_MINUTE).until(() -> !worker.results.isEmpty());
assertThat(worker.tasks).containsExactly(TASK_WITH_ID);
assertThat(worker.results).containsExactly(Task.Result.COMPLETED);
}
- private void startNewWorkqueue() {
+ private void startNewConsumingWorkqueue() {
worker = spy(new ImmediateWorker());
testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
testee.start();
}
-
- private void restartRabbitMQ() throws Exception {
- rabbitMQExtension.getRabbitMQ().stopApp();
- rabbitMQExtension.getRabbitMQ().startApp();
- //wait until healthcheck is ok
- await().atMost(FIVE_SECONDS).until(() -> rabbitMQExtension.managementAPI().listQueues().size() > 0);
- rabbitMQExtension.getRabbitChannelPool().start();
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org