You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/03/14 15:03:59 UTC

[james-project] branch master updated (8f8c4c8 -> 60c7f63)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 8f8c4c8  JAMES-2665 Re-enable vault integration test about mailbox deletion
     new cc09e19  MAILBOX-342 Not reusing forks helps testing stability of mailbox/cassandra
     new 9fdb43b  MAILBOX-342 Add missing cassandra modules
     new 66ef29e  JAMES-2544 Reverse MailQueue usage
     new 60c7f63  JAMES-2544 dequeueTime time is always 0 now that the queue is reactive

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../MailQueue-1490071879988-dashboard.json         | 226 ---------------------
 mailbox/cassandra/pom.xml                          |  12 --
 .../mail/CassandraGenericMailboxMapperTest.java    |   2 +
 .../cassandra/mail/CassandraMailboxDAOTest.java    |   2 +
 .../mail/CassandraMailboxMapperAclTest.java        |   2 +
 .../CassandraMailboxMapperConcurrencyTest.java     |   2 +
 .../cassandra/mail/CassandraMailboxMapperTest.java |   6 +-
 .../modules/server/CamelMailetContainerModule.java |   3 +-
 .../mailetcontainer/impl/JamesMailSpooler.java     | 162 +++++----------
 .../james/transport/mailets/RemoteDelivery.java    |  36 +---
 .../mailets/remote/delivery/DeliveryRunnable.java  |  91 ++++-----
 .../remote/delivery/DeliveryRunnableTest.java      |   2 +-
 .../remote/delivery/RemoteDeliveryRunningTest.java |   4 +-
 .../remote/delivery/RemoteDeliveryTest.java        |  17 +-
 .../SetMessagesOutboxFlagUpdateTest.java           |  13 +-
 .../org/apache/james/jmap/send/MailSpoolTest.java  |   6 +-
 .../queue/activemq/ActiveMQMailQueueTest.java      |   2 +-
 .../java/org/apache/james/queue/api/MailQueue.java |   4 +-
 .../james/queue/api/DelayedMailQueueContract.java  |  52 ++---
 .../api/DelayedManageableMailQueueContract.java    |   7 +-
 .../api/DelayedPriorityMailQueueContract.java      |  14 +-
 .../apache/james/queue/api/MailQueueContract.java  | 106 ++++++----
 .../james/queue/api/MailQueueMetricContract.java   |  39 +---
 .../james/queue/api/MailQueueMetricExtension.java  |   3 -
 .../queue/api/ManageableMailQueueContract.java     |  21 +-
 .../james/queue/api/PriorityMailQueueContract.java |  51 +++--
 .../org/apache/james/queue/file/FileMailQueue.java |  18 +-
 .../org/apache/james/queue/jms/JMSMailQueue.java   |  50 ++---
 server/queue/queue-memory/pom.xml                  |   5 +
 .../james/queue/memory/MemoryMailQueueFactory.java |  13 +-
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  34 ++--
 .../james/queue/rabbitmq/RabbitMQMailQueue.java    |   7 +-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      |  13 +-
 33 files changed, 383 insertions(+), 642 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/04: MAILBOX-342 Not reusing forks helps testing stability of mailbox/cassandra

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit cc09e1930fc0cecb82b3d5eb9a319bb7f67c7b7b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Mar 14 15:16:08 2019 +0700

    MAILBOX-342 Not reusing forks helps testing stability of mailbox/cassandra
---
 mailbox/cassandra/pom.xml | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/mailbox/cassandra/pom.xml b/mailbox/cassandra/pom.xml
index e2609e1..250955e 100644
--- a/mailbox/cassandra/pom.xml
+++ b/mailbox/cassandra/pom.xml
@@ -182,16 +182,4 @@
         </dependency>
     </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <reuseForks>true</reuseForks>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
 </project>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/04: JAMES-2544 Reverse MailQueue usage

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 66ef29eb9b1a60c47cb6a38568732ffd03cfd9e1
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Wed Jan 30 16:44:19 2019 +0100

    JAMES-2544 Reverse MailQueue usage
    
    	Instead of polling from a pool of threads we now return the
    	Flux so it can be consumed in a more optimized way.
---
 .../modules/server/CamelMailetContainerModule.java |   3 +-
 .../mailetcontainer/impl/JamesMailSpooler.java     | 162 +++++++--------------
 .../james/transport/mailets/RemoteDelivery.java    |  36 ++---
 .../mailets/remote/delivery/DeliveryRunnable.java  |  91 ++++++------
 .../remote/delivery/DeliveryRunnableTest.java      |   2 +-
 .../remote/delivery/RemoteDeliveryRunningTest.java |   4 +-
 .../remote/delivery/RemoteDeliveryTest.java        |  17 ++-
 .../SetMessagesOutboxFlagUpdateTest.java           |  13 +-
 .../org/apache/james/jmap/send/MailSpoolTest.java  |   6 +-
 .../queue/activemq/ActiveMQMailQueueTest.java      |   2 +-
 .../java/org/apache/james/queue/api/MailQueue.java |   3 +-
 .../james/queue/api/DelayedMailQueueContract.java  |  52 +++----
 .../api/DelayedManageableMailQueueContract.java    |   7 +-
 .../api/DelayedPriorityMailQueueContract.java      |  14 +-
 .../apache/james/queue/api/MailQueueContract.java  | 106 ++++++++------
 .../james/queue/api/MailQueueMetricContract.java   |  15 +-
 .../queue/api/ManageableMailQueueContract.java     |  21 +--
 .../james/queue/api/PriorityMailQueueContract.java |  51 ++++---
 .../org/apache/james/queue/file/FileMailQueue.java |  18 ++-
 .../org/apache/james/queue/jms/JMSMailQueue.java   |  53 ++++---
 server/queue/queue-memory/pom.xml                  |   5 +
 .../james/queue/memory/MemoryMailQueueFactory.java |  13 +-
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  34 ++---
 .../james/queue/rabbitmq/RabbitMQMailQueue.java    |   7 +-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      |  13 +-
 25 files changed, 375 insertions(+), 373 deletions(-)

diff --git a/server/container/guice/mailet/src/main/java/org/apache/james/modules/server/CamelMailetContainerModule.java b/server/container/guice/mailet/src/main/java/org/apache/james/modules/server/CamelMailetContainerModule.java
index fb28f27..0a0a46c 100644
--- a/server/container/guice/mailet/src/main/java/org/apache/james/modules/server/CamelMailetContainerModule.java
+++ b/server/container/guice/mailet/src/main/java/org/apache/james/modules/server/CamelMailetContainerModule.java
@@ -171,10 +171,11 @@ public class CamelMailetContainerModule extends AbstractModule {
             }
         }
 
