You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/18 06:40:37 UTC

[james-project] 04/17: JAMES-2813 replace mocks in RabbitMQWorkQueueTest by a stub

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5da39aee831a23a855ba201fc1ed42095b841174
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Oct 8 15:37:20 2019 +0200

    JAMES-2813 replace mocks in RabbitMQWorkQueueTest by a stub
---
 .../distributed/RabbitMQWorkQueueTest.java         | 109 ++++++++++++---------
 1 file changed, 63 insertions(+), 46 deletions(-)

diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 50950e3..2a109cd 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -20,16 +20,17 @@
 package org.apache.james.task.eventsourcing.distributed;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
@@ -39,6 +40,7 @@ import org.apache.james.server.task.json.TestTask;
 import org.apache.james.server.task.json.dto.TestTaskDTOModules;
 import org.apache.james.task.CompletedTask;
 import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManagerWorker;
 import org.apache.james.task.TaskWithId;
@@ -46,10 +48,9 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.mockito.ArgumentCaptor;
-import org.mockito.verification.Timeout;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 class RabbitMQWorkQueueTest {
     private static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
@@ -65,14 +66,40 @@ class RabbitMQWorkQueueTest {
 
 
     private RabbitMQWorkQueue testee;
-    private TaskManagerWorker taskManagerWorker;
+    private ImmediateWorker taskManagerWorker;
     private JsonTaskSerializer taskSerializer;
 
+    private static class ImmediateWorker implements TaskManagerWorker {
+
+        ConcurrentLinkedQueue<TaskWithId> tasks = new ConcurrentLinkedQueue<>();
+        ConcurrentLinkedQueue<Task.Result> results = new ConcurrentLinkedQueue<>();
+        ConcurrentLinkedQueue<TaskId> failedTasks = new ConcurrentLinkedQueue<>();
+
+        @Override
+        public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
+            tasks.add(taskWithId);
+            return Mono.fromCallable(() -> taskWithId.getTask().run())
+                .doOnNext(result -> results.add(result))
+                .subscribeOn(Schedulers.boundedElastic());
+        }
+
+        @Override
+        public void cancelTask(TaskId taskId) {
+        }
+
+        @Override
+        public void fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation,String errorMessage, Throwable reason) {
+            failedTasks.add(taskId);
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+    }
+
     @BeforeEach
     void setUp() {
-        taskManagerWorker = mock(TaskManagerWorker.class);
-        when(taskManagerWorker.executeTask(TASK_WITH_ID)).thenReturn(Mono.just(Task.Result.COMPLETED));
-        when(taskManagerWorker.executeTask(TASK_WITH_ID_2)).thenReturn(Mono.just(Task.Result.COMPLETED));
+        taskManagerWorker = spy(new ImmediateWorker());
         taskSerializer = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE);
         testee = new RabbitMQWorkQueue(taskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), taskSerializer);
         testee.start();
