Posted to server-dev@james.apache.org by ro...@apache.org on 2019/11/26 15:31:35 UTC

[james-project] branch master updated (d8a4a8c -> 1e5a8dd)

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from d8a4a8c  JAMES-2110 Reduce ImapRequestLineReader::consumeLiteral buffer size
     new 0bf4e83  JAMES-2813 use rabbitmq 3.8 docker image in tests
     new 0f7d0b0  JAMES-2813 add single active consumer constants
     new 1e9f23b  JAMES-2813 expose single active consumer information in management API
     new 93496e1  JAMES-2813 add test to demonstrate the use of single active consumer with rabbitmq 3.8
     new 8ef3cab  JAMES-2813 replace exclusive consumer usage with a queue with single active consumer
     new 02b5848  JAMES-2813 use concatMap instead of flatMap to queue messages only in the workqueue, not in the worker
     new 1e5a8dd  JAMES-2813 add test to demonstrate resilience with single active consumer

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/james/backends/rabbitmq/Constants.java  |   6 +
 .../backends/rabbitmq/RabbitMQManagementAPI.java   |  90 ++++++++++-
 .../james/backends/rabbitmq/RabbitMQTest.java      | 149 +++++++++++++++++-
 .../distributed/RabbitMQExclusiveConsumer.java     | 174 ---------------------
 .../distributed/RabbitMQWorkQueue.java             |  15 +-
 .../distributed/DistributedTaskManagerTest.java    |  39 +++++
 .../distributed/RabbitMQWorkQueueTest.java         |  39 +++--
 .../apache/james/task/SerialTaskManagerWorker.java |  57 ++-----
 .../java/org/apache/james/util/docker/Images.java  |   2 +-
 9 files changed, 324 insertions(+), 247 deletions(-)
 delete mode 100644 server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java




[james-project] 06/07: JAMES-2813 use concatMap instead of flatMap to queue messages only in the workqueue, not in the worker

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 02b5848652b66e36b56468a4fd10738fb1d85bc3
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Oct 21 15:41:40 2019 +0200

    JAMES-2813 use concatMap instead of flatMap to queue messages only in the workqueue, not in the worker
    
    and remove the semaphore in the worker, which was just a workaround for this problem.
---
 .../distributed/RabbitMQWorkQueue.java             |  2 +-
 .../distributed/RabbitMQWorkQueueTest.java         | 39 +++++++++------
 .../apache/james/task/SerialTaskManagerWorker.java | 57 ++++++----------------
 3 files changed, 41 insertions(+), 57 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 54d8b03..27196f8 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -104,7 +104,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
         receiver = new Receiver(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
         receiverHandle = receiver.consumeManualAck(QUEUE_NAME, new ConsumeOptions())
             .subscribeOn(Schedulers.boundedElastic())
-            .flatMap(this::executeTask)
+            .concatMap(this::executeTask)
             .subscribe();
     }
 
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 3d8b17e..5b8bb43 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -20,30 +20,34 @@
 package org.apache.james.task.eventsourcing.distributed;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.awaitility.Awaitility.await;
 import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.TWO_SECONDS;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.server.task.json.TestTask;
+import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
 import org.apache.james.server.task.json.dto.TestTaskDTOModules;
 import org.apache.james.task.CompletedTask;
+import org.apache.james.task.MemoryReferenceTask;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManagerWorker;
 import org.apache.james.task.TaskWithId;
+import org.awaitility.core.ConditionTimeoutException;
+import org.hamcrest.CoreMatchers;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -100,7 +104,7 @@ class RabbitMQWorkQueueTest {
     @BeforeEach
     void setUp() {
         worker = spy(new ImmediateWorker());
-        serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE);
+        serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
         testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getRabbitChannelPool(), serializer);
         testee.start();
     }
@@ -169,23 +173,30 @@ class RabbitMQWorkQueueTest {
 
     @Test
     void tasksShouldBeConsumedSequentially() {
-        Task task1 = new CompletedTask();
+        AtomicLong counter = new AtomicLong(0L);
+
+        Task task1 = new MemoryReferenceTask(() -> {
+            counter.addAndGet(1);
+            Thread.sleep(1000);
+            return Task.Result.COMPLETED;
+        });
         TaskId taskId1 = TaskId.fromString("1111d081-aa30-11e9-bf6c-2d3b9e84aafd");
         TaskWithId taskWithId1 = new TaskWithId(taskId1, task1);
 
-        Task task2 = new CompletedTask();
+        Task task2 =  new MemoryReferenceTask(() -> {
+            counter.addAndGet(2);
+            return Task.Result.COMPLETED;
+        });
+
         TaskId taskId2 = TaskId.fromString("2222d082-aa30-22e9-bf6c-2d3b9e84aafd");
         TaskWithId taskWithId2 = new TaskWithId(taskId2, task2);
 
-        when(worker.executeTask(taskWithId1)).then(answer -> {
-            TimeUnit.MINUTES.sleep(2);
-            return Mono.just(Task.Result.COMPLETED);
-        });
-
         testee.submit(taskWithId1);
         testee.submit(taskWithId2);
 
-        verify(worker, timeout(100)).executeTask(taskWithId1);
-        verifyNoMoreInteractions(worker);
+        assertThatThrownBy(() -> await().atMost(FIVE_HUNDRED_MILLISECONDS).untilAtomic(counter, CoreMatchers.equalTo(3L))).isInstanceOf(ConditionTimeoutException.class);
+        assertThatCode(() -> await().atMost(TWO_SECONDS).untilAtomic(counter, CoreMatchers.equalTo(3L))).doesNotThrowAnyException();
+
     }
+
 }
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 34f4548..25005fd 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -22,15 +22,12 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 
 import org.apache.james.util.MDCBuilder;
 import org.apache.james.util.concurrent.NamedThreadFactory;
@@ -38,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -50,7 +48,6 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
     private final ExecutorService taskExecutor;
     private final Listener listener;
     private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask;
-    private final Semaphore semaphore;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
@@ -60,48 +57,24 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
         this.listener = listener;
         this.cancelledTasks = Sets.newConcurrentHashSet();
         this.runningTask = new AtomicReference<>();
-        this.semaphore = new Semaphore(1);
     }
 
     @Override
     public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
-        return Mono
-            .using(
-                acquireSemaphore(taskWithId, listener),
-                executeWithSemaphore(taskWithId, listener),
-                Semaphore::release);
-
-    }
-
-    private Callable<Semaphore> acquireSemaphore(TaskWithId taskWithId, Listener listener) {
-        return () -> {
-                try {
-                    semaphore.acquire();
-                    return semaphore;
-                } catch (InterruptedException e) {
-                    listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
-                    throw e;
-                }
-            };
-    }
-
-    private Function<Semaphore, Mono<Task.Result>> executeWithSemaphore(TaskWithId taskWithId, Listener listener) {
-        return semaphore -> {
-            if (!cancelledTasks.remove(taskWithId.getId())) {
-                CompletableFuture<Task.Result> future = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), taskExecutor);
-                runningTask.set(Tuples.of(taskWithId.getId(), future));
-
-                return Mono.using(
-                    () -> pollAdditionalInformation(taskWithId).subscribe(),
-                    ignored -> Mono.fromFuture(future)
-                            .doOnError(exception -> handleExecutionError(taskWithId, listener, exception))
-                            .onErrorReturn(Task.Result.PARTIAL),
-                    polling -> polling.dispose());
-            } else {
-                listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
-                return Mono.empty();
-            }
-        };
+        if (!cancelledTasks.remove(taskWithId.getId())) {
+            CompletableFuture<Task.Result> future = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), taskExecutor);
+            runningTask.set(Tuples.of(taskWithId.getId(), future));
+
+            return Mono.using(
+                () -> pollAdditionalInformation(taskWithId).subscribe(),
+                ignored -> Mono.fromFuture(future)
+                    .doOnError(exception -> handleExecutionError(taskWithId, listener, exception))
+                    .onErrorReturn(Task.Result.PARTIAL),
+                Disposable::dispose);
+        } else {
+            listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
+            return Mono.empty();
+        }
     }
 
     private void handleExecutionError(TaskWithId taskWithId, Listener listener, Throwable exception) {
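
For readers less familiar with the Reactor operators involved, here is a minimal standalone sketch (not James code; the class and task names are illustrative) of why this swap makes the work queue sequential: concatMap subscribes to the next inner publisher only after the previous one completes, whereas flatMap subscribes to all of them eagerly, which is why the semaphore workaround in SerialTaskManagerWorker is no longer needed.

import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConcatMapVsFlatMap {

    // Pretend each task needs some time to run.
    private static Mono<String> simulateTask(int id) {
        return Mono.just("task-" + id)
            .delayElement(Duration.ofMillis(100));
    }

    public static void main(String[] args) {
        // flatMap subscribes to every inner Mono eagerly: the simulated tasks overlap
        // and their results may arrive out of order.
        Flux.range(1, 5)
            .flatMap(ConcatMapVsFlatMap::simulateTask)
            .doOnNext(result -> System.out.println("flatMap:   " + result))
            .blockLast();

        // concatMap subscribes to the next inner Mono only once the previous one has
        // completed: the tasks run strictly one after another, in submission order,
        // with no extra synchronization needed downstream.
        Flux.range(1, 5)
            .concatMap(ConcatMapVsFlatMap::simulateTask)
            .doOnNext(result -> System.out.println("concatMap: " + result))
            .blockLast();
    }
}

Running this typically interleaves the flatMap output, while the concatMap output always comes back as task-1 through task-5.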




[james-project] 04/07: JAMES-2813 add test to demonstrate the use of single active consumer with rabbitmq 3.8

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 93496e1d98915c1073d3767f3300aeddcbdf5754
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Oct 21 10:17:43 2019 +0200

    JAMES-2813 add test to demonstrate the use of single active consumer with rabbitmq 3.8
---
 .../james/backends/rabbitmq/RabbitMQTest.java      | 149 +++++++++++++++++++--
 1 file changed, 141 insertions(+), 8 deletions(-)

diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java
index b673a10..a41a903 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java
@@ -37,10 +37,13 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.junit.jupiter.api.AfterEach;
@@ -59,6 +62,7 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
 
 class RabbitMQTest {
 
@@ -200,8 +204,8 @@ class RabbitMQTest {
                 IntStream.range(0, 10)
                     .mapToObj(String::valueOf)
                     .map(RabbitMQTest.this::asBytes)
-                    .forEach(Throwing.consumer(
-                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+                    .forEach(Throwing.<byte[]>consumer(
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
 
                 awaitAtMostOneMinute.until(
                     () -> countReceivedMessages(consumer2, consumer3, consumer4) == 30);
@@ -233,8 +237,8 @@ class RabbitMQTest {
                 IntStream.range(0, nbMessages)
                     .mapToObj(String::valueOf)
                     .map(RabbitMQTest.this::asBytes)
-                    .forEach(Throwing.consumer(
-                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+                    .forEach(Throwing.<byte[]>consumer(
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
 
                 InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
                 InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
@@ -265,8 +269,8 @@ class RabbitMQTest {
                 IntStream.range(0, 10)
                         .mapToObj(String::valueOf)
                         .map(RabbitMQTest.this::asBytes)
-                        .forEach(Throwing.consumer(
-                                bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+                        .forEach(Throwing.<byte[]>consumer(
+                                bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
 
                 ConcurrentLinkedQueue<Integer> receivedMessages = new ConcurrentLinkedQueue<>();
                 String dyingConsumerTag = "dyingConsumer";
@@ -297,8 +301,8 @@ class RabbitMQTest {
                 IntStream.range(0, 10)
                         .mapToObj(String::valueOf)
                         .map(RabbitMQTest.this::asBytes)
-                        .forEach(Throwing.consumer(
-                                bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+                        .forEach(Throwing.<byte[]>consumer(
+                                bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
 
                 String dyingConsumerTag = "dyingConsumer";
                 ImmutableMap<String, Object> arguments = ImmutableMap.of();
@@ -327,6 +331,135 @@ class RabbitMQTest {
                 assertThat(fallbackConsumer.getConsumedMessages()).contains(1, 2).doesNotContain(0);
             }
 
+            @Test
+            void rabbitMQShouldDeliverMessageToSingleActiveConsumer() throws Exception {
+                channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+                channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
+                channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+                IntStream.range(0, 10)
+                    .mapToObj(String::valueOf)
+                    .map(RabbitMQTest.this::asBytes)
+                    .forEach(Throwing.<byte[]>consumer(
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
+
+                channel2.basicQos(1);
+                channel3.basicQos(1);
+
+                AtomicInteger firstRegisteredConsumerMessageCount = new AtomicInteger(0);
+                AtomicInteger secondRegisteredConsumerMessageCount = new AtomicInteger(0);
+
+                String firstRegisteredConsumer = "firstRegisteredConsumer";
+                ImmutableMap<String, Object> arguments = ImmutableMap.of();
+                channel2.basicConsume(WORK_QUEUE, !AUTO_ACK, firstRegisteredConsumer, !NO_LOCAL, !EXCLUSIVE, arguments,
+                    (consumerTag, message) -> incrementCountForConsumerAndAckMessage(firstRegisteredConsumerMessageCount, message, channel2),
+                    (consumerTag -> {
+                    }));
+                channel3.basicConsume(WORK_QUEUE, !AUTO_ACK, "starvingConsumer", !NO_LOCAL, !EXCLUSIVE, arguments,
+                    (consumerTag, message) -> incrementCountForConsumerAndAckMessage(secondRegisteredConsumerMessageCount, message, channel3),
+                    consumerTag -> { });
+
+                awaitAtMostOneMinute.until(() -> (firstRegisteredConsumerMessageCount.get() + secondRegisteredConsumerMessageCount.get()) == 10);
+
+                assertThat(firstRegisteredConsumerMessageCount.get()).isEqualTo(10);
+                assertThat(secondRegisteredConsumerMessageCount.get()).isEqualTo(0);
+            }
+
+            private void incrementCountForConsumerAndAckMessage(AtomicInteger firstRegisteredConsumerMessageCount, Delivery message, Channel channel2) throws IOException {
+                try {
+                    firstRegisteredConsumerMessageCount.incrementAndGet();
+                    TimeUnit.SECONDS.sleep(1);
+                    channel2.basicAck(message.getEnvelope().getDeliveryTag(), false);
+                } catch (InterruptedException e) {
+                    //do nothing
+                }
+            }
+
+            @Test
+            void rabbitMQShouldProvideSingleActiveConsumerName() throws Exception {
+                channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+                channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
+                channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+                channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, "foo".getBytes(StandardCharsets.UTF_8));
+
+                AtomicInteger deliveredMessagesCount = new AtomicInteger(0);
+
+                String firstRegisteredConsumer = "firstRegisteredConsumer";
+                ImmutableMap<String, Object> arguments = ImmutableMap.of();
+                channel2.basicConsume(WORK_QUEUE, AUTO_ACK, firstRegisteredConsumer, !NO_LOCAL, !EXCLUSIVE, arguments,
+                    (consumerTag, message) -> deliveredMessagesCount.incrementAndGet(),
+                    (consumerTag -> { }));
+                channel3.basicConsume(WORK_QUEUE, AUTO_ACK, "starvingConsumer", !NO_LOCAL, !EXCLUSIVE, arguments,
+                    (consumerTag, message) -> deliveredMessagesCount.incrementAndGet(),
+                    consumerTag -> { });
+
+                awaitAtMostOneMinute.until(() -> deliveredMessagesCount.get() > 0);
+                awaitAtMostOneMinute.until(() -> rabbitMQExtension.managementAPI()
+                    .queueDetails("/", WORK_QUEUE)
+                    .consumerDetails.isEmpty() == false);
+
+                List<String> currentConsumerName = rabbitMQExtension.managementAPI()
+                    .queueDetails("/", WORK_QUEUE)
+                    .consumerDetails
+                    .stream()
+                    .filter(consumer -> consumer.status == RabbitMQManagementAPI.ActivityStatus.SingleActive)
+                    .map(RabbitMQManagementAPI.ConsumerDetails::getTag)
+                    .collect(Collectors.toList());
+
+                assertThat(currentConsumerName)
+                    .hasSize(1)
+                    .first()
+                    .isEqualTo(firstRegisteredConsumer);
+            }
+
+            @Test
+            void rabbitMQShouldDeliverMessageToFallbackSingleActiveConsumer() throws Exception {
+                channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+                channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
+                channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+                IntStream.range(0, 10)
+                    .mapToObj(String::valueOf)
+                    .map(RabbitMQTest.this::asBytes)
+                    .forEach(Throwing.<byte[]>consumer(
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
+
+                AtomicInteger firstRegisteredConsumerMessageCount = new AtomicInteger(0);
+                AtomicInteger secondRegisteredConsumerMessageCount = new AtomicInteger(0);
+
+                String firstRegisteredConsumer = "firstRegisteredConsumer";
+                ImmutableMap<String, Object> arguments = ImmutableMap.of();
+                channel2.basicConsume(WORK_QUEUE, !AUTO_ACK, firstRegisteredConsumer, !NO_LOCAL, !EXCLUSIVE, arguments,
+                    (consumerTag, message) -> {
+                        try {
+                            if (firstRegisteredConsumerMessageCount.get() < 5) {
+                                channel2.basicAck(message.getEnvelope().getDeliveryTag(), !MULTIPLE);
+                                firstRegisteredConsumerMessageCount.incrementAndGet();
+                            } else {
+                                channel2.basicNack(message.getEnvelope().getDeliveryTag(), !MULTIPLE, REQUEUE);
+                            }
+                            TimeUnit.SECONDS.sleep(1);
+                        } catch (InterruptedException e) {
+                            //do nothing
+                        }
+                    },
+                    (consumerTag -> { }));
+                channel3.basicConsume(WORK_QUEUE, AUTO_ACK, "fallbackConsumer", !NO_LOCAL, !EXCLUSIVE, arguments,
+                    (consumerTag, message) -> {
+                        secondRegisteredConsumerMessageCount.incrementAndGet();
+                    },
+                    consumerTag -> { });
+
+                awaitAtMostOneMinute.until(() -> firstRegisteredConsumerMessageCount.get() == 5);
+
+                channel2.basicCancel(firstRegisteredConsumer);
+
+                awaitAtMostOneMinute.until(() -> (firstRegisteredConsumerMessageCount.get() + secondRegisteredConsumerMessageCount.get()) == 10);
+
+                assertThat(firstRegisteredConsumerMessageCount.get()).isEqualTo(5);
+                assertThat(secondRegisteredConsumerMessageCount.get()).isEqualTo(5);
+            }
         }
 
         @Nested




[james-project] 02/07: JAMES-2813 add single active consumer constants

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0f7d0b06e900edd450905b3f8c8ae8e495355285
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Tue Oct 22 11:02:17 2019 +0200

    JAMES-2813 add single active consumer constants
---
 .../src/main/java/org/apache/james/backends/rabbitmq/Constants.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
index 89455c0..021ab60 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.backends.rabbitmq;
 
+import java.util.Map;
+
 import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.AMQP;
 
@@ -34,6 +36,10 @@ public interface Constants {
     String EMPTY_ROUTING_KEY = "";
     boolean REQUEUE = true;
 
+    String SINGLE_ACTIVE_CONSUMER_ARGUMENT = "x-single-active-consumer";
+    boolean SINGLE_ACTIVE_CONSUMER = true;
+    Map<String, Object> WITH_SINGLE_ACTIVE_CONSUMER = ImmutableMap.of(Constants.SINGLE_ACTIVE_CONSUMER_ARGUMENT, Constants.SINGLE_ACTIVE_CONSUMER);
+
     String DIRECT_EXCHANGE = "direct";
 
     AMQP.BasicProperties NO_PROPERTIES = new AMQP.BasicProperties();
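
The constant added above corresponds to RabbitMQ's x-single-active-consumer queue argument, available from RabbitMQ 3.8 (hence the docker image bump in commit 01/07). As an illustration rather than project code, the sketch below shows how such an argument map can be handed to the plain Java client when declaring a queue, mirroring what the tests added in commit 04/07 do; the queue name and broker address are placeholders.

import java.util.Map;

import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SingleActiveConsumerQueueExample {

    // Same shape as Constants.WITH_SINGLE_ACTIVE_CONSUMER in the diff above.
    private static final Map<String, Object> WITH_SINGLE_ACTIVE_CONSUMER =
        ImmutableMap.of("x-single-active-consumer", true);

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // placeholder broker address

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // durable = true, exclusive = false, autoDelete = false, plus the
            // x-single-active-consumer argument: RabbitMQ 3.8+ then delivers to a
            // single consumer at a time and fails over when that consumer goes away.
            channel.queueDeclare("exampleWorkQueue", true, false, false, WITH_SINGLE_ACTIVE_CONSUMER);
        }
    }
}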




[james-project] 01/07: JAMES-2813 use rabbitmq 3.8 docker image in tests

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0bf4e8384eaa07a9427051e5d36202cba72a662d
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Oct 21 10:15:47 2019 +0200

    JAMES-2813 use rabbitmq 3.8 docker image in tests
---
 server/testing/src/main/java/org/apache/james/util/docker/Images.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/testing/src/main/java/org/apache/james/util/docker/Images.java b/server/testing/src/main/java/org/apache/james/util/docker/Images.java
index 48b004e..917438f 100644
--- a/server/testing/src/main/java/org/apache/james/util/docker/Images.java
+++ b/server/testing/src/main/java/org/apache/james/util/docker/Images.java
@@ -21,7 +21,7 @@ package org.apache.james.util.docker;
 
 public interface Images {
     String FAKE_SMTP = "weave/rest-smtp-sink:latest";
-    String RABBITMQ = "rabbitmq:3.7.7-management";
+    String RABBITMQ = "rabbitmq:3.8.0-management";
     String ELASTICSEARCH_2 = "elasticsearch:2.4.6";
     String ELASTICSEARCH_6 = "docker.elastic.co/elasticsearch/elasticsearch:6.3.2";
     String NGINX = "nginx:1.15.1";




[james-project] 05/07: JAMES-2813 replace exclusive consumer usage with a queue with single active consumer

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 8ef3cabdcd3f47c42a66bc405b5c0096ef6afd3f
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Oct 21 10:33:22 2019 +0200

    JAMES-2813 replace exclusive consumer usage with a queue with single active consumer
---
 .../distributed/RabbitMQExclusiveConsumer.java     | 174 ---------------------
 .../distributed/RabbitMQWorkQueue.java             |  13 +-
 2 files changed, 7 insertions(+), 180 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java
deleted file mode 100644
index e6d4e1c..0000000
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQExclusiveConsumer.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.task.eventsourcing.distributed;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableMap;
-import com.rabbitmq.client.CancelCallback;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.DeliverCallback;
-import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
-import reactor.core.scheduler.Schedulers;
-import reactor.rabbitmq.AcknowledgableDelivery;
-import reactor.rabbitmq.ConsumeOptions;
-import reactor.rabbitmq.RabbitFluxException;
-import reactor.rabbitmq.ReceiverOptions;
-
-/**
- * Taken from {@link reactor.rabbitmq.Receiver}
- * In order to be able to set the `exclusive` parameter to `true`
- * to the `channel.basicConsume` method.
- *
- * @deprecated to remove once the parallel execution of task has been implemented
- */
-@Deprecated
-public class RabbitMQExclusiveConsumer implements Closeable {
-    private static final Function<Connection, Channel> CHANNEL_CREATION_FUNCTION = new RabbitMQExclusiveConsumer.ChannelCreationFunction();
-    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQExclusiveConsumer.class);
-
-    private static final boolean NON_LOCAL = true;
-    private static final boolean EXCLUSIVE = true;
-    private static final boolean AUTO_ACK = true;
-
-    private static class ChannelCreationFunction implements Function<Connection, Channel> {
-
-        @Override
-        public Channel apply(Connection connection) {
-            try {
-                return connection.createChannel();
-            } catch (IOException e) {
-                throw new RabbitFluxException("Error while creating channel", e);
-            }
-        }
-    }
-
-    private Mono<? extends Connection> connectionMono;
-    private final AtomicBoolean hasConnection;
-    private final Scheduler connectionSubscriptionScheduler;
-    private final boolean privateConnectionSubscriptionScheduler;
-
-    public RabbitMQExclusiveConsumer(ReceiverOptions options) {
-        this.privateConnectionSubscriptionScheduler = options.getConnectionSubscriptionScheduler() == null;
-        this.connectionSubscriptionScheduler = privateConnectionSubscriptionScheduler ?
-            createScheduler("rabbitmq-receiver-connection-subscription") : options.getConnectionSubscriptionScheduler();
-        hasConnection = new AtomicBoolean(false);
-        this.connectionMono = options.getConnectionMono() != null ? options.getConnectionMono() :
-            Mono.fromCallable(() -> options.getConnectionFactory().newConnection())
-                .doOnSubscribe(c -> hasConnection.set(true))
-                .subscribeOn(this.connectionSubscriptionScheduler)
-                .cache();
-    }
-
-    protected Scheduler createScheduler(String name) {
-        return Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, name);
-    }
-
-    public Flux<AcknowledgableDelivery> consumeExclusiveManualAck(final String queue, ConsumeOptions options) {
-        // TODO track flux so it can be disposed when the sender is closed?
-        // could be also developer responsibility
-        return Flux.create(emitter -> connectionMono.map(CHANNEL_CREATION_FUNCTION).subscribe(channel -> {
-            try {
-                if (options.getQos() != 0) {
-                    channel.basicQos(options.getQos());
-                }
-
-                DeliverCallback deliverCallback = (consumerTag, message) -> {
-                    AcknowledgableDelivery delivery = new AcknowledgableDelivery(message, channel, options.getExceptionHandler());
-                    if (options.getHookBeforeEmitBiFunction().apply(emitter.requestedFromDownstream(), delivery)) {
-                        emitter.next(delivery);
-                    }
-                    if (options.getStopConsumingBiFunction().apply(emitter.requestedFromDownstream(), message)) {
-                        emitter.complete();
-                    }
-                };
-
-                AtomicBoolean basicCancel = new AtomicBoolean(true);
-                CancelCallback cancelCallback = consumerTag -> {
-                    LOGGER.info("Flux consumer {} has been cancelled", consumerTag);
-                    basicCancel.set(false);
-                    emitter.complete();
-                };
-
-                completeOnChannelShutdown(channel, emitter);
-
-                Map<String, Object> arguments = ImmutableMap.of();
-                final String consumerTag = channel.basicConsume(queue, !AUTO_ACK, UUID.randomUUID().toString(), !NON_LOCAL, EXCLUSIVE, arguments, deliverCallback, cancelCallback);
-                AtomicBoolean cancelled = new AtomicBoolean(false);
-                LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue);
-                emitter.onDispose(() -> {
-                    LOGGER.info("Cancelling consumer {} consuming from {}", consumerTag, queue);
-                    if (cancelled.compareAndSet(false, true)) {
-                        try {
-                            if (channel.isOpen() && channel.getConnection().isOpen()) {
-                                if (basicCancel.compareAndSet(true, false)) {
-                                    channel.basicCancel(consumerTag);
-                                }
-                                channel.close();
-                            }
-                        } catch (TimeoutException | IOException e) {
-                            // Not sure what to do, not much we can do,
-                            // logging should be enough.
-                            // Maybe one good reason to introduce an exception handler to choose more easily.
-                            LOGGER.warn("Error while closing channel: " + e.getMessage());
-                        }
-                    }
-                });
-            } catch (IOException e) {
-                throw new RabbitFluxException(e);
-            }
-        }, emitter::error), options.getOverflowStrategy());
-    }
-
-    protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter) {
-        channel.addShutdownListener(reason -> {
-            if (!AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(reason)) {
-                emitter.complete();
-            }
-        });
-    }
-
-    public void close() {
-        if (hasConnection.getAndSet(false)) {
-            try {
-                // FIXME use timeout on block (should be a parameter of the Receiver)
-                connectionMono.block().close();
-            } catch (IOException e) {
-                throw new RabbitFluxException(e);
-            }
-        }
-        if (privateConnectionSubscriptionScheduler) {
-            this.connectionSubscriptionScheduler.dispose();
-        }
-    }
-}
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 62a2f1b..54d8b03 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 import java.util.UUID;
 
+import org.apache.james.backends.rabbitmq.Constants;
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.task.Task;
@@ -70,7 +71,8 @@ public class RabbitMQWorkQueue implements WorkQueue {
     private final TaskManagerWorker worker;
     private final ReactorRabbitMQChannelPool channelPool;
     private final JsonTaskSerializer taskSerializer;
-    private RabbitMQExclusiveConsumer receiver;
+    private Sender sender;
+    private Receiver receiver;
     private UnicastProcessor<TaskId> sendCancelRequestsQueue;
     private Disposable sendCancelRequestsQueueHandle;
     private Disposable receiverHandle;
@@ -92,15 +94,15 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     private void startWorkqueue() {
         channelPool.getSender().declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
-        channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block();
+        channelPool.getSender().declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER)).block();
         channelPool.getSender().bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block();
 
         consumeWorkqueue();
     }
 
     private void consumeWorkqueue() {
-        receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
-        receiverHandle = receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions())
+        receiver = new Receiver(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
+        receiverHandle = receiver.consumeManualAck(QUEUE_NAME, new ConsumeOptions())
             .subscribeOn(Schedulers.boundedElastic())
             .flatMap(this::executeTask)
             .subscribe();
@@ -195,10 +197,9 @@ public class RabbitMQWorkQueue implements WorkQueue {
     @Override
     public void close() {
         Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose);
-        Optional.ofNullable(receiver).ifPresent(RabbitMQExclusiveConsumer::close);
+        Optional.ofNullable(receiver).ifPresent(Receiver::close);
         Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
         Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
-        Optional.ofNullable(cancelRequestSender).ifPresent(Sender::close);
         Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close);
     }
 }
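
To make the motivation concrete, the following standalone sketch (not project code; queue names are placeholders, and it assumes a RabbitMQ 3.8+ broker) contrasts the two approaches: with an exclusive consumer the broker rejects any second basicConsume on the queue, so fail-over means re-registering a consumer by hand (what the now-deleted RabbitMQExclusiveConsumer dealt with), while a single-active-consumer queue lets every node register and leaves the promotion of the next consumer to the broker.

import java.io.IOException;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ExclusiveVsSingleActiveConsumer {

    private static final DeliverCallback IGNORE_DELIVERIES = (consumerTag, delivery) -> { };

    static void exclusiveConsumer(Channel channel1, Channel channel2) throws IOException {
        channel1.queueDeclare("exclusiveDemo", true, false, false, ImmutableMap.of());
        channel1.basicConsume("exclusiveDemo", true, "", false, true, ImmutableMap.of(),
            IGNORE_DELIVERIES, consumerTag -> { });
        // The broker refuses a second consumer while the exclusive one is attached,
        // so this call fails and fail-over means re-registering a consumer by hand.
        channel2.basicConsume("exclusiveDemo", true, "", false, true, ImmutableMap.of(),
            IGNORE_DELIVERIES, consumerTag -> { });
    }

    static void singleActiveConsumer(Channel channel1, Channel channel2) throws IOException {
        Map<String, Object> arguments = ImmutableMap.of("x-single-active-consumer", true);
        channel1.queueDeclare("sacDemo", true, false, false, arguments);
        // Both consumers register successfully; the broker delivers only to the first
        // and automatically promotes the second if the first is cancelled or loses its
        // connection, which is the fail-over property the distributed work queue needs.
        channel1.basicConsume("sacDemo", IGNORE_DELIVERIES, consumerTag -> { });
        channel2.basicConsume("sacDemo", IGNORE_DELIVERIES, consumerTag -> { });
    }
}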




[james-project] 03/07: JAMES-2813 expose single active consumer information in management API

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1e9f23b5414c41c73af4961e9431779b6157ffba
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Oct 21 10:17:00 2019 +0200

    JAMES-2813 expose single active consumer information in management API
---
 .../backends/rabbitmq/RabbitMQManagementAPI.java   | 90 +++++++++++++++++++++-
 1 file changed, 89 insertions(+), 1 deletion(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
index 9f688e8..17983c0 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
@@ -23,9 +23,10 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.base.MoreObjects;
-
 import feign.Feign;
 import feign.Logger;
 import feign.Param;
@@ -84,6 +85,90 @@ public interface RabbitMQManagementAPI {
         }
     }
 
+    class MessageQueueDetails {
+        @JsonProperty("name")
+        String name;
+
+        @JsonProperty("vhost")
+        String vhost;
+
+        @JsonProperty("auto_delete")
+        boolean autoDelete;
+
+        @JsonProperty("durable")
+        boolean durable;
+
+        @JsonProperty("exclusive")
+        boolean exclusive;
+
+        @JsonProperty("arguments")
+        Map<String, String> arguments;
+
+        @JsonProperty("consumer_details")
+        List<ConsumerDetails> consumerDetails;
+
+        public String getName() {
+            return name;
+        }
+
+        public String getVhost() {
+            return vhost;
+        }
+
+        public boolean isAutoDelete() {
+            return autoDelete;
+        }
+
+        public boolean isDurable() {
+            return durable;
+        }
+
+        public boolean isExclusive() {
+            return exclusive;
+        }
+
+        public Map<String, String> getArguments() {
+            return arguments;
+        }
+
+        public List<ConsumerDetails> getConsumerDetails() {
+            return consumerDetails;
+        }
+    }
+
+    class ConsumerDetails {
+        @JsonProperty("consumer_tag")
+        String tag;
+
+        @JsonProperty("activity_status")
+        ActivityStatus status;
+
+        public ActivityStatus getStatus() {
+            return this.status;
+        }
+
+        public String getTag() {
+            return this.tag;
+        }
+    }
+
+    @JsonFormat(shape = JsonFormat.Shape.STRING)
+    enum ActivityStatus {
+        Waiting("waiting"),
+        SingleActive("single_active");
+
+        private final String value;
+
+        ActivityStatus(String value) {
+            this.value = value;
+        }
+
+        @JsonValue
+        String getValue() {
+            return value;
+        }
+    }
+
     class Exchange {
 
         @JsonProperty("name")
@@ -164,6 +249,9 @@ public interface RabbitMQManagementAPI {
     @RequestLine("GET /api/queues")
     List<MessageQueue> listQueues();
 
+    @RequestLine(value = "GET /api/queues/{vhost}/{name}", decodeSlash = false)
+    MessageQueueDetails queueDetails(@Param("vhost") String vhost, @Param("name") String name);
+
     @RequestLine(value = "DELETE /api/queues/{vhost}/{name}", decodeSlash = false)
     void deleteQueue(@Param("vhost") String vhost, @Param("name") String name);
 




[james-project] 07/07: JAMES-2813 add test to demonstrate resilience with single active consumer

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1e5a8dd1db3db79e04080ee9d0db3f39a99a5c16
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Oct 21 15:44:41 2019 +0200

    JAMES-2813 add test to demonstrate resilience with single active consumer
---
 .../distributed/DistributedTaskManagerTest.java    | 39 ++++++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 02c6024..1b5a5de 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -73,6 +73,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import com.google.common.collect.ImmutableBiMap;
+
 class DistributedTaskManagerTest implements TaskManagerContract {
 
     static class TrackedRabbitMQWorkQueueSupplier implements WorkQueueSupplier {
@@ -305,4 +307,41 @@ class DistributedTaskManagerTest implements TaskManagerContract {
                 });
         }
     }
+
+    @Test
+    void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) throws Exception {
+        try (EventSourcingTaskManager taskManager1 = taskManager();
+             EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
+            ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager = ImmutableBiMap.of(taskManager1, HOSTNAME, taskManager2, HOSTNAME_2);
+            TaskId firstTask = taskManager1.submit(new MemoryReferenceTask(() -> {
+                countDownLatch.await();
+                return Task.Result.COMPLETED;
+            }));
+
+            awaitUntilTaskHasStatus(firstTask, TaskManager.Status.IN_PROGRESS, taskManager1);
+
+            Hostname nodeRunningFirstTask = taskManager1.getExecutionDetails(firstTask).getRanNode().get();
+            Hostname otherNode = getOtherNode(hostNameByTaskManager, nodeRunningFirstTask);
+            EventSourcingTaskManager taskManagerRunningFirstTask = hostNameByTaskManager.inverse().get(nodeRunningFirstTask);
+            EventSourcingTaskManager otherTaskManager = hostNameByTaskManager.inverse().get(otherNode);
+
+            TaskId taskToExecuteAfterFirstNodeIsDown = taskManagerRunningFirstTask.submit(new CompletedTask());
+            taskManagerRunningFirstTask.close();
+
+            awaitAtMostFiveSeconds.untilAsserted(() ->
+                assertThat(otherTaskManager.getExecutionDetails(taskToExecuteAfterFirstNodeIsDown).getStatus())
+                    .isEqualTo(TaskManager.Status.COMPLETED));
+            TaskExecutionDetails detailsSecondTask = otherTaskManager.getExecutionDetails(taskToExecuteAfterFirstNodeIsDown);
+            assertThat(detailsSecondTask.getRanNode()).contains(otherNode);
+        }
+    }
+
+    private Hostname getOtherNode(ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager, Hostname node) {
+        return hostNameByTaskManager
+            .values()
+            .stream()
+            .filter(hostname -> !hostname.equals(node))
+            .findFirst()
+            .get();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org