You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/13 02:53:28 UTC

[james-project] 01/13: JAMES-3041 Change the way we control resources for Spooler mail processing

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d6c0a6b283161d4ee3b2f0d0f6f04804e9feb881
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Feb 18 15:47:05 2020 +0100

    JAMES-3041 Change the way we control resources for Spooler mail processing
    
        For now, we controlled resources using a bounded threadpool
        for mail processing.
    
        However, it makes the concurrency of the dequeuing harder
        to achieve and doesn't leverage modern tools Reactor brings
        like explicit control over concurrency for a given Flux.
    
        With this commit we start using elastic Scheduler everywhere
        and control resources by specifying concurrency parameter.
---
 .../mailetcontainer/impl/JamesMailSpooler.java     | 53 ++++++++--------------
 .../mailetcontainer/impl/JamesMailSpoolerTest.java |  5 +-
 .../remote/delivery/RemoteDeliveryTest.java        |  2 +-
 .../SetMessagesOutboxFlagUpdateTest.java           |  4 +-
 .../routes/MailRepositoriesRoutesTest.java         |  2 +-
 .../webadmin/service/ReprocessingServiceTest.java  |  2 +-
 .../apache/james/queue/api/MailQueueFactory.java   | 47 ++++++++++++++++++-
 .../james/queue/api/MailQueueFactoryTest.java      | 26 +++++++++++
 .../api/ManageableMailQueueFactoryContract.java    |  6 +--
 .../james/queue/file/FileMailQueueFactory.java     |  4 +-
 .../queue/library/AbstractMailQueueFactory.java    |  6 +--
 .../james/queue/memory/MemoryMailQueueFactory.java |  6 +--
 .../MemoryCacheableMailQueueFactoryTest.java       |  6 +--
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  | 11 +++--
 .../queue/rabbitmq/RabbitMQMailQueueFactory.java   | 22 ++++-----
 15 files changed, 130 insertions(+), 72 deletions(-)

diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 28383cd..aaa86d7 100644
--- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -22,7 +22,6 @@ package org.apache.james.mailetcontainer.impl;
 import static org.apache.james.metrics.api.TimeMetric.ExecutionResult.DEFAULT_100_MS_THRESHOLD;
 
 import java.io.IOException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.PostConstruct;
@@ -41,14 +40,12 @@ import org.apache.james.metrics.api.TimeMetric;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueue.MailQueueItem;
 import org.apache.james.queue.api.MailQueueFactory;
-import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 /**
@@ -62,9 +59,10 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
     public static final String SPOOL_PROCESSING = "spoolProcessing";
 
     /**
-     * The number of threads used to move mail through the spool.
+     * concurrency level to use for dequeuing mails from spool, allows to throttle resources dedicated to that async
+     * process.
      */
-    private int numThreads;
+    private int concurrencyLevel;
 
     private final AtomicInteger processingActive = new AtomicInteger(0);
 
@@ -77,11 +75,8 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
 
     private final MailQueueFactory<?> queueFactory;
     private reactor.core.Disposable disposable;
-    private Scheduler spooler;
-    private int parallelismLevel;
     private MailQueue queue;
 
