You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/03/06 03:07:05 UTC

[james-project] 04/21: JAMES-3080 use reworked rabbitMQrestart method in tests and make the code more resilient

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 890f20c323e94a3ee95e2eab455e9e4846d8015a
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Tue Mar 3 14:13:48 2020 +0100

    JAMES-3080 use reworked rabbitMQrestart method in tests and make the code more resilient
---
 .../distributed/RabbitMQWorkQueue.java             | 22 +++++++++++++++++++---
 .../RabbitMQWorkQueuePersistenceTest.java          | 20 +++++---------------
 2 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 1efa7cd..005b86d 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -23,6 +23,7 @@ package org.apache.james.task.eventsourcing.distributed;
 import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
 
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.Optional;
 import java.util.UUID;
 
@@ -69,6 +70,10 @@ public class RabbitMQWorkQueue implements WorkQueue {
     private static final String CANCEL_REQUESTS_QUEUE_NAME_PREFIX = "taskManagerCancelRequestsQueue";
     public static final String TASK_ID = "taskId";
 
+    public static final int NUM_RETRIES = 8;
+    public static final Duration FIRST_BACKOFF = Duration.ofMillis(100);
+    public static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
+
     private final TaskManagerWorker worker;
     private final ReactorRabbitMQChannelPool channelPool;
     private final JsonTaskSerializer taskSerializer;
@@ -99,9 +104,20 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     @VisibleForTesting
     void declareQueue() {
-        channelPool.getSender().declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
-        channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER)).block();
-        channelPool.getSender().bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
+        Mono<AMQP.Exchange.DeclareOk> declareExchange = channelPool.getSender()
+            .declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME))
+            .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
+        Mono<AMQP.Queue.DeclareOk> declareQueue = channelPool.getSender()
+            .declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER))
+            .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
+        Mono<AMQP.Queue.BindOk> bindQueueToExchange = channelPool.getSender()
+            .bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME))
+            .retryBackoff(NUM_RETRIES, FIRST_BACKOFF, FOREVER);
+
+        declareExchange
+            .then(declareQueue)
+            .then(bindQueueToExchange)
+            .block();
     }
 
     private void consumeWorkqueue() {
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
index 7040957..298b45d 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
@@ -21,8 +21,6 @@ package org.apache.james.task.eventsourcing.distributed;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
-import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
-import static org.awaitility.Duration.FIVE_SECONDS;
 import static org.mockito.Mockito.spy;
 
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
@@ -33,6 +31,7 @@ import org.apache.james.task.MemoryReferenceTask;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskWithId;
+import org.awaitility.Duration;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -72,7 +71,6 @@ class RabbitMQWorkQueuePersistenceTest {
     @Test
     void submittedMessageShouldSurviveRabbitMQRestart() throws Exception {
         Task TASK = new MemoryReferenceTask(() -> Task.Result.COMPLETED);
-
         TaskWithId TASK_WITH_ID = new TaskWithId(TASK_ID, TASK);
 
         testee.submit(TASK_WITH_ID);
@@ -81,27 +79,19 @@ class RabbitMQWorkQueuePersistenceTest {
         Thread.sleep(500);
         testee.close();
 
-        restartRabbitMQ();
+        rabbitMQExtension.getRabbitMQ().restart();
 
-        startNewWorkqueue();
+        startNewConsumingWorkqueue();
 
-        await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> !worker.results.isEmpty());
+        await().atMost(Duration.ONE_MINUTE).until(() -> !worker.results.isEmpty());
 
         assertThat(worker.tasks).containsExactly(TASK_WITH_ID);
         assertThat(worker.results).containsExactly(Task.Result.COMPLETED);
     }
 
-    private void startNewWorkqueue() {
+    private void startNewConsumingWorkqueue() {
         worker = spy(new ImmediateWorker());
         testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
         testee.start();
     }
-
-    private void restartRabbitMQ() throws Exception {
-        rabbitMQExtension.getRabbitMQ().stopApp();
-        rabbitMQExtension.getRabbitMQ().startApp();
-        //wait until healthcheck is ok
-        await().atMost(FIVE_SECONDS).until(() -> rabbitMQExtension.managementAPI().listQueues().size() > 0);
-        rabbitMQExtension.getRabbitChannelPool().start();
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org