@@ -86,46 +113,34 @@ class RabbitMQWorkQueueTest {
     @Test
     void workerShouldConsumeSubmittedTask() {
         testee.submit(TASK_WITH_ID);
-
-        ArgumentCaptor<TaskWithId> taskWithIdCaptor = ArgumentCaptor.forClass(TaskWithId.class);
-        verify(taskManagerWorker, timeout(1000)).executeTask(taskWithIdCaptor.capture());
-
-        TaskWithId actualTaskWithId = taskWithIdCaptor.getValue();
-        assertThat(actualTaskWithId.getId()).isEqualTo(TASK_ID);
-        assertThat(actualTaskWithId.getTask().type()).isEqualTo(TASK.type());
+        await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> !taskManagerWorker.results.isEmpty());
+        assertThat(taskManagerWorker.tasks).containsExactly(TASK_WITH_ID);
+        assertThat(taskManagerWorker.results).containsExactly(Task.Result.COMPLETED);
     }
 
     @Test
     void workerShouldConsumeTwoSubmittedTask() {
         testee.submit(TASK_WITH_ID);
         testee.submit(TASK_WITH_ID_2);
-
-        ArgumentCaptor<TaskWithId> taskWithIdCaptor = ArgumentCaptor.forClass(TaskWithId.class);
-        verify(taskManagerWorker, new Timeout(1000, times(2))).executeTask(taskWithIdCaptor.capture());
-
-        TaskWithId actualTaskWithId = taskWithIdCaptor.getAllValues().get(0);
-        assertThat(actualTaskWithId.getId()).isEqualTo(TASK_ID);
-        assertThat(actualTaskWithId.getTask().type()).isEqualTo(TASK.type());
-
-        TaskWithId actualSecondTaskWithId = taskWithIdCaptor.getAllValues().get(1);
-        assertThat(actualSecondTaskWithId.getId()).isEqualTo(TASK_ID_2);
-        assertThat(actualSecondTaskWithId.getTask().type()).isEqualTo(TASK2.type());
+        await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> taskManagerWorker.results.size() == 2);
+        assertThat(taskManagerWorker.tasks).containsExactly(TASK_WITH_ID, TASK_WITH_ID_2);
+        assertThat(taskManagerWorker.results).allSatisfy(result -> assertThat(result).isEqualTo(Task.Result.COMPLETED));
     }
 
     @Test
     void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() {
         testee.submit(TASK_WITH_ID);
 
-        TaskManagerWorker otherTaskManagerWorker = mock(TaskManagerWorker.class);
-        RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), taskSerializer);
-        otherWorkQueue.start();
-
-        IntStream.range(0, 9)
-            .forEach(ignoredIndex -> testee.submit(TASK_WITH_ID_2));
+        ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
+        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), taskSerializer)) {
+            otherWorkQueue.start();
 
-        verify(taskManagerWorker, new Timeout(1000, times(10))).executeTask(any());
+            IntStream.range(0, 9)
+                .forEach(ignoredIndex -> testee.submit(TASK_WITH_ID_2));
 
-        verify(otherTaskManagerWorker, new Timeout(1000, times(0))).executeTask(any());
+            await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> taskManagerWorker.results.size() == 10);
+            assertThat(otherTaskManagerWorker.tasks).isEmpty();
+        }
     }
 
     @Test
@@ -134,20 +149,22 @@ class RabbitMQWorkQueueTest {
         TaskId taskId = TaskId.fromString("4bf6d081-aa30-11e9-bf6c-2d3b9e84aafd");
         TaskWithId taskWithId = new TaskWithId(taskId, task);
 
-        TaskManagerWorker otherTaskManagerWorker = mock(TaskManagerWorker.class);
+        ImmediateWorker otherTaskManagerWorker = new ImmediateWorker();
         JsonTaskSerializer otherTaskSerializer = new JsonTaskSerializer(TestTaskDTOModules.TEST_TYPE);
-        RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), otherTaskSerializer);
-        //wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue.
-        Thread.sleep(200);
-        otherWorkQueue.start();
+        try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), otherTaskSerializer)) {
+            //wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue.
+            Thread.sleep(200);
+            otherWorkQueue.start();
 
-        otherWorkQueue.submit(taskWithId);
+            otherWorkQueue.submit(taskWithId);
 
-        verify(taskManagerWorker, new Timeout(100, times(0))).executeTask(any());
-        verify(taskManagerWorker, timeout(100)).fail(eq(taskId), eq(Optional.empty()), any(), any());
+            await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> taskManagerWorker.failedTasks.size() == 1);
+            assertThat(taskManagerWorker.failedTasks).containsExactly(taskWithId.getId());
 
-        testee.submit(TASK_WITH_ID);
-        verify(taskManagerWorker, timeout(100)).executeTask(any());
+            testee.submit(TASK_WITH_ID);
+            await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> taskManagerWorker.results.size() == 1);
+            assertThat(taskManagerWorker.tasks).containsExactly(TASK_WITH_ID);
+        }
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org