You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/18 06:40:37 UTC
[james-project] 04/17: JAMES-2813 replace mocks in
RabbitMQWorkQueueTest by a stub
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5da39aee831a23a855ba201fc1ed42095b841174
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Oct 8 15:37:20 2019 +0200
JAMES-2813 replace mocks in RabbitMQWorkQueueTest by a stub
---
.../distributed/RabbitMQWorkQueueTest.java | 109 ++++++++++++---------
1 file changed, 63 insertions(+), 46 deletions(-)
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 50950e3..2a109cd 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -20,16 +20,17 @@
package org.apache.james.task.eventsourcing.distributed;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@@ -39,6 +40,7 @@ import org.apache.james.server.task.json.TestTask;
import org.apache.james.server.task.json.dto.TestTaskDTOModules;
import org.apache.james.task.CompletedTask;
import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskId;
import org.apache.james.task.TaskManagerWorker;
import org.apache.james.task.TaskWithId;
@@ -46,10 +48,9 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.mockito.ArgumentCaptor;
-import org.mockito.verification.Timeout;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
class RabbitMQWorkQueueTest {
private static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
@@ -65,14 +66,40 @@ class RabbitMQWorkQueueTest {
private RabbitMQWorkQueue testee;
- private TaskManagerWorker taskManagerWorker;
+ private ImmediateWorker taskManagerWorker;
private JsonTaskSerializer taskSerializer;
+ private static class ImmediateWorker implements TaskManagerWorker {
+
+ ConcurrentLinkedQueue<TaskWithId> tasks = new ConcurrentLinkedQueue<>();
+ ConcurrentLinkedQueue<Task.Result> results = new ConcurrentLinkedQueue<>();
+ ConcurrentLinkedQueue<TaskId> failedTasks = new ConcurrentLinkedQueue<>();
+
+ @Override
+ public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
+ tasks.add(taskWithId);
+ return Mono.fromCallable(() -> taskWithId.getTask().run())
+ .doOnNext(result -> results.add(result))
+ .subscribeOn(Schedulers.boundedElastic());
+ }
+
+ @Override
+ public void cancelTask(TaskId taskId) {
+ }
+
+ @Override
+ public void fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation,String errorMessage, Throwable reason) {
+ failedTasks.add(taskId);
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ }
+
@BeforeEach
void setUp() {
- taskManagerWorker = mock(TaskManagerWorker.class);
- when(taskManagerWorker.executeTask(TASK_WITH_ID)).thenReturn(Mono.just(Task.Result.COMPLETED));
- when(taskManagerWorker.executeTask(TASK_WITH_ID_2)).thenReturn(Mono.just(Task.Result.COMPLETED));
+ taskManagerWorker = spy(new ImmediateWorker());
taskSerializer = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE);
testee = new RabbitMQWorkQueue(taskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), taskSerializer);
testee.start();
@@ -86,46 +113,34 @@ class RabbitMQWorkQueueTest {
@Test
void workerShouldConsumeSubmittedTask() {
testee.submit(TASK_WITH_ID);
-
- ArgumentCaptor<TaskWithId> taskWithIdCaptor = ArgumentCaptor.forClass(TaskWithId.class);
- verify(taskManagerWorker, timeout(1000)).executeTask(taskWithIdCaptor.capture());
-
- TaskWithId actualTaskWithId = taskWithIdCaptor.getValue();
- assertThat(actualTaskWithId.getId()).isEqualTo(TASK_ID);
- assertThat(actualTaskWithId.getTask().type()).isEqualTo(TASK.type());
+ await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> !taskManagerWorker.results.isEmpty());
+ assertThat(taskManagerWorker.tasks).containsExactly(TASK_WITH_ID);
+ assertThat(taskManagerWorker.results).containsExactly(Task.Result.COMPLETED);
}
@Test
void workerShouldConsumeTwoSubmittedTask() {
testee.submit(TASK_WITH_ID);
testee.submit(TASK_WITH_ID_2);
-
- ArgumentCaptor<TaskWithId> taskWithIdCaptor = ArgumentCaptor.forClass(TaskWithId.class);
- verify(taskManagerWorker, new Timeout(1000, times(2))).executeTask(taskWithIdCaptor.capture());
-
- TaskWithId actualTaskWithId = taskWithIdCaptor.getAllValues().get(0);
- assertThat(actualTaskWithId.getId()).isEqualTo(TASK_ID);
- assertThat(actualTaskWithId.getTask().type()).isEqualTo(TASK.type());
-
- TaskWithId actualSecondTaskWithId = taskWithIdCaptor.getAllValues().get(1);
- assertThat(actualSecondTaskWithId.getId()).isEqualTo(TASK_ID_2);
- assertThat(actualSecondTaskWithId.getTask().type()).isEqualTo(TASK2.type());
+ await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> taskManagerWorker.results.size() == 2);
+ assertThat(taskManagerWorker.tasks).containsExactly(TASK_WITH_ID, TASK_WITH_ID_2);
+ assertThat(taskManagerWorker.results).allSatisfy(result -> assertThat(result).isEqualTo(Task.Result.COMPLETED));
}
@Test
void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() {
testee.submit(TASK_WITH_ID);
- TaskManagerWorker otherTaskManagerWorker = mock(TaskManagerWorker.class);
- RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), taskSerializer);
- otherWorkQueue.start();
-
- IntStream.range(0, 9)
- .forEach(ignoredIndex -> testee.submit(TASK_WITH_ID_2));
+ ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
+ try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), taskSerializer)) {
+ otherWorkQueue.start();
- verify(taskManagerWorker, new Timeout(1000, times(10))).executeTask(any());
+ IntStream.range(0, 9)
+ .forEach(ignoredIndex -> testee.submit(TASK_WITH_ID_2));
- verify(otherTaskManagerWorker, new Timeout(1000, times(0))).executeTask(any());
+ await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> taskManagerWorker.results.size() == 10);
+ assertThat(otherTaskManagerWorker.tasks).isEmpty();
+ }
}
@Test
@@ -134,20 +149,22 @@ class RabbitMQWorkQueueTest {
TaskId taskId = TaskId.fromString("4bf6d081-aa30-11e9-bf6c-2d3b9e84aafd");
TaskWithId taskWithId = new TaskWithId(taskId, task);
- TaskManagerWorker otherTaskManagerWorker = mock(TaskManagerWorker.class);
+ ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
JsonTaskSerializer otherTaskSerializer = new JsonTaskSerializer(TestTaskDTOModules.TEST_TYPE);
- RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), otherTaskSerializer);
- //wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue.
- Thread.sleep(200);
- otherWorkQueue.start();
+ try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), otherTaskSerializer)) {
+ //wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue.
+ Thread.sleep(200);
+ otherWorkQueue.start();
- otherWorkQueue.submit(taskWithId);
+ otherWorkQueue.submit(taskWithId);
- verify(taskManagerWorker, new Timeout(100, times(0))).executeTask(any());
- verify(taskManagerWorker, timeout(100)).fail(eq(taskId), eq(Optional.empty()), any(), any());
+ await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> taskManagerWorker.failedTasks.size() == 1);
+ assertThat(taskManagerWorker.failedTasks).containsExactly(taskWithId.getId());
- testee.submit(TASK_WITH_ID);
- verify(taskManagerWorker, timeout(100)).executeTask(any());
+ testee.submit(TASK_WITH_ID);
+ await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> taskManagerWorker.results.size() == 1);
+ assertThat(taskManagerWorker.tasks).containsExactly(TASK_WITH_ID);
+ }
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org