You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/11/26 15:31:41 UTC
[james-project] 06/07: JAMES-2813 use concatMap instead of flatMap
to queue messages only in the work queue, not in the worker
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 02b5848652b66e36b56468a4fd10738fb1d85bc3
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Oct 21 15:41:40 2019 +0200
JAMES-2813 use concatMap instead of flatMap to queue messages only in the work queue, not in the worker,
and remove the semaphore in the worker, which was just a workaround for this problem.
---
.../distributed/RabbitMQWorkQueue.java | 2 +-
.../distributed/RabbitMQWorkQueueTest.java | 39 +++++++++------
.../apache/james/task/SerialTaskManagerWorker.java | 57 ++++++----------------
3 files changed, 41 insertions(+), 57 deletions(-)
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 54d8b03..27196f8 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -104,7 +104,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
receiver = new Receiver(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
receiverHandle = receiver.consumeManualAck(QUEUE_NAME, new ConsumeOptions())
.subscribeOn(Schedulers.boundedElastic())
- .flatMap(this::executeTask)
+ .concatMap(this::executeTask)
.subscribe();
}
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 3d8b17e..5b8bb43 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -20,30 +20,34 @@
package org.apache.james.task.eventsourcing.distributed;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.TWO_SECONDS;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.server.task.json.TestTask;
+import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
import org.apache.james.server.task.json.dto.TestTaskDTOModules;
import org.apache.james.task.CompletedTask;
+import org.apache.james.task.MemoryReferenceTask;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskId;
import org.apache.james.task.TaskManagerWorker;
import org.apache.james.task.TaskWithId;
+import org.awaitility.core.ConditionTimeoutException;
+import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -100,7 +104,7 @@ class RabbitMQWorkQueueTest {
@BeforeEach
void setUp() {
worker = spy(new ImmediateWorker());
- serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE);
+ serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
testee.start();
}
@@ -169,23 +173,30 @@ class RabbitMQWorkQueueTest {
@Test
void tasksShouldBeConsumedSequentially() {
- Task task1 = new CompletedTask();
+ AtomicLong counter = new AtomicLong(0L);
+
+ Task task1 = new MemoryReferenceTask(() -> {
+ counter.addAndGet(1);
+ Thread.sleep(1000);
+ return Task.Result.COMPLETED;
+ });
TaskId taskId1 = TaskId.fromString("1111d081-aa30-11e9-bf6c-2d3b9e84aafd");
TaskWithId taskWithId1 = new TaskWithId(taskId1, task1);
- Task task2 = new CompletedTask();
+ Task task2 = new MemoryReferenceTask(() -> {
+ counter.addAndGet(2);
+ return Task.Result.COMPLETED;
+ });
+
TaskId taskId2 = TaskId.fromString("2222d082-aa30-22e9-bf6c-2d3b9e84aafd");
TaskWithId taskWithId2 = new TaskWithId(taskId2, task2);
- when(worker.executeTask(taskWithId1)).then(answer -> {
- TimeUnit.MINUTES.sleep(2);
- return Mono.just(Task.Result.COMPLETED);
- });
-
testee.submit(taskWithId1);
testee.submit(taskWithId2);
- verify(worker, timeout(100)).executeTask(taskWithId1);
- verifyNoMoreInteractions(worker);
+ assertThatThrownBy(() -> await().atMost(FIVE_HUNDRED_MILLISECONDS).untilAtomic(counter, CoreMatchers.equalTo(3L))).isInstanceOf(ConditionTimeoutException.class);
+ assertThatCode(() -> await().atMost(TWO_SECONDS).untilAtomic(counter, CoreMatchers.equalTo(3L))).doesNotThrowAnyException();
+
}
+
}
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 34f4548..25005fd 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -22,15 +22,12 @@ import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
import org.apache.james.util.MDCBuilder;
import org.apache.james.util.concurrent.NamedThreadFactory;
@@ -38,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
+import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -50,7 +48,6 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
private final ExecutorService taskExecutor;
private final Listener listener;
private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask;
- private final Semaphore semaphore;
private final Set<TaskId> cancelledTasks;
private final Duration pollingInterval;
@@ -60,48 +57,24 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
this.listener = listener;
this.cancelledTasks = Sets.newConcurrentHashSet();
this.runningTask = new AtomicReference<>();
- this.semaphore = new Semaphore(1);
}
@Override
public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
- return Mono
- .using(
- acquireSemaphore(taskWithId, listener),
- executeWithSemaphore(taskWithId, listener),
- Semaphore::release);
-
- }
-
- private Callable<Semaphore> acquireSemaphore(TaskWithId taskWithId, Listener listener) {
- return () -> {
- try {
- semaphore.acquire();
- return semaphore;
- } catch (InterruptedException e) {
- listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
- throw e;
- }
- };
- }
-
- private Function<Semaphore, Mono<Task.Result>> executeWithSemaphore(TaskWithId taskWithId, Listener listener) {
- return semaphore -> {
- if (!cancelledTasks.remove(taskWithId.getId())) {
- CompletableFuture<Task.Result> future = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), taskExecutor);
- runningTask.set(Tuples.of(taskWithId.getId(), future));
-
- return Mono.using(
- () -> pollAdditionalInformation(taskWithId).subscribe(),
- ignored -> Mono.fromFuture(future)
- .doOnError(exception -> handleExecutionError(taskWithId, listener, exception))
- .onErrorReturn(Task.Result.PARTIAL),
- polling -> polling.dispose());
- } else {
- listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
- return Mono.empty();
- }
- };
+ if (!cancelledTasks.remove(taskWithId.getId())) {
+ CompletableFuture<Task.Result> future = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), taskExecutor);
+ runningTask.set(Tuples.of(taskWithId.getId(), future));
+
+ return Mono.using(
+ () -> pollAdditionalInformation(taskWithId).subscribe(),
+ ignored -> Mono.fromFuture(future)
+ .doOnError(exception -> handleExecutionError(taskWithId, listener, exception))
+ .onErrorReturn(Task.Result.PARTIAL),
+ Disposable::dispose);
+ } else {
+ listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
+ return Mono.empty();
+ }
}
private void handleExecutionError(TaskWithId taskWithId, Listener listener, Throwable exception) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org