-
     @Inject
     public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> queueFactory) {
         this.metricFactory = metricFactory;
@@ -91,10 +86,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
 
     @Override
     public void configure(HierarchicalConfiguration<ImmutableNode> config) {
-        numThreads = config.getInt("threads", 100);
-        //Reactor helps us run things in parallel but we have to ensure there are always threads available
-        //in the threadpool to avoid starvation.
-        parallelismLevel = Math.max(1, numThreads - 2);
+        concurrencyLevel = config.getInt("threads", 100);
     }
 
     /**
@@ -103,18 +95,17 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
     @PostConstruct
     public void init() {
         LOGGER.info("init...");
-        queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
-        spooler = Schedulers.fromExecutor(Executors.newFixedThreadPool(numThreads, NamedThreadFactory.withName("spooler")));
-        LOGGER.info("uses {} Thread(s)", numThreads);
-        run();
+        LOGGER.info("Concurrency level is {}", concurrencyLevel);
+        queue = queueFactory.createQueue(MailQueueFactory.SPOOL, MailQueueFactory.prefetchCount(concurrencyLevel));
+        disposable = run(queue);
+        LOGGER.info("Spooler started");
     }
 
-    private void run() {
-        LOGGER.info("Queue={}", queue);
-        disposable = Flux.from(queue.deQueue())
-            .flatMap(item -> handleOnQueueItem(item).subscribeOn(spooler), parallelismLevel)
+    private reactor.core.Disposable run(MailQueue queue) {
+        return Flux.from(queue.deQueue())
+            .flatMap(item -> handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), concurrencyLevel)
             .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable))
-            .subscribeOn(spooler)
+            .subscribeOn(Schedulers.elastic())
             .subscribe();
     }
 
@@ -122,7 +113,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
         TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING);
         try {
             return Mono.fromCallable(processingActive::incrementAndGet)
-                .flatMap(ignore -> processMail(queueItem).subscribeOn(spooler))
+                .flatMap(ignore -> processMail(queueItem))
                 .doOnSuccess(any -> timeMetric.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD))
                 .doOnSuccess(any -> processingActive.decrementAndGet());
         } catch (Throwable e) {
@@ -134,16 +125,12 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
         return Mono
             .using(
                 queueItem::getMail,
-                resource -> Mono.just(resource)
-                    .doOnNext(mail -> LOGGER.debug("==== Begin processing mail {} ====", mail.getName()))
-                    .map(mail -> performProcessMail(queueItem, mail))
-                    .doOnNext(mail -> LOGGER.debug("==== End processing mail {} ====", mail.getName())),
-                LifecycleUtil::dispose)
-            .then();
+                mail -> Mono.fromRunnable(() -> performProcessMail(queueItem, mail)),
+                LifecycleUtil::dispose);
     }
 
-
-    private Mail performProcessMail(MailQueueItem queueItem, Mail mail) {
+    private void performProcessMail(MailQueueItem queueItem, Mail mail) {
+        LOGGER.debug("==== Begin processing mail {} ====", mail.getName());
         try {
             mailProcessor.service(mail);
 
@@ -157,8 +144,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
             } catch (MailQueue.MailQueueException ex) {
                 throw new RuntimeException(e);
             }
+        } finally {
+            LOGGER.debug("==== End processing mail {} ====", mail.getName());
         }
-        return mail;
     }
 
     /**
@@ -174,7 +162,6 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
     public void dispose() {
         LOGGER.info("start dispose() ...");
         disposable.dispose();
-        spooler.dispose();
         try {
             queue.close();
         } catch (IOException e) {
@@ -185,7 +172,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
 
     @Override
     public int getThreadCount() {
-        return numThreads;
+        return concurrencyLevel;
     }
 
     @Override
diff --git a/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java b/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java
index 0b62199..9485686 100644
--- a/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java
+++ b/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
 import static org.awaitility.Duration.TEN_SECONDS;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -73,7 +74,7 @@ class JamesMailSpoolerTest {
         MailQueue queue = mock(MailQueue.class);
         workQueue.onNext(item);
         when(queue.deQueue()).thenAnswer(any -> workQueue.limitRate(1).filter(MockedMailQueueItem::isNotDone));
-        when(queueFactory.createQueue(MailQueueFactory.SPOOL)).thenAnswer(any -> queue);
+        when(queueFactory.createQueue(eq(MailQueueFactory.SPOOL), any())).thenAnswer(any -> queue);
 
         doThrow(new RuntimeException("Arbitrary failure"))
             .doNothing()
@@ -107,7 +108,7 @@ class JamesMailSpoolerTest {
         MailQueue queue = mock(MailQueue.class);
         workQueue.onNext(item);
         when(queue.deQueue()).thenAnswer(any -> workQueue.limitRate(1).filter(MockedMailQueueItem::isNotDone));
-        when(queueFactory.createQueue(MailQueueFactory.SPOOL)).thenAnswer(any -> queue);
+        when(queueFactory.createQueue(eq(MailQueueFactory.SPOOL), any())).thenAnswer(any -> queue);
 
         doAnswer(ignored -> {
             Thread.currentThread().interrupt();
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java
index 1e62160..cfcc648 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java
@@ -99,7 +99,7 @@ public class RemoteDeliveryTest {
 
     @Before
     public void setUp() throws ConfigurationException {
-        MailQueueFactory<ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
+        MailQueueFactory<? extends ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
         mailQueue = queueFactory.createQueue(RemoteDeliveryConfiguration.DEFAULT_OUTGOING_QUEUE_NAME);
         DNSService dnsService = mock(DNSService.class);
         MemoryDomainList domainList = new MemoryDomainList(dnsService);
diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
index 0f82362..bf21d1e 100644
--- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
+++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
@@ -73,12 +73,12 @@ public abstract class SetMessagesOutboxFlagUpdateTest {
 
     protected MailQueueFactory<MailQueue> noopMailQueueFactory = new MailQueueFactory<MailQueue>() {
         @Override
-        public Optional<MailQueue> getQueue(MailQueueName name) {
+        public Optional<MailQueue> getQueue(MailQueueName name, PrefetchCount prefetchCount) {
             return Optional.of(createQueue(name));
         }
 
         @Override
-        public MailQueue createQueue(MailQueueName name) {
+        public MailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) {
             return new MailQueue() {
                 @Override
                 public void close() throws IOException {
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
index 613c506..b69d674 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
@@ -114,7 +114,7 @@ public class MailRepositoriesRoutesTest {
 
         MemoryTaskManager taskManager = new MemoryTaskManager(new Hostname("foo"));
         JsonTransformer jsonTransformer = new JsonTransformer();
-        MailQueueFactory<ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
+        MailQueueFactory<? extends ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
         spoolQueue = queueFactory.createQueue(MailQueueFactory.SPOOL);
         customQueue = queueFactory.createQueue(CUSTOM_QUEUE);
 
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
index 994c7aa..02cbc0d 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
@@ -64,7 +64,7 @@ public class ReprocessingServiceTest {
 
     private ReprocessingService reprocessingService;
     private MemoryMailRepositoryStore mailRepositoryStore;
-    private MailQueueFactory<ManageableMailQueue> queueFactory;
+    private MailQueueFactory<? extends ManageableMailQueue> queueFactory;
     private FakeMail mail1;
     private FakeMail mail2;
     private FakeMail mail3;
diff --git a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueueFactory.java b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueueFactory.java
index 3f0a5dc..35a4967 100644
--- a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueueFactory.java
+++ b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueueFactory.java
@@ -22,11 +22,46 @@ package org.apache.james.queue.api;
 import java.util.Optional;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * Factory for {@link MailQueue}
  */
 public interface MailQueueFactory<T extends MailQueue> {
 
+    static PrefetchCount defaultPrefetchCount() {
+        return prefetchCount(5);
+    }
+
+    static PrefetchCount prefetchCount(int count) {
+        return new PrefetchCount(count);
+    }
+
+    /**
+     * {@link PrefetchCount} provides producers insights about what kind of load consumers expect.
+     * If you expect to consume the mailqueue with 10 concurrent workers, it's a good idea to
+     * ensure the producer will fill the stream with at least 10 elements.
+     */
+    class PrefetchCount {
+        private final int value;
+
+        @VisibleForTesting
+        PrefetchCount(int value) {
+            Preconditions.checkArgument(value >= 0, "only non-negative values are allowed");
+            this.value = value;
+        }
+
+        public int asInt() {
+            return value;
+        }
+
+        @Override
+        public String toString() {
+            return "PrefetchCount = " + value;
+        }
+    }
+
     /**
      * {@link MailQueue} which is used for spooling the messages
      */
@@ -38,9 +73,17 @@ public interface MailQueueFactory<T extends MailQueue> {
      * @param name
      * @return queue
      */
-    Optional<T> getQueue(MailQueueName name);
+    default Optional<T> getQueue(MailQueueName name) {
+        return getQueue(name, defaultPrefetchCount());
+    }
+
+    Optional<T> getQueue(MailQueueName name, PrefetchCount count);
+
+    default T createQueue(MailQueueName name) {
+        return createQueue(name, defaultPrefetchCount());
+    }
 
-    T createQueue(MailQueueName name);
+    T createQueue(MailQueueName name, PrefetchCount count);
 
     Set<MailQueueName> listCreatedMailQueues();
 }
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueFactoryTest.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueFactoryTest.java
new file mode 100644
index 0000000..c565603
--- /dev/null
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueFactoryTest.java
@@ -0,0 +1,26 @@
+package org.apache.james.queue.api;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class MailQueueFactoryTest {
+
+    @Test
+    void prefetchCountShouldNotBeNegative() {
+        Assertions.assertThatThrownBy(() -> new MailQueueFactory.PrefetchCount(-12))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void prefetchCountCouldBeZero() {
+        Assertions.assertThatCode(() -> new MailQueueFactory.PrefetchCount(0)).doesNotThrowAnyException();
+    }
+
+    @Test
+    void prefetchCountCouldBePositive() {
+        Assertions.assertThatCode(() -> new MailQueueFactory.PrefetchCount(12)).doesNotThrowAnyException();
+    }
+
+}
\ No newline at end of file
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueFactoryContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueFactoryContract.java
index 5ae065f..fc671b8 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueFactoryContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueFactoryContract.java
@@ -25,15 +25,15 @@ import javax.mail.MessagingException;
 
 import org.junit.jupiter.api.Test;
 
-public interface ManageableMailQueueFactoryContract {
+public interface ManageableMailQueueFactoryContract<T extends ManageableMailQueue> {
 
     MailQueueName NAME_1 = MailQueueName.of("name1");
 
-    MailQueueFactory<ManageableMailQueue> getMailQueueFactory();
+    MailQueueFactory<T> getMailQueueFactory();
 
     @Test
     default void createMailQueueShouldNotConflictIfAlreadyExists() throws MessagingException {
-        MailQueueFactory<ManageableMailQueue> mailQueueFactory = getMailQueueFactory();
+        MailQueueFactory<T> mailQueueFactory = getMailQueueFactory();
         MailQueue firstCreation = mailQueueFactory.createQueue(NAME_1);
 
         firstCreation.enQueue(Mails.defaultMail().name("name").build());
diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java
index 0587620..f727bcd 100644
--- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java
+++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java
@@ -76,12 +76,12 @@ public class FileMailQueueFactory implements MailQueueFactory<ManageableMailQueu
     }
 
     @Override
-    public Optional<ManageableMailQueue> getQueue(MailQueueName name) {
+    public Optional<ManageableMailQueue> getQueue(MailQueueName name, PrefetchCount prefetchCount) {
         return Optional.ofNullable(queues.get(name));
     }
 
     @Override
-    public ManageableMailQueue createQueue(MailQueueName name) {
+    public ManageableMailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) {
         return queues.computeIfAbsent(name, mailQueueName -> {
             try {
                 return new FileCacheableMailQueue(mailQueueActionItemDecoratorFactory, fs.getFile("file://var/store/queue"), mailQueueName, sync);
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java
index 5fa22ef..052b0d2 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java
@@ -96,13 +96,13 @@ public abstract class AbstractMailQueueFactory<T extends MailQueue> implements M
     }
 
     @Override
-    public final synchronized Optional<T> getQueue(MailQueueName name) {
+    public final synchronized Optional<T> getQueue(MailQueueName name, PrefetchCount prefetchCount) {
         return Optional.ofNullable(queues.get(name));
     }
 
     @Override
-    public synchronized T createQueue(MailQueueName name) {
-        return getQueue(name).orElseGet(() -> createAndRegisterQueue(name));
+    public synchronized T createQueue(MailQueueName name, PrefetchCount prefetchCount) {
+        return getQueue(name, prefetchCount).orElseGet(() -> createAndRegisterQueue(name));
     }
 
     private T createAndRegisterQueue(MailQueueName name) {
diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index e13cb10..da9e8b1 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -58,7 +58,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
-public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQueue> {
+public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueFactory.MemoryCacheableMailQueue> {
 
     private final ConcurrentHashMap<MailQueueName, MemoryCacheableMailQueue> mailQueues;
     private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
@@ -78,12 +78,12 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
     }
 
     @Override
-    public Optional<ManageableMailQueue> getQueue(MailQueueName name) {
+    public Optional<MemoryCacheableMailQueue> getQueue(MailQueueName name, PrefetchCount count) {
         return Optional.ofNullable(mailQueues.get(name));
     }
 
     @Override
-    public MemoryCacheableMailQueue createQueue(MailQueueName name) {
+    public MemoryCacheableMailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) {
         return mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory));
     }
 
diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java
index aa2e294..a50722b 100644
--- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java
+++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java
@@ -21,12 +21,12 @@ package org.apache.james.queue.memory;
 
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.MailQueueFactoryContract;
-import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.api.ManageableMailQueueFactoryContract;
 import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
 import org.junit.jupiter.api.BeforeEach;
 
-class MemoryCacheableMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
+class MemoryCacheableMailQueueFactoryTest implements MailQueueFactoryContract<MemoryMailQueueFactory.MemoryCacheableMailQueue>,
+    ManageableMailQueueFactoryContract<MemoryMailQueueFactory.MemoryCacheableMailQueue> {
 
     MemoryMailQueueFactory memoryMailQueueFactory;
 
@@ -36,7 +36,7 @@ class MemoryCacheableMailQueueFactoryTest implements MailQueueFactoryContract<Ma
     }
 
     @Override
-    public MailQueueFactory<ManageableMailQueue> getMailQueueFactory() {
+    public MailQueueFactory<MemoryMailQueueFactory.MemoryCacheableMailQueue> getMailQueueFactory() {
         return memoryMailQueueFactory;
     }
 }
\ No newline at end of file
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 64c0c93..b22d850 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -30,6 +30,7 @@ import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.mailet.Mail;
@@ -39,13 +40,13 @@ import com.rabbitmq.client.Delivery;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.Receiver;
 
 class Dequeuer implements Closeable {
     private static final boolean REQUEUE = true;
-    private static final int EXECUTION_RATE = 5;
 
     private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
 
@@ -84,14 +85,14 @@ class Dequeuer implements Closeable {
 
     Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
              MailReferenceSerializer serializer, MetricFactory metricFactory,
-             MailQueueView mailQueueView) {
+             MailQueueView mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) {
         this.mailLoader = mailLoader;
         this.mailReferenceSerializer = serializer;
         this.mailQueueView = mailQueueView;
         this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
         this.receiver = receiverProvider.createReceiver();
         this.flux = this.receiver
-            .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE))
+            .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(prefetchCount.asInt()))
             .filter(getResponse -> getResponse.getBody() != null);
     }
 
@@ -101,8 +102,8 @@ class Dequeuer implements Closeable {
     }
 
     Flux<? extends MailQueue.MailQueueItem> deQueue() {
-        return flux.concatMap(this::loadItem)
-            .concatMap(this::filterIfDeleted);
+        return flux.flatMapSequential(response -> loadItem(response).subscribeOn(Schedulers.elastic()))
+            .concatMap(item -> filterIfDeleted(item).subscribeOn(Schedulers.elastic()));
     }
 
     private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem item) {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index 3805e74..fc2f784 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -93,7 +93,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
             this.configuration = configuration;
         }
 
-        RabbitMQMailQueue create(MailQueueName mailQueueName) {
+        RabbitMQMailQueue create(MailQueueName mailQueueName, PrefetchCount prefetchCount) {
             MailQueueView mailQueueView = mailQueueViewFactory.create(mailQueueName);
             mailQueueView.initialize(mailQueueName);
 
@@ -103,7 +103,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
                 new Enqueuer(mailQueueName, sender, mimeMessageStore, mailReferenceSerializer,
                     metricFactory, mailQueueView, clock),
                 new Dequeuer(mailQueueName, receiverProvider, mailLoader, mailReferenceSerializer,
-                    metricFactory, mailQueueView),
+                    metricFactory, mailQueueView, prefetchCount),
                 mailQueueView,
                 decoratorFactory);
 
@@ -133,15 +133,15 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
     }
 
     @Override
-    public Optional<RabbitMQMailQueue> getQueue(org.apache.james.queue.api.MailQueueName name) {
-        return getQueueFromRabbitServer(MailQueueName.fromString(name.asString()));
+    public Optional<RabbitMQMailQueue> getQueue(org.apache.james.queue.api.MailQueueName name, PrefetchCount count) {
+        return getQueueFromRabbitServer(MailQueueName.fromString(name.asString()), count);
     }
 
     @Override
-    public RabbitMQMailQueue createQueue(org.apache.james.queue.api.MailQueueName name) {
+    public RabbitMQMailQueue createQueue(org.apache.james.queue.api.MailQueueName name, PrefetchCount count) {
         MailQueueName mailQueueName = MailQueueName.fromString(name.asString());
-        return getQueueFromRabbitServer(mailQueueName)
-            .orElseGet(() -> createQueueIntoRabbitServer(mailQueueName));
+        return getQueueFromRabbitServer(mailQueueName, count)
+            .orElseGet(() -> createQueueIntoRabbitServer(mailQueueName, count));
     }
 
     @Override
@@ -152,7 +152,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
             .collect(ImmutableSet.toImmutableSet());
     }
 
-    private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName) {
+    private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName, PrefetchCount prefetchCount) {
         String exchangeName = mailQueueName.toRabbitExchangeName().asString();
         Flux.concat(
             sender.declareExchange(ExchangeSpecification.exchange(exchangeName)
@@ -169,13 +169,13 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
                 .routingKey(EMPTY_ROUTING_KEY)))
             .then()
             .block();
-        return privateFactory.create(mailQueueName);
+        return privateFactory.create(mailQueueName, prefetchCount);
     }
 
-    private Optional<RabbitMQMailQueue> getQueueFromRabbitServer(MailQueueName name) {
+    private Optional<RabbitMQMailQueue> getQueueFromRabbitServer(MailQueueName name, PrefetchCount prefetchCount) {
         return mqManagementApi.listCreatedMailQueueNames()
             .filter(name::equals)
-            .map(privateFactory::create)
+            .map(queueName -> privateFactory.create(queueName, prefetchCount))
             .findFirst();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org