-        private void configureJamesSpooler() throws ConfigurationException {
+        private void configureJamesSpooler() {
             jamesMailSpooler.setMailProcessor(camelCompositeProcessor);
             jamesMailSpooler.configure(getJamesSpoolerConfiguration());
             jamesMailSpooler.init();
+            jamesMailSpooler.run();
         }
 
         private HierarchicalConfiguration getJamesSpoolerConfiguration() {
diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 4100533..d24e23e 100644
--- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -19,15 +19,13 @@
 
 package org.apache.james.mailetcontainer.impl;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
-import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.HierarchicalConfiguration;
 import org.apache.james.lifecycle.api.Configurable;
 import org.apache.james.lifecycle.api.Disposable;
@@ -37,20 +35,24 @@ import org.apache.james.mailetcontainer.api.jmx.MailSpoolerMBean;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.metrics.api.TimeMetric;
 import org.apache.james.queue.api.MailQueue;
-import org.apache.james.queue.api.MailQueue.MailQueueException;
 import org.apache.james.queue.api.MailQueue.MailQueueItem;
 import org.apache.james.queue.api.MailQueueFactory;
-import org.apache.james.util.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+
 /**
  * Manages the mail spool. This class is responsible for retrieving messages
  * from the spool, directing messages to the appropriate processor, and removing
  * them from the spool when processing is complete.
  */
-public class JamesMailSpooler implements Runnable, Disposable, Configurable, MailSpoolerMBean {
+public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMBean {
     private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class);
 
     public static final String SPOOL_PROCESSING = "spoolProcessing";
@@ -61,35 +63,18 @@ public class JamesMailSpooler implements Runnable, Disposable, Configurable, Mai
      */
     private int numThreads;
 
-    /**
-     * Number of active threads
-     */
-    private final AtomicInteger numActive = new AtomicInteger(0);
-
     private final AtomicInteger processingActive = new AtomicInteger(0);
 
-    /**
-     * Spool threads are active
-     */
-    private final AtomicBoolean active = new AtomicBoolean(false);
-
     private final MetricFactory metricFactory;
 
     /**
-     * Spool threads
-     */
-    private ExecutorService dequeueService;
-
-    private ExecutorService workerService;
-
-    /**
      * The mail processor
      */
     private MailProcessor mailProcessor;
 
     private MailQueueFactory<?> queueFactory;
-
-    private int numDequeueThreads;
+    private reactor.core.Disposable disposable;
+    private Scheduler spooler;
 
     @Inject
     public JamesMailSpooler(MetricFactory metricFactory) {
@@ -107,9 +92,7 @@ public class JamesMailSpooler implements Runnable, Disposable, Configurable, Mai
     }
 
     @Override
-    public void configure(HierarchicalConfiguration config) throws ConfigurationException {
-        numDequeueThreads = config.getInt("dequeueThreads", 2);
-
+    public void configure(HierarchicalConfiguration config) {
         numThreads = config.getInt("threads", 100);
     }
 
@@ -118,81 +101,50 @@ public class JamesMailSpooler implements Runnable, Disposable, Configurable, Mai
      */
     @PostConstruct
     public void init() {
-        LOGGER.info("{} init...", getClass().getName());
-
+        LOGGER.info("init...");
         queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
+        spooler = Schedulers.fromExecutor(Executors.newFixedThreadPool(numThreads, NamedThreadFactory.withName("spooler")));
+        LOGGER.info("uses {} Thread(s)", numThreads);
+    }
 
-        LOGGER.info("{} uses {} Thread(s)", getClass().getName(), numThreads);
+    public void run() {
+        LOGGER.info("Queue={}", queue);
 
-        active.set(true);
-        workerService = JMXEnabledThreadPoolExecutor.newFixedThreadPool("org.apache.james:type=component,component=mailetcontainer,name=mailspooler,sub-type=threadpool", "spooler", numThreads);
-        dequeueService = JMXEnabledThreadPoolExecutor.newFixedThreadPool("org.apache.james:type=component,component=mailetcontainer,name=mailspooler,sub-type=threadpool", "dequeuer", numDequeueThreads);
+        disposable = Flux.from(queue.deQueue())
+            .publishOn(spooler)
+            .flatMap(this::handleOnQueueItem)
+            .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable))
+            .subscribeOn(Schedulers.elastic())
+            .subscribe();
+    }
 
-        for (int i = 0; i < numDequeueThreads; i++) {
-            Thread reader = new Thread(this, "Dequeue Thread #" + i);
-            dequeueService.execute(reader);
+    private Mono<Void> handleOnQueueItem(MailQueueItem queueItem) {
+        TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING);
+        try {
+            processingActive.incrementAndGet();
+            return processMail(queueItem);
+        } catch (Throwable e) {
+            return Mono.error(e);
+        } finally {
+            processingActive.decrementAndGet();
+            timeMetric.stopAndPublish();
         }
     }
 
-    /**
-     * This routinely checks the message spool for messages, and processes them
-     * as necessary
-     */
-    @Override
-    public void run() {
-        LOGGER.info("Run {}: {}", getClass().getName(), Thread.currentThread().getName());
-        LOGGER.info("Queue={}", queue);
-
-        while (active.get()) {
-
-            final MailQueueItem queueItem;
-            try {
-                queueItem = queue.deQueue();
-                workerService.execute(() -> {
-                    TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING);
-                    try {
-                        numActive.incrementAndGet();
-
-                        // increase count
-                        processingActive.incrementAndGet();
-
-                        Mail mail = queueItem.getMail();
-                        LOGGER.debug("==== Begin processing mail {} ====", mail.getName());
-
-                        try {
-                            mailProcessor.service(mail);
-                            queueItem.done(true);
-                        } catch (Exception e) {
-                            if (active.get()) {
-                                LOGGER.error("Exception processing mail while spooling", e);
-                            }
-                            queueItem.done(false);
-
-                        } finally {
-                            LifecycleUtil.dispose(mail);
-                            mail = null;
-                        }
-                    } catch (Throwable e) {
-                        if (active.get()) {
-                            LOGGER.error("Exception processing mail while spooling", e);
-
-                        }
-                    } finally {
-                        processingActive.decrementAndGet();
-                        numActive.decrementAndGet();
-                        timeMetric.stopAndPublish();
-                    }
-
-                });
-            } catch (MailQueueException e1) {
-                if (active.get()) {
-                    LOGGER.error("Exception dequeue mail", e1);
-                }
-            } catch (InterruptedException interrupted) {
-                //MailSpooler is stopping
-            }
+    private Mono<Void> processMail(MailQueueItem queueItem) throws MailQueue.MailQueueException {
+        Mail mail = queueItem.getMail();
+        LOGGER.debug("==== Begin processing mail {} ====", mail.getName());
+        try {
+            mailProcessor.service(mail);
+            queueItem.done(true);
+            return Mono.empty();
+        } catch (Exception e) {
+            queueItem.done(false);
+            return Mono.error(e);
+        } finally {
+            LOGGER.debug("==== End processing mail {} ====", mail.getName());
+            LifecycleUtil.dispose(mail);
         }
-        LOGGER.info("Stop {} : {}", getClass().getName(), Thread.currentThread().getName());
     }
 
     /**
@@ -206,22 +158,10 @@ public class JamesMailSpooler implements Runnable, Disposable, Configurable, Mai
     @PreDestroy
     @Override
     public void dispose() {
-        LOGGER.info("{} dispose...", getClass().getName());
-        active.set(false); // shutdown the threads
-        dequeueService.shutdownNow();
-        workerService.shutdown();
-
-        long stop = System.currentTimeMillis() + 60000;
-        // give the spooler threads one minute to terminate gracefully
-        while (numActive.get() != 0 && stop > System.currentTimeMillis()) {
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        LOGGER.info("{} thread shutdown completed.", getClass().getName());
+        LOGGER.info("start dispose() ...");
+        disposable.dispose();
+        spooler.dispose();
+        LOGGER.info("thread shutdown completed.");
     }
 
     @Override
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
index 7192edc..9adb9a6 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
@@ -22,10 +22,6 @@ package org.apache.james.transport.mailets;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.inject.Inject;
 import javax.mail.MessagingException;
@@ -43,7 +39,6 @@ import org.apache.james.transport.mailets.remote.delivery.Bouncer;
 import org.apache.james.transport.mailets.remote.delivery.DeliveryRunnable;
 import org.apache.james.transport.mailets.remote.delivery.RemoteDeliveryConfiguration;
 import org.apache.james.transport.mailets.remote.delivery.RemoteDeliverySocketFactory;
-import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.GenericMailet;
 import org.slf4j.Logger;
@@ -123,6 +118,7 @@ import com.google.common.collect.HashMultimap;
  */
 public class RemoteDelivery extends GenericMailet {
     private static final Logger LOGGER = LoggerFactory.getLogger(RemoteDelivery.class);
+    private DeliveryRunnable deliveryRunnable;
 
     public enum ThreadState {
         START_THREADS,
@@ -135,12 +131,10 @@ public class RemoteDelivery extends GenericMailet {
     private final DomainList domainList;
     private final MailQueueFactory<?> queueFactory;
     private final MetricFactory metricFactory;
-    private final AtomicBoolean isDestroyed;
     private final ThreadState startThreads;
 
     private MailQueue queue;
     private RemoteDeliveryConfiguration configuration;
-    private ExecutorService executor;
 
     @Inject
     public RemoteDelivery(DNSService dnsServer, DomainList domainList, MailQueueFactory<?> queueFactory, MetricFactory metricFactory) {
@@ -152,7 +146,6 @@ public class RemoteDelivery extends GenericMailet {
         this.domainList = domainList;
         this.queueFactory = queueFactory;
         this.metricFactory = metricFactory;
-        this.isDestroyed = new AtomicBoolean(false);
         this.startThreads = startThreads;
     }
 
@@ -167,23 +160,14 @@ public class RemoteDelivery extends GenericMailet {
         } catch (UnknownHostException e) {
             LOGGER.error("Invalid bind setting ({}): ", configuration.getBindAddress(), e);
         }
+        deliveryRunnable = new DeliveryRunnable(queue,
+            configuration,
+            dnsServer,
+            metricFactory,
+            getMailetContext(),
+            new Bouncer(configuration, getMailetContext()));
         if (startThreads == ThreadState.START_THREADS) {
-            initDeliveryThreads();
-        }
-    }
-
-    private void initDeliveryThreads() {
-        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
-        executor = Executors.newFixedThreadPool(configuration.getWorkersThreadCount(), threadFactory);
-        for (int a = 0; a < configuration.getWorkersThreadCount(); a++) {
-            executor.execute(
-                new DeliveryRunnable(queue,
-                    configuration,
-                    dnsServer,
-                    metricFactory,
-                    getMailetContext(),
-                    new Bouncer(configuration, getMailetContext()),
-                    isDestroyed));
+            deliveryRunnable.start();
         }
     }
 
@@ -261,9 +245,7 @@ public class RemoteDelivery extends GenericMailet {
     @Override
     public synchronized void destroy() {
         if (startThreads == ThreadState.START_THREADS) {
-            isDestroyed.set(true);
-            executor.shutdownNow();
-            notifyAll();
+            deliveryRunnable.dispose();
         }
     }
 
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
index b5fcebe..ddaa71a 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
@@ -37,8 +37,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
-public class DeliveryRunnable implements Runnable {
+public class DeliveryRunnable implements Disposable {
     private static final Logger LOGGER = LoggerFactory.getLogger(DeliveryRunnable.class);
 
     public static final Supplier<Date> CURRENT_DATE_SUPPLIER = Date::new;
@@ -52,77 +56,65 @@ public class DeliveryRunnable implements Runnable {
     private final MetricFactory metricFactory;
     private final Bouncer bouncer;
     private final MailDelivrer mailDelivrer;
-    private final AtomicBoolean isDestroyed;
     private final Supplier<Date> dateSupplier;
+    private Disposable disposable;
 
     public DeliveryRunnable(MailQueue queue, RemoteDeliveryConfiguration configuration, DNSService dnsServer, MetricFactory metricFactory,
-                            MailetContext mailetContext, Bouncer bouncer, AtomicBoolean isDestroyed) {
+                            MailetContext mailetContext, Bouncer bouncer) {
         this(queue, configuration, metricFactory, bouncer,
             new MailDelivrer(configuration, new MailDelivrerToHost(configuration, mailetContext), dnsServer, bouncer),
-            isDestroyed, CURRENT_DATE_SUPPLIER);
+            CURRENT_DATE_SUPPLIER);
     }
 
     @VisibleForTesting
     DeliveryRunnable(MailQueue queue, RemoteDeliveryConfiguration configuration, MetricFactory metricFactory, Bouncer bouncer,
-                     MailDelivrer mailDelivrer, AtomicBoolean isDestroyeds, Supplier<Date> dateSupplier) {
+                     MailDelivrer mailDelivrer, Supplier<Date> dateSupplier) {
         this.queue = queue;
         this.configuration = configuration;
         this.outgoingMailsMetric = metricFactory.generate(OUTGOING_MAILS);
         this.bouncer = bouncer;
         this.mailDelivrer = mailDelivrer;
-        this.isDestroyed = isDestroyeds;
         this.dateSupplier = dateSupplier;
         this.metricFactory = metricFactory;
     }
 
-    @Override
-    public void run() {
+    public void start() {
+        disposable = Flux.from(queue.deQueue())
+            .publishOn(Schedulers.newParallel("RemoteDelivery", configuration.getWorkersThreadCount()))
+            .flatMap(this::runStep)
+            .onErrorContinue(((throwable, nothing) -> LOGGER.error("Exception caught in RemoteDelivery", throwable)))
+            .subscribeOn(Schedulers.elastic())
+            .subscribe();
+    }
+
+    private Mono<Void> runStep(MailQueue.MailQueueItem queueItem) {
+        TimeMetric timeMetric = metricFactory.timer(REMOTE_DELIVERY_TRIAL);
         try {
-            while (!Thread.interrupted() && !isDestroyed.get()) {
-                runStep();
-            }
+            return processMail(queueItem);
+        } catch (Throwable e) {
+            return Mono.error(e);
         } finally {
-            // Restore the thread state to non-interrupted.
-            Thread.interrupted();
+            timeMetric.stopAndPublish();
         }
     }
 
-    private void runStep() {
-        TimeMetric timeMetric = null;
-        try {
-            // Get the 'mail' object that is ready for deliverying. If no message is
-            // ready, the 'accept' will block until message is ready.
-            // The amount of time to block is determined by the 'getWaitTime' method of the MultipleDelayFilter.
-            MailQueue.MailQueueItem queueItem = queue.deQueue();
-            timeMetric = metricFactory.timer(REMOTE_DELIVERY_TRIAL);
-            Mail mail = queueItem.getMail();
-
-            try {
-                if (configuration.isDebug()) {
-                    LOGGER.debug("{} will process mail {}", Thread.currentThread().getName(), mail.getName());
-                }
-                attemptDelivery(mail);
-                LifecycleUtil.dispose(mail);
-                mail = null;
-                queueItem.done(true);
-            } catch (Exception e) {
-                // Prevent unexpected exceptions from causing looping by removing message from outgoing.
-                // DO NOT CHANGE THIS to catch Error!
-                // For example, if there were an OutOfMemory condition caused because
-                // something else in the server was abusing memory, we would not want to start purging the retrying spool!
-                LOGGER.error("Exception caught in RemoteDelivery.run()", e);
-                LifecycleUtil.dispose(mail);
-                queueItem.done(false);
-            }
+    private Mono<Void> processMail(MailQueue.MailQueueItem queueItem) throws MailQueue.MailQueueException {
+        Mail mail = queueItem.getMail();
 
-        } catch (Throwable e) {
-            if (!isDestroyed.get()) {
-                LOGGER.error("Exception caught in RemoteDelivery.run()", e);
-            }
+        try {
+            LOGGER.debug("will process mail {}", mail.getName());
+            attemptDelivery(mail);
+            queueItem.done(true);
+            return Mono.empty();
+        } catch (Exception e) {
+            // Prevent unexpected exceptions from causing looping by removing message from outgoing.
+            // DO NOT CHANGE THIS to catch Error!
+            // For example, if there were an OutOfMemory condition caused because
+            // something else in the server was abusing memory, we would not want to start purging the retrying spool!
+            queueItem.done(false);
+            return Mono.error(e);
         } finally {
-            if (timeMetric != null) {
-                timeMetric.stopAndPublish();
-            }
+            LifecycleUtil.dispose(mail);
         }
     }
 
@@ -178,4 +170,9 @@ public class DeliveryRunnable implements Runnable {
         }
         return configuration.getDelayTimes().get(retry_count - 1);
     }
+
+    @Override
+    public void dispose() {
+        disposable.dispose();
+    }
 }
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnableTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnableTest.java
index 00095e2..e2820eb 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnableTest.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnableTest.java
@@ -74,7 +74,7 @@ public class DeliveryRunnableTest {
         bouncer = mock(Bouncer.class);
         mailDelivrer = mock(MailDelivrer.class);
         mailQueue = mock(MailQueue.class);
-        testee = new DeliveryRunnable(mailQueue, configuration, mockMetricFactory, bouncer, mailDelivrer, DeliveryRunnable.DEFAULT_NOT_STARTED, FIXED_DATE_SUPPLIER);
+        testee = new DeliveryRunnable(mailQueue, configuration, mockMetricFactory, bouncer, mailDelivrer, FIXED_DATE_SUPPLIER);
     }
 
     @Test
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryRunningTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryRunningTest.java
index 075aa43..2f3f938 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryRunningTest.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryRunningTest.java
@@ -37,6 +37,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import reactor.core.publisher.Flux;
+
 public class RemoteDeliveryRunningTest {
 
     public static final String QUEUE_NAME = "queueName";
@@ -61,7 +63,7 @@ public class RemoteDeliveryRunningTest {
         when(mailQueue.deQueue()).thenAnswer(invocation -> {
             countDownLatch.countDown();
             Thread.sleep(TimeUnit.SECONDS.toMillis(20));
-            return null;
+            return Flux.never();
         });
         remoteDelivery.init(FakeMailetConfig.builder()
             .setProperty(RemoteDeliveryConfiguration.DELIVERY_THREADS, "1")
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java
index 00612ae..117446d 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.transport.mailets.remote.delivery;
 
+import static org.apache.mailet.base.MailAddressFixture.JAMES_APACHE_ORG;
+import static org.apache.mailet.base.MailAddressFixture.JAMES_APACHE_ORG_DOMAIN;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 
@@ -27,9 +29,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.james.core.MailAddress;
 import org.apache.james.dnsservice.api.DNSService;
-import org.apache.james.domainlist.api.DomainList;
+import org.apache.james.domainlist.lib.DomainListConfiguration;
+import org.apache.james.domainlist.memory.MemoryDomainList;
 import org.apache.james.metrics.api.NoopMetricFactory;
 import org.apache.james.queue.api.MailPrioritySupport;
 import org.apache.james.queue.api.MailQueueFactory;
@@ -94,10 +98,13 @@ public class RemoteDeliveryTest {
     private ManageableMailQueue mailQueue;
 
     @Before
-    public void setUp() {
+    public void setUp() throws ConfigurationException {
         MailQueueFactory<ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
         mailQueue = queueFactory.createQueue(RemoteDeliveryConfiguration.OUTGOING);
-        remoteDelivery = new RemoteDelivery(mock(DNSService.class), mock(DomainList.class),
+        DNSService dnsService = mock(DNSService.class);
+        MemoryDomainList domainList = new MemoryDomainList(dnsService);
+        domainList.configure(DomainListConfiguration.builder().defaultDomain(JAMES_APACHE_ORG_DOMAIN));
+        remoteDelivery = new RemoteDelivery(dnsService, domainList,
             queueFactory, new NoopMetricFactory(), RemoteDelivery.ThreadState.DO_NOT_START_THREADS);
     }
 
@@ -115,7 +122,7 @@ public class RemoteDeliveryTest {
             .extracting(MailProjection::from)
             .containsOnly(MailProjection.from(
                 FakeMail.builder()
-                    .name(MAIL_NAME + RemoteDelivery.NAME_JUNCTION + MailAddressFixture.JAMES_APACHE_ORG)
+                    .name(MAIL_NAME + RemoteDelivery.NAME_JUNCTION + JAMES_APACHE_ORG)
                     .recipient(MailAddressFixture.ANY_AT_JAMES)
                     .build()));
     }
@@ -137,7 +144,7 @@ public class RemoteDeliveryTest {
             .extracting(MailProjection::from)
             .containsOnly(
                 MailProjection.from(FakeMail.builder()
-                    .name(MAIL_NAME + RemoteDelivery.NAME_JUNCTION + MailAddressFixture.JAMES_APACHE_ORG)
+                    .name(MAIL_NAME + RemoteDelivery.NAME_JUNCTION + JAMES_APACHE_ORG)
                     .recipients(MailAddressFixture.ANY_AT_JAMES, MailAddressFixture.OTHER_AT_JAMES)
                     .build()),
                 MailProjection.from(FakeMail.builder()
diff --git a/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/methods/integration/SetMessagesOutboxFlagUpdateTest.java b/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/methods/integration/SetMessagesOutboxFlagUpdateTest.java
index 3972f4c..402bf4d 100644
--- a/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/methods/integration/SetMessagesOutboxFlagUpdateTest.java
+++ b/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/methods/integration/SetMessagesOutboxFlagUpdateTest.java
@@ -35,7 +35,6 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.NotImplementedException;
@@ -59,6 +58,7 @@ import io.restassured.RestAssured;
 import io.restassured.builder.RequestSpecBuilder;
 import io.restassured.http.ContentType;
 import io.restassured.parsing.Parser;
+import reactor.core.publisher.Flux;
 
 public abstract class SetMessagesOutboxFlagUpdateTest {
     private static final String USERNAME = "username@" + DOMAIN;
@@ -85,7 +85,6 @@ public abstract class SetMessagesOutboxFlagUpdateTest {
 
                 @Override
                 public void enQueue(Mail mail, long delay, TimeUnit unit) {
-
                 }
 
                 @Override
@@ -94,14 +93,8 @@ public abstract class SetMessagesOutboxFlagUpdateTest {
                 }
 
                 @Override
-                public MailQueueItem deQueue() {
-                    CountDownLatch blockingLatch = new CountDownLatch(1);
-                    try {
-                        blockingLatch.await();
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
-                    }
-                    return null;
+                public Flux<MailQueueItem> deQueue() {
+                    return Flux.never();
                 }
             };
         }
diff --git a/server/protocols/jmap/src/test/java/org/apache/james/jmap/send/MailSpoolTest.java b/server/protocols/jmap/src/test/java/org/apache/james/jmap/send/MailSpoolTest.java
index 5984cde..64747db 100644
--- a/server/protocols/jmap/src/test/java/org/apache/james/jmap/send/MailSpoolTest.java
+++ b/server/protocols/jmap/src/test/java/org/apache/james/jmap/send/MailSpoolTest.java
@@ -33,6 +33,8 @@ import org.apache.mailet.base.test.FakeMail;
 import org.junit.Before;
 import org.junit.Test;
 
+import reactor.core.publisher.Flux;
+
 public class MailSpoolTest {
     private static final String USERNAME = "user";
     private static final TestMessageId MESSAGE_ID = TestMessageId.of(1);
@@ -57,7 +59,7 @@ public class MailSpoolTest {
 
         mailSpool.send(mail, new MailMetadata(MESSAGE_ID, USERNAME));
 
-        MailQueueItem actual = myQueue.deQueue();
+        MailQueueItem actual = Flux.from(myQueue.deQueue()).blockFirst();
         assertThat(actual.getMail().getName()).isEqualTo(NAME);
     }
 
@@ -69,7 +71,7 @@ public class MailSpoolTest {
 
         mailSpool.send(mail, new MailMetadata(MESSAGE_ID, USERNAME));
 
-        MailQueueItem actual = myQueue.deQueue();
+        MailQueueItem actual = Flux.from(myQueue.deQueue()).blockFirst();
         assertThat(actual.getMail().getAttribute(MailMetadata.MAIL_METADATA_USERNAME_ATTRIBUTE))
             .contains(new Attribute(MailMetadata.MAIL_METADATA_USERNAME_ATTRIBUTE, AttributeValue.of(USERNAME)));
         assertThat(actual.getMail().getAttribute(MailMetadata.MAIL_METADATA_MESSAGE_ID_ATTRIBUTE))
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
index d502177..3c109cf 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
@@ -115,7 +115,7 @@ public class ActiveMQMailQueueTest implements DelayedManageableMailQueueContract
     @Test
     @Override
     @Disabled("JAMES-2309 Long overflow in JMS delays")
-    public void enqueueWithVeryLongDelayShouldDelayMail(ExecutorService executorService) {
+    public void enqueueWithVeryLongDelayShouldDelayMail() {
 
     }
 
diff --git a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
index 6c74bcd..e065924 100644
--- a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
+++ b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import javax.mail.MessagingException;
 
 import org.apache.mailet.Mail;
+import org.reactivestreams.Publisher;
 
 /**
  * <p>
@@ -95,7 +96,7 @@ public interface MailQueue {
      * Implementations should take care to do some kind of transactions to not
      * loose any mail on error
      */
-    MailQueueItem deQueue() throws MailQueueException, InterruptedException;
+    Publisher<MailQueueItem> deQueue();
 
     /**
      * Exception which will get thrown if any problems occur while working the
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java
index 1765ca7..fff18aa 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java
@@ -21,74 +21,74 @@ package org.apache.james.queue.api;
 
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.time.Duration;
+import java.time.ZonedDateTime;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
-import org.apache.james.junit.ExecutorExtension;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Stopwatch;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
-@ExtendWith(ExecutorExtension.class)
 public interface DelayedMailQueueContract {
 
     MailQueue getMailQueue();
 
     @Test
-    default void enqueueShouldDelayMailsWhenSpecified(ExecutorService executorService) throws Exception {
+    default void enqueueShouldDelayMailsWhenSpecified() throws Exception {
         getMailQueue().enQueue(defaultMail()
             .name("name")
             .build(),
-            2L,
+            5L,
             TimeUnit.SECONDS);
 
-        Future<?> future = executorService.submit(Throwing.runnable(() -> getMailQueue().deQueue()));
-        assertThatThrownBy(() -> future.get(1, TimeUnit.SECONDS))
-            .isInstanceOf(TimeoutException.class);
+        Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next();
+        assertThatThrownBy(() -> next.block(Duration.ofSeconds(1)))
+            .isInstanceOf(RuntimeException.class);
     }
 
     @Test
-    default void enqueueWithNegativeDelayShouldNotDelayDelivery(ExecutorService executorService) throws Exception {
+    default void enqueueWithNegativeDelayShouldNotDelayDelivery() throws Exception {
         getMailQueue().enQueue(defaultMail()
             .name("name")
             .build(),
             -30L,
             TimeUnit.SECONDS);
 
-        Future<?> future = executorService.submit(Throwing.runnable(() -> getMailQueue().deQueue()));
-        future.get(1, TimeUnit.SECONDS);
+        Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).next();
+        assertThatCode(() -> next.block(Duration.ofSeconds(1))).doesNotThrowAnyException();
     }
 
     @Test
-    default void enqueueWithReasonablyLongDelayShouldDelayMail(ExecutorService executorService) throws Exception {
+    default void enqueueWithReasonablyLongDelayShouldDelayMail() throws Exception {
         getMailQueue().enQueue(defaultMail()
             .name("name")
             .build(),
             365 * 10,
             TimeUnit.DAYS);
 
-        Future<?> future = executorService.submit(Throwing.runnable(() -> getMailQueue().deQueue()));
-        assertThatThrownBy(() -> future.get(1, TimeUnit.SECONDS))
-            .isInstanceOf(TimeoutException.class);
+        Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next();
+        assertThatThrownBy(() -> next.block(Duration.ofSeconds(1)))
+            .isInstanceOf(RuntimeException.class);
     }
 
     @Test
-    default void enqueueWithVeryLongDelayShouldDelayMail(ExecutorService executorService) throws Exception {
+    default void enqueueWithVeryLongDelayShouldDelayMail() throws Exception {
         getMailQueue().enQueue(defaultMail()
             .name("name")
             .build(),
             Long.MAX_VALUE,
             TimeUnit.DAYS);
 
-        Future<?> future = executorService.submit(Throwing.runnable(() -> getMailQueue().deQueue()));
-        assertThatThrownBy(() -> future.get(1, TimeUnit.SECONDS))
-            .isInstanceOf(TimeoutException.class);
+        Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next();
+        assertThatThrownBy(() -> next.block(Duration.ofSeconds(1)))
+            .isInstanceOf(RuntimeException.class);
     }
 
     @Test
@@ -99,7 +99,7 @@ public interface DelayedMailQueueContract {
             1L,
             TimeUnit.SECONDS);
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getName()).isEqualTo("name1");
     }
 
@@ -107,6 +107,7 @@ public interface DelayedMailQueueContract {
     default void delayShouldAtLeastBeTheOneSpecified() throws Exception {
         long delay = 1L;
         TimeUnit unit = TimeUnit.SECONDS;
+
         Stopwatch started = Stopwatch.createStarted();
 
         getMailQueue().enQueue(defaultMail()
@@ -115,7 +116,8 @@ public interface DelayedMailQueueContract {
             delay,
             unit);
 
-        getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
+        assertThat(mailQueueItem).isNotNull();
         assertThat(started.elapsed(TimeUnit.MILLISECONDS))
             .isGreaterThanOrEqualTo(unit.toMillis(delay));
     }
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java
index 5aa9238..2f27ed4 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java
@@ -22,8 +22,8 @@ package org.apache.james.queue.api;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Duration;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.james.junit.ExecutorExtension;
@@ -31,6 +31,8 @@ import org.apache.mailet.Mail;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import reactor.core.publisher.Flux;
+
 @ExtendWith(ExecutorExtension.class)
 public interface DelayedManageableMailQueueContract extends DelayedMailQueueContract, ManageableMailQueueContract {
 
@@ -47,8 +49,7 @@ public interface DelayedManageableMailQueueContract extends DelayedMailQueueCont
 
         getManageableMailQueue().flush();
 
-        Future<MailQueue.MailQueueItem> tryDequeue = executorService.submit(() -> getManageableMailQueue().deQueue());
-        assertThat(tryDequeue.get(1, TimeUnit.SECONDS).getMail().getName())
+        assertThat(Flux.from(getManageableMailQueue().deQueue()).blockFirst(Duration.ofSeconds(1)).getMail().getName())
             .isEqualTo("name1");
     }
 
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java
index 4ed561c..03d473e 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java
@@ -22,10 +22,14 @@ package org.apache.james.queue.api;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
 public interface DelayedPriorityMailQueueContract extends DelayedMailQueueContract, PriorityMailQueueContract {
 
     @Override
@@ -49,9 +53,10 @@ public interface DelayedPriorityMailQueueContract extends DelayedMailQueueContra
 
         Thread.sleep(unit.toMillis(2 * delay));
 
-        MailQueue.MailQueueItem item1 = getMailQueue().deQueue();
+        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
+        MailQueue.MailQueueItem item1 = mailQueueItems.next();
         item1.done(true);
-        MailQueue.MailQueueItem item2 = getMailQueue().deQueue();
+        MailQueue.MailQueueItem item2 = mailQueueItems.next();
         item2.done(true);
 
         assertThat(item1.getMail().getName()).isEqualTo("name2");
@@ -74,9 +79,10 @@ public interface DelayedPriorityMailQueueContract extends DelayedMailQueueContra
             delay,
             unit);
 
-        MailQueue.MailQueueItem item1 = getMailQueue().deQueue();
+        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
+        MailQueue.MailQueueItem item1 = mailQueueItems.next();
         item1.done(true);
-        MailQueue.MailQueueItem item2 = getMailQueue().deQueue();
+        MailQueue.MailQueueItem item2 = mailQueueItems.next();
         item2.done(true);
 
         assertThat(item1.getMail().getName()).isEqualTo("name1");
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
index 36ba41a..d9c615e 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
@@ -31,32 +31,32 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import javax.mail.internet.MimeMessage;
 
 import org.apache.james.core.MailAddress;
 import org.apache.james.core.MaybeSender;
 import org.apache.james.core.builder.MimeMessageBuilder;
-import org.apache.james.junit.ExecutorExtension;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.apache.mailet.Attribute;
 import org.apache.mailet.Mail;
 import org.apache.mailet.PerRecipientHeaders;
 import org.apache.mailet.base.test.FakeMail;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
-@ExtendWith(ExecutorExtension.class)
 public interface MailQueueContract {
 
     MailQueue getMailQueue();
@@ -77,7 +77,7 @@ public interface MailQueueContract {
             .build();
         enQueue(mail);
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getName())
             .isEqualTo(name);
     }
@@ -89,7 +89,7 @@ public interface MailQueueContract {
             .recipients(RECIPIENT1, RECIPIENT2)
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getRecipients())
             .containsOnly(RECIPIENT1, RECIPIENT2);
     }
@@ -104,7 +104,7 @@ public interface MailQueueContract {
             .lastUpdated(new Date())
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getMaybeSender())
             .isEqualTo(MaybeSender.nullSender());
     }
@@ -118,7 +118,7 @@ public interface MailQueueContract {
             .lastUpdated(new Date())
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getMaybeSender())
             .isEqualTo(MaybeSender.nullSender());
     }
@@ -130,7 +130,7 @@ public interface MailQueueContract {
             .sender(SENDER)
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getMaybeSender())
             .isEqualTo(MaybeSender.of(SENDER));
     }
@@ -143,7 +143,7 @@ public interface MailQueueContract {
             .mimeMessage(originalMimeMessage)
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(asString(mailQueueItem.getMail().getMessage()))
             .isEqualTo(asString(originalMimeMessage));
     }
@@ -156,7 +156,7 @@ public interface MailQueueContract {
             .attribute(attribute)
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getAttribute(attribute.getName()))
             .contains(attribute);
     }
@@ -169,7 +169,7 @@ public interface MailQueueContract {
             .errorMessage(errorMessage)
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getErrorMessage())
             .isEqualTo(errorMessage);
     }
@@ -182,7 +182,7 @@ public interface MailQueueContract {
             .state(state)
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getState())
             .isEqualTo(state);
     }
@@ -195,7 +195,7 @@ public interface MailQueueContract {
             .remoteAddr(remoteAddress)
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getRemoteAddr())
             .isEqualTo(remoteAddress);
     }
@@ -208,7 +208,7 @@ public interface MailQueueContract {
             .remoteHost(remoteHost)
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getRemoteHost())
             .isEqualTo(remoteHost);
     }
@@ -221,7 +221,7 @@ public interface MailQueueContract {
             .lastUpdated(lastUpdated)
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getLastUpdated())
             .isEqualTo(lastUpdated);
     }
@@ -233,7 +233,7 @@ public interface MailQueueContract {
             .name(expectedName)
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getName())
             .isEqualTo(expectedName);
     }
@@ -249,7 +249,7 @@ public interface MailQueueContract {
             .addHeaderForRecipient(header, RECIPIENT1)
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getPerRecipientSpecificHeaders()
             .getHeadersForRecipient(RECIPIENT1))
             .containsOnly(header);
@@ -263,7 +263,7 @@ public interface MailQueueContract {
                 .attribute(attribute)
                 .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getAttribute(attribute.getName()))
             .hasValueSatisfying(item -> {
                 assertThat(item)
@@ -284,9 +284,10 @@ public interface MailQueueContract {
             .name(secondExpectedName)
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem1 = getMailQueue().deQueue();
+        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
+        MailQueue.MailQueueItem mailQueueItem1 = items.next();
         mailQueueItem1.done(true);
-        MailQueue.MailQueueItem mailQueueItem2 = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem2 = items.next();
         mailQueueItem2.done(true);
         assertThat(mailQueueItem1.getMail().getName()).isEqualTo(firstExpectedName);
         assertThat(mailQueueItem2.getMail().getName()).isEqualTo(secondExpectedName);
@@ -301,8 +302,10 @@ public interface MailQueueContract {
             .name("name2")
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem1 = getMailQueue().deQueue();
-        MailQueue.MailQueueItem mailQueueItem2 = getMailQueue().deQueue();
+        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue())
+            .subscribeOn(Schedulers.elastic()).toIterable().iterator();
+        MailQueue.MailQueueItem mailQueueItem1 = items.next();
+        MailQueue.MailQueueItem mailQueueItem2 = items.next();
         mailQueueItem1.done(true);
         mailQueueItem2.done(true);
         assertThat(mailQueueItem1.getMail().getName()).isEqualTo("name1");
@@ -319,8 +322,9 @@ public interface MailQueueContract {
             .name("name2")
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem1 = getMailQueue().deQueue();
-        MailQueue.MailQueueItem mailQueueItem2 = getMailQueue().deQueue();
+        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
+        MailQueue.MailQueueItem mailQueueItem1 = items.next();
+        MailQueue.MailQueueItem mailQueueItem2 = items.next();
         mailQueueItem2.done(true);
         mailQueueItem1.done(true);
         assertThat(mailQueueItem1.getMail().getName()).isEqualTo("name1");
@@ -333,46 +337,47 @@ public interface MailQueueContract {
             .name("name1")
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem1 = getMailQueue().deQueue();
+        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
+        MailQueue.MailQueueItem mailQueueItem1 = items.next();
         mailQueueItem1.done(false);
-        MailQueue.MailQueueItem mailQueueItem2 = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem2 = items.next();
         mailQueueItem2.done(true);
         assertThat(mailQueueItem1.getMail().getName()).isEqualTo("name1");
         assertThat(mailQueueItem2.getMail().getName()).isEqualTo("name1");
     }
 
     @Test
-    default void dequeueShouldNotReturnInProcessingEmails(ExecutorService executorService) throws Exception {
+    default void dequeueShouldNotReturnInProcessingEmails() throws Exception {
         enQueue(defaultMail()
             .name("name")
             .build());
 
-        getMailQueue().deQueue();
+        LinkedBlockingQueue<MailQueue.MailQueueItem> queue = new LinkedBlockingQueue<>(1);
+        Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).subscribe(Throwing.consumer(queue::put));
+        queue.take();
 
-        Future<?> future = executorService.submit(Throwing.runnable(() -> getMailQueue().deQueue()));
-        assertThatThrownBy(() -> future.get(2, TimeUnit.SECONDS))
-            .isInstanceOf(TimeoutException.class);
+        assertThat(queue.poll(2, TimeUnit.SECONDS)).isNull();
     }
 
     @Test
-    default void deQueueShouldBlockWhenNoMail(ExecutorService executorService) {
-        Future<?> future = executorService.submit(Throwing.runnable(() -> getMailQueue().deQueue()));
+    default void deQueueShouldBlockWhenNoMail() {
+        Mono<MailQueue.MailQueueItem> item = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next();
 
-        assertThatThrownBy(() -> future.get(2, TimeUnit.SECONDS))
-            .isInstanceOf(TimeoutException.class);
+        assertThatThrownBy(() -> item.block(Duration.ofSeconds(2)))
+            .isInstanceOf(RuntimeException.class);
     }
 
     @Test
-    default void deQueueShouldWaitForAMailToBeEnqueued(ExecutorService executorService) throws Exception {
+    default void deQueueShouldWaitForAMailToBeEnqueued() throws Exception {
         MailQueue testee = getMailQueue();
 
         Mail mail = defaultMail()
             .name("name")
             .build();
-        Future<MailQueue.MailQueueItem> tryDequeue = executorService.submit(testee::deQueue);
+        Mono<MailQueue.MailQueueItem> item = Flux.from(testee.deQueue()).next();
         testee.enQueue(mail);
 
-        assertThat(tryDequeue.get().getMail().getName()).isEqualTo("name");
+        assertThat(item.block(Duration.ofMinutes(1)).getMail().getName()).isEqualTo("name");
     }
 
     @Test
@@ -384,6 +389,17 @@ public interface MailQueueContract {
         int threadCount = 10;
         int operationCount = 10;
         int totalDequeuedMessages = 50;
+        LinkedBlockingQueue<MailQueue.MailQueueItem> itemQueue = new LinkedBlockingQueue<>(1);
+        Flux.from(testee
+            .deQueue())
+            .subscribeOn(Schedulers.elastic())
+            .flatMap(e -> {
+                try {
+                    itemQueue.put(e);
+                } catch (InterruptedException ignored) {
+                }
+                return Mono.empty();
+            }).subscribe();
         ConcurrentTestRunner.builder()
             .operation((threadNumber, step) -> {
                 if (step % 2 == 0) {
@@ -391,7 +407,7 @@ public interface MailQueueContract {
                         .name("name" + threadNumber + "-" + step)
                         .build());
                 } else {
-                    MailQueue.MailQueueItem mailQueueItem = testee.deQueue();
+                    MailQueue.MailQueueItem mailQueueItem = itemQueue.take();
                     dequeuedMails.add(mailQueueItem.getMail());
                     mailQueueItem.done(true);
                 }
@@ -416,6 +432,8 @@ public interface MailQueueContract {
         int threadCount = 10;
         int operationCount = 15;
         int totalDequeuedMessages = 50;
+        LinkedBlockingDeque<MailQueue.MailQueueItem> deque = new LinkedBlockingDeque<>();
+        Flux.from(testee.deQueue()).subscribeOn(Schedulers.elastic()).doOnNext(deque::addFirst).subscribe();
         ConcurrentTestRunner.builder()
             .operation((threadNumber, step) -> {
                 if (step % 3 == 0) {
@@ -424,11 +442,11 @@ public interface MailQueueContract {
                         .build());
                 }
                 if (step % 3 == 1) {
-                    MailQueue.MailQueueItem mailQueueItem = testee.deQueue();
+                    MailQueue.MailQueueItem mailQueueItem = deque.takeLast();
                     mailQueueItem.done(false);
                 }
                 if (step % 3 == 2) {
-                    MailQueue.MailQueueItem mailQueueItem = testee.deQueue();
+                    MailQueue.MailQueueItem mailQueueItem = deque.takeLast();
                     dequeuedMails.add(mailQueueItem.getMail());
                     mailQueueItem.done(true);
                 }
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueMetricContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueMetricContract.java
index c315efd..9046b6c 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueMetricContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueMetricContract.java
@@ -32,12 +32,15 @@ import javax.mail.MessagingException;
 import org.apache.james.metrics.api.Gauge;
 import org.apache.mailet.base.test.FakeMail;
 import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import com.github.fge.lambdas.Throwing;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 @ExtendWith(MailQueueMetricExtension.class)
 public interface MailQueueMetricContract extends MailQueueContract {
@@ -55,9 +58,13 @@ public interface MailQueueMetricContract extends MailQueueContract {
     }
 
     default void deQueueMail(Integer times) {
-        IntStream
-            .rangeClosed(1, times)
-            .forEach(Throwing.intConsumer(time -> getMailQueue().deQueue().done(true)));
+        Flux.from(getMailQueue().deQueue())
+            .take(times)
+            .flatMap(x -> Mono.fromCallable(() -> {
+                x.done(true);
+                return x;
+            }))
+            .blockLast();
     }
 
     @Test
@@ -124,6 +131,7 @@ public interface MailQueueMetricContract extends MailQueueContract {
         verifyNoMoreInteractions(testSystem.getSpyDequeuedMailsTimeMetric());
     }
 
+    @Disabled("what do we want to measure ?")
     @Test
     default void dequeueShouldPublishDequeueTimeMetric(MailQueueMetricExtension.MailQueueMetricTestSystem testSystem) throws Exception {
         enQueueMail(2);
@@ -132,6 +140,7 @@ public interface MailQueueMetricContract extends MailQueueContract {
         verify(testSystem.getSpyDequeuedMailsTimeMetric(), times(2)).stopAndPublish();
     }
 
+    @Disabled("what do we want to measure ?")
     @Test
     default void dequeueShouldNotPublishEnqueueTimeMetric(MailQueueMetricExtension.MailQueueMetricTestSystem testSystem) throws Exception {
         enQueueMail(2);
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
index ffa826f..336471a 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
@@ -30,6 +30,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.SoftAssertions.assertSoftly;
 
+import java.time.Duration;
+
 import javax.mail.internet.MimeMessage;
 
 import org.apache.james.core.builder.MimeMessageBuilder;
@@ -40,6 +42,7 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import reactor.core.publisher.Flux;
 
 public interface ManageableMailQueueContract extends MailQueueContract {
 
@@ -75,7 +78,7 @@ public interface ManageableMailQueueContract extends MailQueueContract {
     default void dequeueShouldDecreaseQueueSize() throws Exception {
         enQueue(defaultMail().name("name").build());
 
-        getManageableMailQueue().deQueue().done(true);
+        Flux.from(getManageableMailQueue().deQueue()).blockFirst().done(true);
 
         long size = getManageableMailQueue().getSize();
 
@@ -86,7 +89,7 @@ public interface ManageableMailQueueContract extends MailQueueContract {
     default void noAckShouldNotDecreaseSize() throws Exception {
         enQueue(defaultMail().name("name").build());
 
-        getManageableMailQueue().deQueue().done(false);
+        Flux.from(getManageableMailQueue().deQueue()).blockFirst().done(false);
 
         long size = getManageableMailQueue().getSize();
 
@@ -97,7 +100,7 @@ public interface ManageableMailQueueContract extends MailQueueContract {
     default void processedMailsShouldNotDecreaseSize() throws Exception {
         enQueue(defaultMail().name("name").build());
 
-        getManageableMailQueue().deQueue();
+        Flux.from(getManageableMailQueue().deQueue());
 
         long size = getManageableMailQueue().getSize();
 
@@ -158,7 +161,7 @@ public interface ManageableMailQueueContract extends MailQueueContract {
 
         getManageableMailQueue().browse();
 
-        assertThatCode(() -> getManageableMailQueue().deQueue()).doesNotThrowAnyException();
+        assertThatCode(() -> Flux.from(getManageableMailQueue().deQueue())).doesNotThrowAnyException();
 
     }
 
@@ -176,7 +179,7 @@ public interface ManageableMailQueueContract extends MailQueueContract {
 
         ManageableMailQueue.MailQueueIterator items = getManageableMailQueue().browse();
 
-        getManageableMailQueue().deQueue();
+        Flux.from(getManageableMailQueue().deQueue());
 
         assertThatCode(() ->  Iterators.consumingIterator(items)).doesNotThrowAnyException();
     }
@@ -196,7 +199,7 @@ public interface ManageableMailQueueContract extends MailQueueContract {
         ManageableMailQueue.MailQueueIterator items = getManageableMailQueue().browse();
         items.next();
 
-        assertThatCode(() -> getManageableMailQueue().deQueue()).doesNotThrowAnyException();
+        assertThatCode(() -> Flux.from(getManageableMailQueue().deQueue())).doesNotThrowAnyException();
 
     }
 
@@ -206,7 +209,7 @@ public interface ManageableMailQueueContract extends MailQueueContract {
             .name("name1")
             .build());
 
-        assertThat(getManageableMailQueue().deQueue())
+        assertThat(Flux.from(getManageableMailQueue().deQueue()).blockFirst(Duration.ofMinutes(1)))
             .isInstanceOf(MailQueueItemDecoratorFactory.MailQueueItemDecorator.class);
     }
 
@@ -225,7 +228,7 @@ public interface ManageableMailQueueContract extends MailQueueContract {
         ManageableMailQueue.MailQueueIterator items = getManageableMailQueue().browse();
         items.next();
 
-        getManageableMailQueue().deQueue();
+        Flux.from(getManageableMailQueue().deQueue());
 
         assertThatCode(() ->  Iterators.consumingIterator(items)).doesNotThrowAnyException();
     }
@@ -492,7 +495,7 @@ public interface ManageableMailQueueContract extends MailQueueContract {
         ManageableMailQueue.MailQueueIterator items = getManageableMailQueue().browse();
         items.next();
 
-        MailQueue.MailQueueItem mailQueueItem = getManageableMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getManageableMailQueue().deQueue()).blockFirst();
 
         assertThat(mailQueueItem.getMail().getName()).isEqualTo("name1");
     }
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java
index 01f50bd..c8da0a6 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java
@@ -22,16 +22,16 @@ package org.apache.james.queue.api;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.stream.IntStream;
+import java.util.Iterator;
 
 import org.apache.mailet.Attribute;
 import org.apache.mailet.AttributeValue;
 import org.apache.mailet.Mail;
 import org.junit.jupiter.api.Test;
 
-import com.github.fge.lambdas.Throwing;
-import com.github.steveash.guavate.Guavate;
-import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public interface PriorityMailQueueContract {
 
@@ -89,13 +89,16 @@ public interface PriorityMailQueueContract {
             .attribute(mailPriority(5))
             .build());
 
-        ImmutableList<MailQueue.MailQueueItem> items = IntStream.range(1, 11).boxed()
-            .map(Throwing.function(i -> {
-                MailQueue.MailQueueItem item = getMailQueue().deQueue();
-                item.done(true);
-                return item;
-            }))
-            .collect(Guavate.toImmutableList());
+        Iterable<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).take(10)
+            .flatMap(item -> {
+                try {
+                    item.done(true);
+                    return Mono.just(item);
+                } catch (MailQueue.MailQueueException e) {
+                    return Mono.error(e);
+                }
+            })
+            .toIterable();
 
         assertThat(items)
             .extracting(MailQueue.MailQueueItem::getMail)
@@ -114,9 +117,10 @@ public interface PriorityMailQueueContract {
             .attribute(mailPriority(1))
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem1 = getMailQueue().deQueue();
+        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
+        MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next();
         mailQueueItem1.done(true);
-        MailQueue.MailQueueItem mailQueueItem2 = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next();
         mailQueueItem2.done(true);
         assertThat(mailQueueItem1.getMail().getName()).isEqualTo("name1");
         assertThat(mailQueueItem2.getMail().getName()).isEqualTo("name0");
@@ -133,9 +137,10 @@ public interface PriorityMailQueueContract {
             .attribute(mailPriority(8))
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem1 = getMailQueue().deQueue();
+        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
+        MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next();
         mailQueueItem1.done(true);
-        MailQueue.MailQueueItem mailQueueItem2 = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next();
         mailQueueItem2.done(true);
         assertThat(mailQueueItem1.getMail().getName()).isEqualTo("name0");
         assertThat(mailQueueItem2.getMail().getName()).isEqualTo("name1");
@@ -156,11 +161,12 @@ public interface PriorityMailQueueContract {
             .attribute(mailPriority(6))
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem1 = getMailQueue().deQueue();
+        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
+        MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next();
         mailQueueItem1.done(true);
-        MailQueue.MailQueueItem mailQueueItem2 = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next();
         mailQueueItem2.done(true);
-        MailQueue.MailQueueItem mailQueueItem3 = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem3 = mailQueueItems.next();
         mailQueueItem3.done(true);
         assertThat(mailQueueItem1.getMail().getName()).isEqualTo("name3");
         assertThat(mailQueueItem2.getMail().getName()).isEqualTo("name1");
@@ -181,11 +187,12 @@ public interface PriorityMailQueueContract {
             .attribute(mailPriority(6))
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem1 = getMailQueue().deQueue();
+        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
+        MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next();
         mailQueueItem1.done(true);
-        MailQueue.MailQueueItem mailQueueItem2 = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next();
         mailQueueItem2.done(true);
-        MailQueue.MailQueueItem mailQueueItem3 = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem3 = mailQueueItems.next();
         mailQueueItem3.done(true);
         assertThat(mailQueueItem1.getMail().getName()).isEqualTo("name3");
         assertThat(mailQueueItem2.getMail().getName()).isEqualTo("name1");
@@ -198,7 +205,7 @@ public interface PriorityMailQueueContract {
             .name("name1")
             .build());
 
-        MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
         assertThat(mailQueueItem.getMail().getName()).isEqualTo("name1");
     }
 
diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
index 613504b..5052efd 100644
--- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
+++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
@@ -63,6 +63,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * {@link ManageableMailQueue} implementation which use the fs to store {@link Mail}'s
@@ -88,6 +90,7 @@ public class FileMailQueue implements ManageableMailQueue {
     private static final int SPLITCOUNT = 10;
     private static final SecureRandom RANDOM = new SecureRandom();
     private final String queueName;
+    private final Flux<MailQueueItem> flux;
 
     public FileMailQueue(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, File parentDir, String queuename, boolean sync) throws IOException {
         this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
@@ -96,6 +99,9 @@ public class FileMailQueue implements ManageableMailQueue {
         this.queueDir = new File(parentDir, queueName);
         this.queueDirName = queueDir.getAbsolutePath();
         init();
+        this.flux = Mono.defer(this::deQueueOneItem)
+            .repeat()
+            .limitRate(1);
     }
 
     @Override
@@ -231,7 +237,11 @@ public class FileMailQueue implements ManageableMailQueue {
     }
 
     @Override
-    public MailQueueItem deQueue() throws MailQueueException {
+    public Flux<MailQueueItem> deQueue() {
+        return flux;
+    }
+
+    private Mono<MailQueueItem> deQueueOneItem() {
         try {
             FileItem item = null;
             String k = null;
@@ -273,16 +283,16 @@ public class FileMailQueue implements ManageableMailQueue {
                             LifecycleUtil.dispose(mail);
                         }
                     };
-                    return mailQueueItemDecoratorFactory.decorate(fileMailQueueItem);
+                    return Mono.just(mailQueueItemDecoratorFactory.decorate(fileMailQueueItem));
                 }
                 // TODO: Think about exception handling in detail
             } catch (IOException | ClassNotFoundException | MessagingException e) {
-                throw new MailQueueException("Unable to dequeue", e);
+                return Mono.error(new MailQueueException("Unable to dequeue", e));
             }
 
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            throw new MailQueueException("Unable to dequeue", e);
+            return Mono.error(new MailQueueException("Unable to dequeue", e));
         }
     }
 
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
index b783a40..e1a91af 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
@@ -83,6 +83,8 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * <p>
@@ -97,6 +99,8 @@ import com.google.common.collect.Iterators;
  */
 public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPrioritySupport, Disposable {
 
+    private final Flux<MailQueueItem> flux;
+
     protected static void closeSession(Session session) {
         if (session != null) {
             try {
@@ -195,6 +199,7 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
         } catch (JMSException e) {
             throw new RuntimeException(e);
         }
+        flux = Mono.defer(this::deQueueOneItem).repeat();
     }
 
     @Override
@@ -215,37 +220,39 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
      * </p>
      */
     @Override
-    public MailQueueItem deQueue() throws MailQueueException {
+    public Flux<MailQueueItem> deQueue() {
+        return flux;
+    }
+
+    private Mono<MailQueueItem> deQueueOneItem() {
         Session session = null;
         MessageConsumer consumer = null;
+        TimeMetric timeMetric = metricFactory.timer(DEQUEUED_TIMER_METRIC_NAME_PREFIX + queueName);
+        try {
+            session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(queueName);
+            consumer = session.createConsumer(queue, getMessageSelector());
 
-        while (true) {
-            TimeMetric timeMetric = metricFactory.timer(DEQUEUED_TIMER_METRIC_NAME_PREFIX + queueName);
-            try {
-                session = connection.createSession(true, Session.SESSION_TRANSACTED);
-                Queue queue = session.createQueue(queueName);
-                consumer = session.createConsumer(queue, getMessageSelector());
-
-                Message message = consumer.receive(10000);
+            Message message = consumer.receive(10000);
 
-                if (message != null) {
-                    dequeuedMailsMetric.increment();
-                    return createMailQueueItem(session, consumer, message);
-                } else {
-                    session.commit();
-                    closeConsumer(consumer);
-                    closeSession(session);
-                }
-
-            } catch (Exception e) {
-                rollback(session);
+            if (message != null) {
+                dequeuedMailsMetric.increment();
+                return Mono.just(createMailQueueItem(session, consumer, message));
+            } else {
+                session.commit();
                 closeConsumer(consumer);
                 closeSession(session);
-                throw new MailQueueException("Unable to dequeue next message", e);
-            } finally {
-                timeMetric.stopAndPublish();
             }
+
+        } catch (Exception e) {
+            rollback(session);
+            closeConsumer(consumer);
+            closeSession(session);
+            return Mono.error(new MailQueueException("Unable to dequeue next message", e));
+        } finally {
+            timeMetric.stopAndPublish();
         }
+        return Mono.empty();
     }
 
     @Override
diff --git a/server/queue/queue-memory/pom.xml b/server/queue/queue-memory/pom.xml
index 85adf5e..8f10a8a 100644
--- a/server/queue/queue-memory/pom.xml
+++ b/server/queue/queue-memory/pom.xml
@@ -46,6 +46,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
         </dependency>
diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 9704906..5c7551f 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -51,6 +51,8 @@ import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQueue> {
 
@@ -85,12 +87,17 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
         private final LinkedBlockingDeque<MemoryMailQueueItem> inProcessingMailItems;
         private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
         private final String name;
+        private final Flux<MailQueueItem> flux;
 
         public MemoryMailQueue(String name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
             this.mailItems = new DelayQueue<>();
             this.inProcessingMailItems = new LinkedBlockingDeque<>();
             this.name = name;
             this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
+            this.flux = Mono.fromCallable(mailItems::take)
+                .repeat()
+                .flatMap(item -> Mono.just(inProcessingMailItems.add(item)).thenReturn(item))
+                .map(mailQueueItemDecoratorFactory::decorate);
         }
 
         @Override
@@ -136,10 +143,8 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
         }
 
         @Override
-        public MailQueueItem deQueue() throws MailQueueException, InterruptedException {
-            MemoryMailQueueItem item = mailItems.take();
-            inProcessingMailItems.add(item);
-            return mailQueueItemDecoratorFactory.decorate(item);
+        public Flux<MailQueueItem> deQueue() {
+            return flux;
         }
 
         public Mail getLastMail() throws MailQueueException, InterruptedException {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 104fed9..d053096 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -22,7 +22,6 @@ package org.apache.james.queue.rabbitmq;
 import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
 
 import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -33,15 +32,16 @@ import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.mailet.Mail;
 
-import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
 import com.rabbitmq.client.Delivery;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 import reactor.rabbitmq.AcknowledgableDelivery;
 
 class Dequeuer {
 
     private static final boolean REQUEUE = true;
-    private final LinkedBlockingQueue<AcknowledgableDelivery> messages;
+    private final Flux<AcknowledgableDelivery> flux;
 
     private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
         private final Consumer<Boolean> ack;
@@ -75,27 +75,23 @@ class Dequeuer {
         this.mailReferenceSerializer = serializer;
         this.mailQueueView = mailQueueView;
         this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
-        this.messages = messageIterator(name, rabbitClient);
-    }
-
-    private LinkedBlockingQueue<AcknowledgableDelivery> messageIterator(MailQueueName name, RabbitClient rabbitClient) {
-        LinkedBlockingQueue<AcknowledgableDelivery> dequeue = new LinkedBlockingQueue<>(1);
-        rabbitClient
+        this.flux = rabbitClient
             .receive(name)
-            .filter(getResponse -> getResponse.getBody() != null)
-            .doOnNext(Throwing.consumer(dequeue::put))
-            .subscribe();
-        return dequeue;
+            .filter(getResponse -> getResponse.getBody() != null);
     }
 
-    MailQueue.MailQueueItem deQueue() throws MailQueue.MailQueueException, InterruptedException {
-        return loadItem(messages.take());
+    Flux<MailQueue.MailQueueItem> deQueue() {
+        return flux.flatMap(this::loadItem);
     }
 
-    private RabbitMQMailQueueItem loadItem(AcknowledgableDelivery response) throws MailQueue.MailQueueException {
-        Mail mail = loadMail(response);
-        ThrowingConsumer<Boolean> ack = ack(response, response.getEnvelope().getDeliveryTag(), mail);
-        return new RabbitMQMailQueueItem(ack, mail);
+    private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) {
+        try {
+            Mail mail = loadMail(response);
+            ThrowingConsumer<Boolean> ack = ack(response, response.getEnvelope().getDeliveryTag(), mail);
+            return Mono.just(new RabbitMQMailQueueItem(ack, mail));
+        } catch (MailQueue.MailQueueException e) {
+            return Mono.error(e);
+        }
     }
 
     private ThrowingConsumer<Boolean> ack(AcknowledgableDelivery response, long deliveryTag, Mail mail) {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 1873313..3f77cf1 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.MoreObjects;
+import reactor.core.publisher.Flux;
 
 public class RabbitMQMailQueue implements ManageableMailQueue {
 
@@ -75,9 +76,9 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
     }
 
     @Override
-    public MailQueueItem deQueue() {
-        return metricFactory.runPublishingTimerMetric(DEQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString(),
-            Throwing.supplier(() -> decoratorFactory.decorate(dequeuer.deQueue())).sneakyThrow());
+    public Flux<MailQueueItem> deQueue() {
+        return dequeuer.deQueue()
+            .map(decoratorFactory::decorate);
     }
 
     @Override
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 94b68e2..20678fe 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -62,6 +62,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.github.fge.lambdas.Throwing;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQueueMetricContract {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
@@ -237,8 +239,13 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ
     }
 
     private void dequeueMails(int times) {
-        ManageableMailQueue mailQueue = getManageableMailQueue();
-        IntStream.rangeClosed(1, times)
-            .forEach(Throwing.intConsumer(bucketId -> mailQueue.deQueue().done(true)));
+        Flux.from(getManageableMailQueue()
+            .deQueue())
+            .take(times)
+            .flatMap(mailQueueItem -> Mono.fromCallable(() -> {
+                mailQueueItem.done(true);
+                return mailQueueItem;
+            }))
+            .blockLast();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/04: MAILBOX-342 Add missing cassandra modules

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9fdb43bff99a94dbfb838c5597172fad09d6ec74
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Mar 14 16:01:40 2019 +0700

    MAILBOX-342 Add missing cassandra modules
---
 .../mailbox/cassandra/mail/CassandraGenericMailboxMapperTest.java   | 2 ++
 .../james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java       | 2 ++
 .../james/mailbox/cassandra/mail/CassandraMailboxMapperAclTest.java | 2 ++
 .../cassandra/mail/CassandraMailboxMapperConcurrencyTest.java       | 2 ++
 .../james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java    | 6 +++++-
 5 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraGenericMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraGenericMailboxMapperTest.java
index 0de81d2..1e4ad2b 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraGenericMailboxMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraGenericMailboxMapperTest.java
@@ -22,6 +22,7 @@ package org.apache.james.mailbox.cassandra.mail;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.DockerCassandraRule;
 import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.mail.utils.GuiceUtils;
 import org.apache.james.mailbox.cassandra.modules.CassandraAclModule;
@@ -46,6 +47,7 @@ public class CassandraGenericMailboxMapperTest extends MailboxMapperTest {
     @BeforeClass
     public static void setUpClass() {
         CassandraModule modules = CassandraModule.aggregateModules(
+            CassandraSchemaVersionModule.MODULE,
             CassandraAclModule.MODULE,
             CassandraMailboxModule.MODULE,
             CassandraModSeqModule.MODULE,
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
index ad4c36b..51cbe0f 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
@@ -27,6 +27,7 @@ import java.util.Optional;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.modules.CassandraAclModule;
 import org.apache.james.mailbox.cassandra.modules.CassandraMailboxModule;
@@ -46,6 +47,7 @@ class CassandraMailboxDAOTest {
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
         CassandraModule.aggregateModules(
+            CassandraSchemaVersionModule.MODULE,
             CassandraMailboxModule.MODULE,
             CassandraAclModule.MODULE));
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperAclTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperAclTest.java
index 57b854f..7484657 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperAclTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperAclTest.java
@@ -22,6 +22,7 @@ package org.apache.james.mailbox.cassandra.mail;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.DockerCassandraRule;
 import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.mail.utils.GuiceUtils;
 import org.apache.james.mailbox.cassandra.modules.CassandraAclModule;
@@ -44,6 +45,7 @@ public class CassandraMailboxMapperAclTest extends MailboxMapperACLTest {
     @BeforeClass
     public static void setUpClass() {
         CassandraModule modules = CassandraModule.aggregateModules(
+            CassandraSchemaVersionModule.MODULE,
             CassandraAclModule.MODULE,
             CassandraMailboxModule.MODULE);
         cassandra = CassandraCluster.create(modules, cassandraServer.getHost());
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperConcurrencyTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperConcurrencyTest.java
index 44f035c..2776049 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperConcurrencyTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperConcurrencyTest.java
@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.mailbox.cassandra.mail.utils.GuiceUtils;
 import org.apache.james.mailbox.cassandra.modules.CassandraAclModule;
 import org.apache.james.mailbox.cassandra.modules.CassandraMailboxModule;
@@ -48,6 +49,7 @@ class CassandraMailboxMapperConcurrencyTest {
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
         CassandraModule.aggregateModules(
+            CassandraSchemaVersionModule.MODULE,
             CassandraMailboxModule.MODULE,
             CassandraAclModule.MODULE));
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
index 4ffa3dd..64297f5 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
@@ -30,6 +30,7 @@ import org.apache.james.backends.cassandra.DockerCassandraRule;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.modules.CassandraAclModule;
 import org.apache.james.mailbox.cassandra.modules.CassandraMailboxModule;
@@ -72,7 +73,10 @@ public class CassandraMailboxMapperTest {
 
     @BeforeClass
     public static void setUpClass() {
-        CassandraModule modules = CassandraModule.aggregateModules(CassandraMailboxModule.MODULE, CassandraAclModule.MODULE);
+        CassandraModule modules = CassandraModule.aggregateModules(
+            CassandraMailboxModule.MODULE,
+            CassandraSchemaVersionModule.MODULE,
+            CassandraAclModule.MODULE);
         cassandra = CassandraCluster.create(modules, cassandraServer.getHost());
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 04/04: JAMES-2544 dequeueTime time is always 0 now that the queue is reactive

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 60c7f631d3e2c30b765deec2dbd16da20ff5ee91
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Mon Mar 11 16:13:41 2019 +0100

    JAMES-2544 dequeueTime time is always 0 now that the queue is reactive
---
 .../MailQueue-1490071879988-dashboard.json         | 226 ---------------------
 .../java/org/apache/james/queue/api/MailQueue.java |   1 -
 .../james/queue/api/MailQueueMetricContract.java   |  28 ---
 .../james/queue/api/MailQueueMetricExtension.java  |   3 -
 .../org/apache/james/queue/jms/JMSMailQueue.java   |   3 -
 5 files changed, 261 deletions(-)

diff --git a/grafana-reporting/MailQueue-1490071879988-dashboard.json b/grafana-reporting/MailQueue-1490071879988-dashboard.json
index 01c16a4..af03b4d 100644
--- a/grafana-reporting/MailQueue-1490071879988-dashboard.json
+++ b/grafana-reporting/MailQueue-1490071879988-dashboard.json
@@ -49,119 +49,6 @@
           "bars": false,
           "datasource": "${DS_JAMES_ES}",
           "fill": 1,
-          "id": 23,
-          "legend": {
-            "avg": false,
-            "current": false,
-            "max": false,
-            "min": false,
-            "show": true,
-            "total": false,
-            "values": false
-          },
-          "lines": true,
-          "linewidth": 1,
-          "links": [],
-          "nullPointMode": "connected",
-          "percentage": false,
-          "pointradius": 5,
-          "points": false,
-          "renderer": "flot",
-          "seriesOverrides": [],
-          "span": 2,
-          "stack": false,
-          "steppedLine": false,
-          "targets": [
-            {
-              "bucketAggs": [
-                {
-                  "field": "@timestamp",
-                  "id": "2",
-                  "settings": {
-                    "interval": "auto",
-                    "min_doc_count": 0,
-                    "trimEdges": 0
-                  },
-                  "type": "date_histogram"
-                }
-              ],
-              "dsType": "elasticsearch",
-              "metrics": [
-                {
-                  "field": "p99",
-                  "id": "1",
-                  "meta": {},
-                  "pipelineAgg": "3",
-                  "settings": {},
-                  "type": "avg"
-                },
-                {
-                  "field": "p50",
-                  "id": "3",
-                  "meta": {},
-                  "settings": {},
-                  "type": "avg"
-                },
-                {
-                  "field": "p75",
-                  "id": "4",
-                  "meta": {},
-                  "settings": {},
-                  "type": "avg"
-                },
-                {
-                  "field": "p95",
-                  "id": "5",
-                  "meta": {},
-                  "settings": {},
-                  "type": "avg"
-                }
-              ],
-              "query": "name:\"dequeueTime:spool\"",
-              "refId": "A",
-              "timeField": "@timestamp"
-            }
-          ],
-          "thresholds": [],
-          "timeFrom": null,
-          "timeShift": null,
-          "title": "dequeueTime:spool",
-          "tooltip": {
-            "shared": true,
-            "sort": 0,
-            "value_type": "individual"
-          },
-          "type": "graph",
-          "xaxis": {
-            "mode": "time",
-            "name": null,
-            "show": true,
-            "values": []
-          },
-          "yaxes": [
-            {
-              "format": "ms",
-              "label": null,
-              "logBase": 1,
-              "max": null,
-              "min": null,
-              "show": true
-            },
-            {
-              "format": "short",
-              "label": null,
-              "logBase": 1,
-              "max": null,
-              "min": null,
-              "show": true
-            }
-          ]
-        },
-        {
-          "aliasColors": {},
-          "bars": false,
-          "datasource": "${DS_JAMES_ES}",
-          "fill": 1,
           "id": 28,
           "legend": {
             "avg": true,
@@ -602,119 +489,6 @@
           "bars": false,
           "datasource": "${DS_JAMES_ES}",
           "fill": 1,
-          "id": 29,
-          "legend": {
-            "avg": false,
-            "current": false,
-            "max": false,
-            "min": false,
-            "show": true,
-            "total": false,
-            "values": false
-          },
-          "lines": true,
-          "linewidth": 1,
-          "links": [],
-          "nullPointMode": "connected",
-          "percentage": false,
-          "pointradius": 5,
-          "points": false,
-          "renderer": "flot",
-          "seriesOverrides": [],
-          "span": 2,
-          "stack": false,
-          "steppedLine": false,
-          "targets": [
-            {
-              "bucketAggs": [
-                {
-                  "field": "@timestamp",
-                  "id": "2",
-                  "settings": {
-                    "interval": "auto",
-                    "min_doc_count": 0,
-                    "trimEdges": 0
-                  },
-                  "type": "date_histogram"
-                }
-              ],
-              "dsType": "elasticsearch",
-              "metrics": [
-                {
-                  "field": "p99",
-                  "id": "1",
-                  "meta": {},
-                  "pipelineAgg": "3",
-                  "settings": {},
-                  "type": "avg"
-                },
-                {
-                  "field": "p50",
-                  "id": "3",
-                  "meta": {},
-                  "settings": {},
-                  "type": "avg"
-                },
-                {
-                  "field": "p75",
-                  "id": "4",
-                  "meta": {},
-                  "settings": {},
-                  "type": "avg"
-                },
-                {
-                  "field": "p95",
-                  "id": "5",
-                  "meta": {},
-                  "settings": {},
-                  "type": "avg"
-                }
-              ],
-              "query": "name:\"dequeueTime:outgoing\"",
-              "refId": "A",
-              "timeField": "@timestamp"
-            }
-          ],
-          "thresholds": [],
-          "timeFrom": null,
-          "timeShift": null,
-          "title": "dequeueTime:outgoing",
-          "tooltip": {
-            "shared": true,
-            "sort": 0,
-            "value_type": "individual"
-          },
-          "type": "graph",
-          "xaxis": {
-            "mode": "time",
-            "name": null,
-            "show": true,
-            "values": []
-          },
-          "yaxes": [
-            {
-              "format": "ms",
-              "label": null,
-              "logBase": 1,
-              "max": null,
-              "min": null,
-              "show": true
-            },
-            {
-              "format": "short",
-              "label": null,
-              "logBase": 1,
-              "max": null,
-              "min": null,
-              "show": true
-            }
-          ]
-        },
-        {
-          "aliasColors": {},
-          "bars": false,
-          "datasource": "${DS_JAMES_ES}",
-          "fill": 1,
           "id": 32,
           "legend": {
             "avg": true,
diff --git a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
index e065924..5de2a47 100644
--- a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
+++ b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
@@ -61,7 +61,6 @@ public interface MailQueue {
     String ENQUEUED_METRIC_NAME_PREFIX = "enqueuedMail:";
     String DEQUEUED_METRIC_NAME_PREFIX = "dequeuedMail:";
     String ENQUEUED_TIMER_METRIC_NAME_PREFIX = "enqueueTime:";
-    String DEQUEUED_TIMER_METRIC_NAME_PREFIX = "dequeueTime:";
     String QUEUE_SIZE_METRIC_NAME_PREFIX = "mailQueueSize:";
 
     /**
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueMetricContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueMetricContract.java
index 9046b6c..b4c436d 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueMetricContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueMetricContract.java
@@ -123,32 +123,4 @@ public interface MailQueueMetricContract extends MailQueueContract {
         verify(testSystem.getSpyEnqueuedMailsTimeMetric(), times(2)).stopAndPublish();
     }
 
-    @Test
-    default void enqueueShouldNotPublishDequeueTimeMetric(MailQueueMetricExtension.MailQueueMetricTestSystem testSystem) throws Exception {
-        enQueueMail(2);
-
-        verify(testSystem.getSpyEnqueuedMailsTimeMetric(), times(2)).stopAndPublish();
-        verifyNoMoreInteractions(testSystem.getSpyDequeuedMailsTimeMetric());
-    }
-
-    @Disabled("what do we want to measure ?")
-    @Test
-    default void dequeueShouldPublishDequeueTimeMetric(MailQueueMetricExtension.MailQueueMetricTestSystem testSystem) throws Exception {
-        enQueueMail(2);
-        deQueueMail(2);
-
-        verify(testSystem.getSpyDequeuedMailsTimeMetric(), times(2)).stopAndPublish();
-    }
-
-    @Disabled("what do we want to measure ?")
-    @Test
-    default void dequeueShouldNotPublishEnqueueTimeMetric(MailQueueMetricExtension.MailQueueMetricTestSystem testSystem) throws Exception {
-        enQueueMail(2);
-        verify(testSystem.getSpyEnqueuedMailsTimeMetric(), times(2)).stopAndPublish();
-
-        deQueueMail(2);
-        verify(testSystem.getSpyDequeuedMailsTimeMetric(), times(2)).stopAndPublish();
-        verifyNoMoreInteractions(testSystem.getSpyEnqueuedMailsTimeMetric());
-    }
-
 }
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueMetricExtension.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueMetricExtension.java
index b214c25..93f8896 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueMetricExtension.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueMetricExtension.java
@@ -20,7 +20,6 @@
 package org.apache.james.queue.api;
 
 import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
-import static org.apache.james.queue.api.MailQueue.DEQUEUED_TIMER_METRIC_NAME_PREFIX;
 import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX;
 import static org.apache.james.queue.api.MailQueue.ENQUEUED_TIMER_METRIC_NAME_PREFIX;
 import static org.mockito.ArgumentMatchers.startsWith;
@@ -97,8 +96,6 @@ public class MailQueueMetricExtension implements BeforeEachCallback, ParameterRe
 
         when(testSystem.spyMetricFactory.timer(startsWith(ENQUEUED_TIMER_METRIC_NAME_PREFIX)))
             .thenReturn(testSystem.spyEnqueuedMailsTimeMetric);
-        when(testSystem.spyMetricFactory.timer(startsWith(DEQUEUED_TIMER_METRIC_NAME_PREFIX)))
-            .thenReturn(testSystem.spyDequeuedMailsTimeMetric);
     }
 
     @Override
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
index e1a91af..e552e78 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
@@ -227,7 +227,6 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
     private Mono<MailQueueItem> deQueueOneItem() {
         Session session = null;
         MessageConsumer consumer = null;
-        TimeMetric timeMetric = metricFactory.timer(DEQUEUED_TIMER_METRIC_NAME_PREFIX + queueName);
         try {
             session = connection.createSession(true, Session.SESSION_TRANSACTED);
             Queue queue = session.createQueue(queueName);
@@ -249,8 +248,6 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
             closeConsumer(consumer);
             closeSession(session);
             return Mono.error(new MailQueueException("Unable to dequeue next message", e));
-        } finally {
-            timeMetric.stopAndPublish();
         }
         return Mono.empty();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org