You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/11/26 15:31:41 UTC

[james-project] 06/07: JAMES-2813 use concatMap instead of flatMap to queue messages only in the workqueue, not in the worker

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 02b5848652b66e36b56468a4fd10738fb1d85bc3
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Oct 21 15:41:40 2019 +0200

    JAMES-2813 use concatMap instead of flatMap to queue messages only in the workqueue, not in the worker
    
    and remove the semaphore in the worker, which was just a workaround for this problem.
---
 .../distributed/RabbitMQWorkQueue.java             |  2 +-
 .../distributed/RabbitMQWorkQueueTest.java         | 39 +++++++++------
 .../apache/james/task/SerialTaskManagerWorker.java | 57 ++++++----------------
 3 files changed, 41 insertions(+), 57 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 54d8b03..27196f8 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -104,7 +104,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
         receiver = new Receiver(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
         receiverHandle = receiver.consumeManualAck(QUEUE_NAME, new ConsumeOptions())
             .subscribeOn(Schedulers.boundedElastic())
-            .flatMap(this::executeTask)
+            .concatMap(this::executeTask)
             .subscribe();
     }
 
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 3d8b17e..5b8bb43 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -20,30 +20,34 @@
 package org.apache.james.task.eventsourcing.distributed;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.awaitility.Awaitility.await;
 import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.TWO_SECONDS;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.server.task.json.TestTask;
+import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
 import org.apache.james.server.task.json.dto.TestTaskDTOModules;
 import org.apache.james.task.CompletedTask;
+import org.apache.james.task.MemoryReferenceTask;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManagerWorker;
 import org.apache.james.task.TaskWithId;
+import org.awaitility.core.ConditionTimeoutException;
+import org.hamcrest.CoreMatchers;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -100,7 +104,7 @@ class RabbitMQWorkQueueTest {
     @BeforeEach
     void setUp() {
         worker = spy(new ImmediateWorker());
-        serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE);
+        serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
         testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
         testee.start();
     }
@@ -169,23 +173,30 @@ class RabbitMQWorkQueueTest {
 
     @Test
     void tasksShouldBeConsumedSequentially() {
-        Task task1 = new CompletedTask();
+        AtomicLong counter = new AtomicLong(0L);
+
+        Task task1 = new MemoryReferenceTask(() -> {
+            counter.addAndGet(1);
+            Thread.sleep(1000);
+            return Task.Result.COMPLETED;
+        });
         TaskId taskId1 = TaskId.fromString("1111d081-aa30-11e9-bf6c-2d3b9e84aafd");
         TaskWithId taskWithId1 = new TaskWithId(taskId1, task1);
 
-        Task task2 = new CompletedTask();
+        Task task2 =  new MemoryReferenceTask(() -> {
+            counter.addAndGet(2);
+            return Task.Result.COMPLETED;
+        });
+
         TaskId taskId2 = TaskId.fromString("2222d082-aa30-22e9-bf6c-2d3b9e84aafd");
         TaskWithId taskWithId2 = new TaskWithId(taskId2, task2);
 
-        when(worker.executeTask(taskWithId1)).then(answer -> {
-            TimeUnit.MINUTES.sleep(2);
-            return Mono.just(Task.Result.COMPLETED);
-        });
-
         testee.submit(taskWithId1);
         testee.submit(taskWithId2);
 
-        verify(worker, timeout(100)).executeTask(taskWithId1);
-        verifyNoMoreInteractions(worker);
+        assertThatThrownBy(() -> await().atMost(FIVE_HUNDRED_MILLISECONDS).untilAtomic(counter, CoreMatchers.equalTo(3L))).isInstanceOf(ConditionTimeoutException.class);
+        assertThatCode(() -> await().atMost(TWO_SECONDS).untilAtomic(counter, CoreMatchers.equalTo(3L))).doesNotThrowAnyException();
+
     }
+
 }
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 34f4548..25005fd 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -22,15 +22,12 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 
 import org.apache.james.util.MDCBuilder;
 import org.apache.james.util.concurrent.NamedThreadFactory;
@@ -38,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -50,7 +48,6 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
     private final ExecutorService taskExecutor;
     private final Listener listener;
     private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask;
-    private final Semaphore semaphore;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
@@ -60,48 +57,24 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
         this.listener = listener;
         this.cancelledTasks = Sets.newConcurrentHashSet();
         this.runningTask = new AtomicReference<>();
-        this.semaphore = new Semaphore(1);
     }
 
     @Override
     public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
-        return Mono
-            .using(
-                acquireSemaphore(taskWithId, listener),
-                executeWithSemaphore(taskWithId, listener),
-                Semaphore::release);
-
-    }
-
-    private Callable<Semaphore> acquireSemaphore(TaskWithId taskWithId, Listener listener) {
-        return () -> {
-                try {
-                    semaphore.acquire();
-                    return semaphore;
-                } catch (InterruptedException e) {
-                    listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
-                    throw e;
-                }
-            };
-    }
-
-    private Function<Semaphore, Mono<Task.Result>> executeWithSemaphore(TaskWithId taskWithId, Listener listener) {
-        return semaphore -> {
-            if (!cancelledTasks.remove(taskWithId.getId())) {
-                CompletableFuture<Task.Result> future = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), taskExecutor);
-                runningTask.set(Tuples.of(taskWithId.getId(), future));
-
-                return Mono.using(
-                    () -> pollAdditionalInformation(taskWithId).subscribe(),
-                    ignored -> Mono.fromFuture(future)
-                            .doOnError(exception -> handleExecutionError(taskWithId, listener, exception))
-                            .onErrorReturn(Task.Result.PARTIAL),
-                    polling -> polling.dispose());
-            } else {
-                listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
-                return Mono.empty();
-            }
-        };
+        if (!cancelledTasks.remove(taskWithId.getId())) {
+            CompletableFuture<Task.Result> future = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), taskExecutor);
+            runningTask.set(Tuples.of(taskWithId.getId(), future));
+
+            return Mono.using(
+                () -> pollAdditionalInformation(taskWithId).subscribe(),
+                ignored -> Mono.fromFuture(future)
+                    .doOnError(exception -> handleExecutionError(taskWithId, listener, exception))
+                    .onErrorReturn(Task.Result.PARTIAL),
+                Disposable::dispose);
+        } else {
+            listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
+            return Mono.empty();
+        }
     }
 
     private void handleExecutionError(TaskWithId taskWithId, Listener listener, Throwable exception) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org