You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/02 03:16:22 UTC

[james-project] 25/29: JAMES-3080 add test to ensure message can survive a rabbitMQ restart

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 8e8775943e4d64e74e37cbfddfaa4ead1948e122
Author: RĂ©mi KOWALSKI <rk...@linagora.com>
AuthorDate: Fri Feb 28 15:44:44 2020 +0100

    JAMES-3080 add test to ensure message can survive a rabbitMQ restart
---
 .../distributed/RabbitMQWorkQueue.java             |  10 +-
 .../RabbitMQWorkQueuePersistenceTest.java          | 109 +++++++++++++++++++++
 2 files changed, 116 insertions(+), 3 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index eb2044d..d91d786 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -36,10 +36,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Delivery;
-
 import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.UnicastProcessor;
@@ -93,11 +93,15 @@ public class RabbitMQWorkQueue implements WorkQueue {
     }
 
     private void startWorkqueue() {
+        declareQueue();
+        consumeWorkqueue();
+    }
+
+    @VisibleForTesting
+    void declareQueue() {
         channelPool.getSender().declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
         channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER)).block();
         channelPool.getSender().bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
-
-        consumeWorkqueue();
     }
 
     private void consumeWorkqueue() {
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
new file mode 100644
index 0000000..b83bf6f
--- /dev/null
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
@@ -0,0 +1,109 @@
+/**
+ * *************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ***************************************************************/
+package org.apache.james.task.eventsourcing.distributed;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.FIVE_SECONDS;
+import static org.mockito.Mockito.spy;
+
+import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.server.task.json.JsonTaskSerializer;
+import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
+import org.apache.james.server.task.json.dto.TestTaskDTOModules;
+import org.apache.james.task.MemoryReferenceTask;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskId;
+import org.apache.james.task.TaskWithId;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class RabbitMQWorkQueuePersistenceTest {
+    private static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
+
+    @RegisterExtension
+    RabbitMQExtension rabbitMQExtension = RabbitMQExtension.defaultRabbitMQ()
+        .restartPolicy(RabbitMQExtension.DockerRestartPolicy.PER_TEST);
+
+    private RabbitMQWorkQueue testee;
+    private ImmediateWorker worker;
+    private JsonTaskSerializer serializer;
+
+    @BeforeEach
+    void setUp() {
+        worker = spy(new ImmediateWorker());
+        serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
+        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
+        //declare the queue but do not start consuming from it
+        testee.declareQueue();
+    }
+
+    @AfterEach
+    void tearDown() {
+        testee.close();
+    }
+
+    /**
+     * submit on a workqueue which do not consume
+     * restart rabbit
+     * start a workqueue which consume messages
+     * verify that the message is treated
+     */
+    @Disabled("JAMES-3080 rabbitMQ messages need to be persisted")
+    @Test
+    void submittedMessageShouldSurviveRabbitMQRestart() throws Exception {
+        Task TASK = new MemoryReferenceTask(() -> Task.Result.COMPLETED);
+
+        TaskWithId TASK_WITH_ID = new TaskWithId(TASK_ID, TASK);
+
+        testee.submit(TASK_WITH_ID);
+
+        //wait for submit to be effective
+        Thread.sleep(500);
+        testee.close();
+
+        restartRabbitMQ();
+
+        startNewWorkqueue();
+
+        await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> !worker.results.isEmpty());
+
+        assertThat(worker.tasks).containsExactly(TASK_WITH_ID);
+        assertThat(worker.results).containsExactly(Task.Result.COMPLETED);
+    }
+
+    private void startNewWorkqueue() {
+        worker = spy(new ImmediateWorker());
+        testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
+        testee.start();
+    }
+
+    private void restartRabbitMQ() throws Exception {
+        rabbitMQExtension.getRabbitMQ().stopApp();
+        rabbitMQExtension.getRabbitMQ().startApp();
+        //wait until healthcheck is ok
+        await().atMost(FIVE_SECONDS).until(() -> rabbitMQExtension.managementAPI().listQueues().size() > 0);
+        rabbitMQExtension.getRabbitChannelPool().start();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org