You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/02 03:16:22 UTC
[james-project] 25/29: JAMES-3080 add test to ensure message can survive a rabbitMQ restart
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 8e8775943e4d64e74e37cbfddfaa4ead1948e122
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Fri Feb 28 15:44:44 2020 +0100
JAMES-3080 add test to ensure message can survive a rabbitMQ restart
---
.../distributed/RabbitMQWorkQueue.java | 10 +-
.../RabbitMQWorkQueuePersistenceTest.java | 109 +++++++++++++++++++++
2 files changed, 116 insertions(+), 3 deletions(-)
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index eb2044d..d91d786 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -36,10 +36,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;
-
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
@@ -93,11 +93,15 @@ public class RabbitMQWorkQueue implements WorkQueue {
}
private void startWorkqueue() {
+ declareQueue();
+ consumeWorkqueue();
+ }
+
+ @VisibleForTesting
+ void declareQueue() {
channelPool.getSender().declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER)).block();
channelPool.getSender().bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
-
- consumeWorkqueue();
}
private void consumeWorkqueue() {
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
new file mode 100644
index 0000000..b83bf6f
--- /dev/null
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java
@@ -0,0 +1,109 @@
+/**
+ * *************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ***************************************************************/
+package org.apache.james.task.eventsourcing.distributed;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.FIVE_SECONDS;
+import static org.mockito.Mockito.spy;
+
+import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.server.task.json.JsonTaskSerializer;
+import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
+import org.apache.james.server.task.json.dto.TestTaskDTOModules;
+import org.apache.james.task.MemoryReferenceTask;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskId;
+import org.apache.james.task.TaskWithId;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class RabbitMQWorkQueuePersistenceTest {
+ private static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
+
+ @RegisterExtension
+ RabbitMQExtension rabbitMQExtension = RabbitMQExtension.defaultRabbitMQ()
+ .restartPolicy(RabbitMQExtension.DockerRestartPolicy.PER_TEST);
+
+ private RabbitMQWorkQueue testee;
+ private ImmediateWorker worker;
+ private JsonTaskSerializer serializer;
+
+ @BeforeEach
+ void setUp() {
+ worker = spy(new ImmediateWorker());
+ serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
+ testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
+ //declare the queue but do not start consuming from it
+ testee.declareQueue();
+ }
+
+ @AfterEach
+ void tearDown() {
+ testee.close();
+ }
+
+ /**
+ * submit on a workqueue which do not consume
+ * restart rabbit
+ * start a workqueue which consume messages
+ * verify that the message is treated
+ */
+ @Disabled("JAMES-3080 rabbitMQ messages need to be persisted")
+ @Test
+ void submittedMessageShouldSurviveRabbitMQRestart() throws Exception {
+ Task TASK = new MemoryReferenceTask(() -> Task.Result.COMPLETED);
+
+ TaskWithId TASK_WITH_ID = new TaskWithId(TASK_ID, TASK);
+
+ testee.submit(TASK_WITH_ID);
+
+ //wait for submit to be effective
+ Thread.sleep(500);
+ testee.close();
+
+ restartRabbitMQ();
+
+ startNewWorkqueue();
+
+ await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> !worker.results.isEmpty());
+
+ assertThat(worker.tasks).containsExactly(TASK_WITH_ID);
+ assertThat(worker.results).containsExactly(Task.Result.COMPLETED);
+ }
+
+ private void startNewWorkqueue() {
+ worker = spy(new ImmediateWorker());
+ testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
+ testee.start();
+ }
+
+ private void restartRabbitMQ() throws Exception {
+ rabbitMQExtension.getRabbitMQ().stopApp();
+ rabbitMQExtension.getRabbitMQ().startApp();
+ //wait until healthcheck is ok
+ await().atMost(FIVE_SECONDS).until(() -> rabbitMQExtension.managementAPI().listQueues().size() > 0);
+ rabbitMQExtension.getRabbitChannelPool().start();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org