You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2020/03/17 09:35:34 UTC

[james-project] 01/04: JAMES-3070 Don't use cache for RabbitMQ Mailqueue Consumer

This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 84454ded73591f899b7166cd036da6e306f260ef
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Wed Feb 19 16:10:23 2020 +0100

    JAMES-3070 Don't use cache for RabbitMQ Mailqueue Consumer
    
        Make MailQueue Closeable to free resources when using non-cached
        implementations.
    
        For that we needed to take care about MailQueue lifecycle at every
        usage place.
---
 .../mailetcontainer/impl/JamesMailSpooler.java     |  9 ++-
 .../mailetcontainer/impl/JamesMailetContext.java   | 15 ++++-
 .../james/transport/mailets/RemoteDelivery.java    |  6 ++
 .../org/apache/james/fetchmail/FetchScheduler.java |  7 ++-
 .../SetMessagesOutboxFlagUpdateTest.java           |  4 ++
 .../apache/james/jmap/draft/send/MailSpool.java    | 19 ++++++-
 .../apache/james/smtpserver/SendMailHandler.java   | 11 ++++
 .../apache/james/smtpserver/SMTPServerTest.java    |  2 +-
 .../james/webadmin/routes/MailQueueRoutes.java     | 45 +++++++++------
 .../james/webadmin/service/ClearMailQueueTask.java | 47 ++++++++++++----
 .../webadmin/service/ClearMailQueueTaskDTO.java    | 15 +++--
 .../service/DeleteMailsFromMailQueueTask.java      | 64 ++++++++++++++--------
 .../service/DeleteMailsFromMailQueueTaskDTO.java   | 12 ++--
 .../james/webadmin/routes/MailQueueRoutesTest.java | 35 ++++++------
 .../webadmin/service/ClearMailQueueTaskTest.java   | 10 ++--
 .../service/DeleteMailsFromMailQueueTaskTest.java  | 21 ++++---
 .../webadmin/service/ReprocessingService.java      | 60 ++++++++++++--------
 ...lQueue.java => ActiveMQCacheableMailQueue.java} | 18 +++---
 .../queue/activemq/ActiveMQMailQueueFactory.java   |  6 +-
 .../queue/activemq/ActiveMQMailQueueBlobTest.java  |  4 +-
 .../queue/activemq/ActiveMQMailQueueTest.java      |  6 +-
 .../java/org/apache/james/queue/api/MailQueue.java |  3 +-
 ...eMailQueue.java => FileCacheableMailQueue.java} | 13 +++--
 .../james/queue/file/FileMailQueueFactory.java     | 21 +++----
 ...java => FileCacheableMailQueueFactoryTest.java} |  2 +-
 ...ueTest.java => FileCacheableMailQueueTest.java} |  6 +-
 ...MSMailQueue.java => JMSCacheableMailQueue.java} | 18 ++++--
 .../james/queue/jms/JMSMailQueueFactory.java       |  6 +-
 .../apache/james/queue/jms/JMSMailQueueItem.java   |  6 +-
 .../queue/library/AbstractMailQueueFactory.java    | 10 ++--
 ....java => JMSCacheableMailQueueFactoryTest.java} |  2 +-
 ...eueTest.java => JMSCacheableMailQueueTest.java} |  8 +--
 .../library/AbstractMailQueueFactoryTest.java      |  2 +-
 .../james/queue/memory/MemoryMailQueueFactory.java | 24 ++++----
 ...va => MemoryCacheableMailQueueFactoryTest.java} |  2 +-
 ...Test.java => MemoryCacheableMailQueueTest.java} |  6 +-
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  | 17 +++++-
 .../james/queue/rabbitmq/RabbitMQMailQueue.java    |  5 ++
 .../queue/rabbitmq/RabbitMQMailQueueFactory.java   | 28 ++--------
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java     | 17 ------
 40 files changed, 370 insertions(+), 242 deletions(-)

diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 29d5395..28383cd 100644
--- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -21,6 +21,7 @@ package org.apache.james.mailetcontainer.impl;
 
 import static org.apache.james.metrics.api.TimeMetric.ExecutionResult.DEFAULT_100_MS_THRESHOLD;
 
+import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -59,7 +60,6 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
     private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class);
 
     public static final String SPOOL_PROCESSING = "spoolProcessing";
-    private MailQueue queue;
 
     /**
      * The number of threads used to move mail through the spool.
@@ -79,6 +79,8 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
     private reactor.core.Disposable disposable;
     private Scheduler spooler;
     private int parallelismLevel;
+    private MailQueue queue;
+
 
     @Inject
     public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> queueFactory) {
@@ -173,6 +175,11 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
         LOGGER.info("start dispose() ...");
         disposable.dispose();
         spooler.dispose();
+        try {
+            queue.close();
+        } catch (IOException e) {
+            LOGGER.debug("error closing queue", e);
+        }
         LOGGER.info("thread shutdown completed.");
     }
 
diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailetContext.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailetContext.java
index bceea55..e1c3483 100644
--- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailetContext.java
+++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailetContext.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailetcontainer.impl;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashSet;
@@ -30,6 +31,7 @@ import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.mail.Address;
 import javax.mail.Message;
@@ -50,6 +52,7 @@ import org.apache.james.dnsservice.library.MXHostAddressIterator;
 import org.apache.james.domainlist.api.DomainList;
 import org.apache.james.domainlist.api.DomainListException;
 import org.apache.james.lifecycle.api.Configurable;
+import org.apache.james.lifecycle.api.Disposable;
 import org.apache.james.lifecycle.api.LifecycleUtil;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueFactory;
@@ -66,7 +69,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 
-public class JamesMailetContext implements MailetContext, Configurable {
+public class JamesMailetContext implements MailetContext, Configurable, Disposable {
     private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailetContext.class);
 
     /**
@@ -89,6 +92,16 @@ public class JamesMailetContext implements MailetContext, Configurable {
         this.mailQueueFactory = mailQueueFactory;
     }
 
+    @PreDestroy
+    @Override
+    public void dispose() {
+        try {
+            rootMailQueue.close();
+        } catch (IOException e) {
+            LOGGER.debug("error closing queue", e);
+        }
+    }
+
     @Override
     public Collection<String> getMailServers(Domain host) {
         try {
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
index ab597e9..5cb3889 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.transport.mailets;
 
+import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Map;
@@ -258,6 +259,11 @@ public class RemoteDelivery extends GenericMailet {
         if (startThreads == ThreadState.START_THREADS) {
             deliveryRunnable.dispose();
         }
+        try {
+            queue.close();
+        } catch (IOException e) {
+            LOGGER.debug("error closing queue", e);
+        }
     }
 
 }
diff --git a/server/protocols/fetchmail/src/main/java/org/apache/james/fetchmail/FetchScheduler.java b/server/protocols/fetchmail/src/main/java/org/apache/james/fetchmail/FetchScheduler.java
index a8175b6..869e5c1 100644
--- a/server/protocols/fetchmail/src/main/java/org/apache/james/fetchmail/FetchScheduler.java
+++ b/server/protocols/fetchmail/src/main/java/org/apache/james/fetchmail/FetchScheduler.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.fetchmail;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
@@ -67,6 +68,7 @@ public class FetchScheduler implements FetchSchedulerMBean, Configurable {
     private MailQueueFactory<?> queueFactory;
 
     private DomainList domainList;
+    private MailQueue queue;
 
     @Inject
     public void setMailQueueFactory(MailQueueFactory<?> queueFactory) {
@@ -105,7 +107,7 @@ public class FetchScheduler implements FetchSchedulerMBean, Configurable {
       The scheduler service that is used to trigger fetch tasks.
      */
             ScheduledExecutorService scheduler = new JMXEnabledScheduledThreadPoolExecutor(numThreads, jmxPath, "scheduler");
-            MailQueue queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
+            queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
 
             List<HierarchicalConfiguration<ImmutableNode>> fetchConfs = conf.configurationsAt("fetch");
             for (HierarchicalConfiguration<ImmutableNode> fetchConf : fetchConfs) {
@@ -132,9 +134,10 @@ public class FetchScheduler implements FetchSchedulerMBean, Configurable {
     }
 
     @PreDestroy
-    public void dispose() {
+    public void dispose() throws IOException {
         if (enabled) {
             LOGGER.info("FetchMail dispose...");
+            queue.close();
             for (ScheduledFuture<?> scheduler1 : schedulers) {
                 scheduler1.cancel(false);
             }
diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
index e9141ea..db8ea1e 100644
--- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
+++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
@@ -80,6 +80,10 @@ public abstract class SetMessagesOutboxFlagUpdateTest {
         public MailQueue createQueue(String name) {
             return new MailQueue() {
                 @Override
+                public void close() throws IOException {
+                }
+
+                @Override
                 public String getName() {
                     return name;
                 }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
index c652bc9..afda197 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
@@ -19,8 +19,12 @@
 
 package org.apache.james.jmap.draft.send;
 
+import java.io.IOException;
+
+import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
+import org.apache.james.lifecycle.api.Disposable;
 import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueue.MailQueueException;
@@ -28,10 +32,14 @@ import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.mailet.Attribute;
 import org.apache.mailet.AttributeValue;
 import org.apache.mailet.Mail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
-public class MailSpool implements Startable {
+public class MailSpool implements Startable, Disposable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MailSpool.class);
 
     private final MailQueueFactory<?> queueFactory;
     private MailQueue queue;
@@ -45,6 +53,15 @@ public class MailSpool implements Startable {
         queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
     }
 
+    @PreDestroy
+    public void dispose() {
+        try {
+            queue.close();
+        } catch (IOException e) {
+            LOGGER.debug("error closing queue", e);
+        }
+    }
+
     public void send(Mail mail, MailMetadata metadata) throws MailQueueException {
         mail.setAttribute(new Attribute(MailMetadata.MAIL_METADATA_MESSAGE_ID_ATTRIBUTE, AttributeValue.of(metadata.getMessageId().serialize())));
         mail.setAttribute(new Attribute(MailMetadata.MAIL_METADATA_USERNAME_ATTRIBUTE, AttributeValue.of(metadata.getUsername())));
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/SendMailHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/SendMailHandler.java
index 4ef8201..ea82d37 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/SendMailHandler.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/SendMailHandler.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.smtpserver;
 
+import java.io.IOException;
+
 import javax.inject.Inject;
 import javax.mail.MessagingException;
 
@@ -52,6 +54,15 @@ public class SendMailHandler implements JamesMessageHook {
         queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
     }
 
+    @Override
+    public void destroy() {
+        try {
+            queue.close();
+        } catch (IOException e) {
+            LOGGER.debug("error close queue", e);
+        }
+    }
+
     /**
      * Adds header to the message
      */
diff --git a/server/protocols/protocols-smtp/src/test/java/org/apache/james/smtpserver/SMTPServerTest.java b/server/protocols/protocols-smtp/src/test/java/org/apache/james/smtpserver/SMTPServerTest.java
index 8b4b19d..e3ae55e 100644
--- a/server/protocols/protocols-smtp/src/test/java/org/apache/james/smtpserver/SMTPServerTest.java
+++ b/server/protocols/protocols-smtp/src/test/java/org/apache/james/smtpserver/SMTPServerTest.java
@@ -199,7 +199,7 @@ public class SMTPServerTest {
     protected Configuration configuration;
     protected MockProtocolHandlerLoader chain;
     protected MemoryMailQueueFactory queueFactory;
-    protected MemoryMailQueueFactory.MemoryMailQueue queue;
+    protected MemoryMailQueueFactory.MemoryCacheableMailQueue queue;
     protected MemoryRecipientRewriteTable rewriteTable;
     private AliasReverseResolver aliasReverseResolver;
     protected CanSendFrom canSendFrom;
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/routes/MailQueueRoutes.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/routes/MailQueueRoutes.java
index c2f6064..0338071 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/routes/MailQueueRoutes.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/routes/MailQueueRoutes.java
@@ -21,6 +21,8 @@ package org.apache.james.webadmin.routes;
 
 import static org.apache.james.webadmin.Constants.SEPARATOR;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.time.ZonedDateTime;
 import java.util.List;
 import java.util.Optional;
@@ -32,6 +34,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 
 import org.apache.james.core.MailAddress;
+import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueue.MailQueueException;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.ManageableMailQueue;
@@ -223,7 +226,7 @@ public class MailQueueRoutes implements Routes {
     private List<MailQueueItemDTO> listMails(Request request) {
         String mailQueueName = request.params(MAIL_QUEUE_NAME);
         return mailQueueFactory.getQueue(mailQueueName)
-                .map(name -> listMails(name, isDelayed(request.queryParams(DELAYED_QUERY_PARAM)), ParametersExtractor.extractLimit(request)))
+                .map(queue -> listMails(queue, isDelayed(request.queryParams(DELAYED_QUERY_PARAM)), ParametersExtractor.extractLimit(request)))
                 .orElseThrow(
                     () -> ErrorResponder.builder()
                         .message("%s can not be found", mailQueueName)
@@ -238,7 +241,7 @@ public class MailQueueRoutes implements Routes {
     }
 
     private List<MailQueueItemDTO> listMails(ManageableMailQueue queue, Optional<Boolean> isDelayed, Limit limit) {
-        try {
+        try (MailQueue closeable = queue) {
             return limit.applyOnStream(Iterators.toStream(queue.browse()))
                     .map(Throwing.function(MailQueueItemDTO::from).sneakyThrow())
                     .filter(item -> filter(item, isDelayed))
@@ -250,6 +253,8 @@ public class MailQueueRoutes implements Routes {
                 .message("Invalid request for listing the mails from the mail queue %s", queue)
                 .cause(e)
                 .haltError();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
         }
     }
 
@@ -304,17 +309,10 @@ public class MailQueueRoutes implements Routes {
 
     private Task deleteMails(Request request) {
         String mailQueueName = request.params(MAIL_QUEUE_NAME);
-        return mailQueueFactory.getQueue(mailQueueName)
-            .map(name -> deleteMailsTask(name,
+        return deleteMailsTask(mailQueueName,
                     sender(request.queryParams(SENDER_QUERY_PARAM)),
                     name(request.queryParams(NAME_QUERY_PARAM)),
-                    recipient(request.queryParams(RECIPIENT_QUERY_PARAM))))
-            .orElseThrow(
-                () -> ErrorResponder.builder()
-                    .message("%s can not be found", mailQueueName)
-                    .statusCode(HttpStatus.NOT_FOUND_404)
-                    .type(ErrorResponder.ErrorType.NOT_FOUND)
-                    .haltError());
+                    recipient(request.queryParams(RECIPIENT_QUERY_PARAM)));
     }
 
     private Optional<MailAddress> sender(String senderAsString) throws HaltException {
@@ -383,10 +381,11 @@ public class MailQueueRoutes implements Routes {
     private String forceDelayedMailsDelivery(Request request, Response response) throws JsonExtractException, MailQueueException {
         assertDelayedParamIsTrue(request);
         assertPayloadContainsDelayedEntry(request);
-        ManageableMailQueue mailQueue = assertMailQueueExists(request);
-
-        mailQueue.flush();
-
+        try (ManageableMailQueue mailQueue = assertMailQueueExists(request)) {
+            mailQueue.flush();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
         return Responses.returnNoContent(response);
     }
 
@@ -421,13 +420,13 @@ public class MailQueueRoutes implements Routes {
         }
     }
 
-    private Task deleteMailsTask(ManageableMailQueue queue, Optional<MailAddress> maybeSender, Optional<String> maybeName, Optional<MailAddress> maybeRecipient) {
+    private Task deleteMailsTask(String queueName, Optional<MailAddress> maybeSender, Optional<String> maybeName, Optional<MailAddress> maybeRecipient) {
         int paramCount = Booleans.countTrue(maybeSender.isPresent(), maybeName.isPresent(), maybeRecipient.isPresent());
         switch (paramCount) {
             case 0:
-                return new ClearMailQueueTask(queue);
+                return new ClearMailQueueTask(queueName, this::getQueueOrThrow);
             case 1:
-                return new DeleteMailsFromMailQueueTask(queue, maybeSender, maybeName, maybeRecipient);
+                return new DeleteMailsFromMailQueueTask(queueName, this::getQueueOrThrow, maybeSender, maybeName, maybeRecipient);
             default:
                 throw ErrorResponder.builder()
                     .statusCode(HttpStatus.BAD_REQUEST_400)
@@ -438,6 +437,16 @@ public class MailQueueRoutes implements Routes {
         }
     }
 
+    private ManageableMailQueue getQueueOrThrow(String queueName) {
+        return mailQueueFactory.getQueue(queueName)
+            .orElseThrow(
+                () -> ErrorResponder.builder()
+                    .message("%s can not be found", queueName)
+                    .statusCode(HttpStatus.NOT_FOUND_404)
+                    .type(ErrorResponder.ErrorType.NOT_FOUND)
+                    .haltError());
+    }
+
     private void assertDelayedParamIsTrue(Request request) {
         if (!isDelayed(request.queryParams(DELAYED_QUERY_PARAM)).orElse(false)) {
             throw ErrorResponder.builder()
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTask.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTask.java
index ff4d777..ab9e25b 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTask.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTask.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.webadmin.service;
 
+import java.io.IOException;
 import java.time.Clock;
 import java.time.Instant;
 import java.util.Optional;
@@ -31,9 +32,12 @@ import org.apache.james.task.TaskType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class ClearMailQueueTask implements Task {
 
     public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+
         private final String mailQueueName;
         private final long initialCount;
         private final long remainingCount;
@@ -62,30 +66,44 @@ public class ClearMailQueueTask implements Task {
         public Instant timestamp() {
             return timestamp;
         }
+
     }
 
     public static class UnknownSerializedQueue extends RuntimeException {
+
         public UnknownSerializedQueue(String queueName) {
             super("Unable to retrieve '" + queueName + "' queue");
         }
     }
 
+    @FunctionalInterface
+    public interface MailQueueFactory {
+        ManageableMailQueue create(String mailQueueName);
+    }
+
     private static final Logger LOGGER = LoggerFactory.getLogger(ClearMailQueueTask.class);
     public static final TaskType TYPE = TaskType.of("clear-mail-queue");
 
-    private final ManageableMailQueue queue;
-    private final long initialCount;
+    private final String queueName;
+    private final ClearMailQueueTask.MailQueueFactory factory;
+    private Optional<Long> initialCount;
+    private Optional<ManageableMailQueue> queue;
 
-    public ClearMailQueueTask(ManageableMailQueue queue) {
-        this.queue = queue;
-        initialCount = getRemainingSize();
+    public ClearMailQueueTask(String queueName, ClearMailQueueTask.MailQueueFactory factory) {
+        this.queueName = queueName;
+        this.factory = factory;
+        this.initialCount = Optional.empty();
+        this.queue = Optional.empty();
     }
 
     @Override
     public Result run() {
-        try {
+        try (ManageableMailQueue queue = factory.create(queueName)) {
+            this.initialCount = Optional.of(getRemainingSize(queue));
+            this.queue = Optional.of(queue);
             queue.clear();
-        } catch (MailQueue.MailQueueException e) {
+            this.queue = Optional.empty();
+        } catch (MailQueue.MailQueueException | IOException e) {
             LOGGER.error("Clear MailQueue got an exception", e);
             return Result.PARTIAL;
         }
@@ -100,16 +118,21 @@ public class ClearMailQueueTask implements Task {
 
     @Override
     public Optional<TaskExecutionDetails.AdditionalInformation> details() {
-        return Optional.of(new AdditionalInformation(queue.getName(), initialCount, getRemainingSize(), Clock.systemUTC().instant()));
+        return queue.map(q -> new AdditionalInformation(queueName, initialCount.get(), getRemainingSize(q), Clock.systemUTC().instant()));
+    }
+
+    String getQueueName() {
+        return queueName;
     }
 
-    ManageableMailQueue getQueue() {
-        return queue;
+    @VisibleForTesting
+    Optional<Long> initialCount() {
+        return initialCount;
     }
 
-    private long getRemainingSize() {
+    private long getRemainingSize(ManageableMailQueue mailQueue) {
         try {
-            return queue.getSize();
+            return mailQueue.getSize();
         } catch (MailQueue.MailQueueException e) {
             throw new RuntimeException(e);
         }
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTaskDTO.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTaskDTO.java
index 8ba59f4..f4af298 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTaskDTO.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTaskDTO.java
@@ -39,19 +39,22 @@ public class ClearMailQueueTaskDTO implements TaskDTO {
     }
 
     public static ClearMailQueueTaskDTO toDTO(ClearMailQueueTask domainObject, String typeName) {
-        return new ClearMailQueueTaskDTO(typeName, domainObject.getQueue().getName());
+        return new ClearMailQueueTaskDTO(typeName, domainObject.getQueueName());
     }
 
     private final String type;
-    private final String queue;
+    private final String queueName;
 
-    public ClearMailQueueTaskDTO(@JsonProperty("type") String type, @JsonProperty("queue") String queue) {
+    public ClearMailQueueTaskDTO(@JsonProperty("type") String type, @JsonProperty("queue") String queueName) {
         this.type = type;
-        this.queue = queue;
+        this.queueName = queueName;
     }
 
     public ClearMailQueueTask fromDTO(MailQueueFactory<? extends ManageableMailQueue> mailQueueFactory) {
-        return new ClearMailQueueTask(mailQueueFactory.getQueue(queue).orElseThrow(() -> new ClearMailQueueTask.UnknownSerializedQueue(queue)));
+        return new ClearMailQueueTask(queueName,
+            name -> mailQueueFactory
+                .getQueue(name)
+                .orElseThrow(() -> new ClearMailQueueTask.UnknownSerializedQueue(queueName)));
     }
 
     @Override
@@ -60,6 +63,6 @@ public class ClearMailQueueTaskDTO implements TaskDTO {
     }
 
     public String getQueue() {
-        return queue;
+        return queueName;
     }
 }
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTask.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTask.java
index e55b7d6..0a1a184 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTask.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTask.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.webadmin.service;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.time.Clock;
 import java.time.Instant;
 import java.util.Optional;
@@ -95,38 +97,60 @@ public class DeleteMailsFromMailQueueTask implements Task {
         }
     }
 
+    @FunctionalInterface
+    public interface MailQueueFactory {
+        ManageableMailQueue create(String mailQueueName);
+    }
+
     public static final TaskType TYPE = TaskType.of("delete-mails-from-mail-queue");
 
-    private final ManageableMailQueue queue;
     private final Optional<MailAddress> maybeSender;
     private final Optional<String> maybeName;
     private final Optional<MailAddress> maybeRecipient;
+    private final MailQueueFactory factory;
+    private final String queueName;
+    private Optional<Long> initialCount;
+    private Optional<ManageableMailQueue> queue;
 
-    private final long initialCount;
-
-    public DeleteMailsFromMailQueueTask(ManageableMailQueue queue, Optional<MailAddress> maybeSender,
+    public DeleteMailsFromMailQueueTask(String queueName, MailQueueFactory factory,
+                                        Optional<MailAddress> maybeSender,
                                         Optional<String> maybeName, Optional<MailAddress> maybeRecipient) {
+        this.factory = factory;
+        this.queueName = queueName;
         Preconditions.checkArgument(
             Booleans.countTrue(maybeSender.isPresent(), maybeName.isPresent(), maybeRecipient.isPresent()) == 1,
             "You should provide one and only one of the query parameters 'sender', 'name' or 'recipient'.");
 
-        this.queue = queue;
         this.maybeSender = maybeSender;
         this.maybeName = maybeName;
         this.maybeRecipient = maybeRecipient;
-        this.initialCount = getRemainingSize();
+        this.initialCount = Optional.empty();
+        this.queue = Optional.empty();
+
     }
 
     @Override
     public Result run() {
-        maybeSender.ifPresent(Throwing.consumer(
-            (MailAddress sender) -> queue.remove(ManageableMailQueue.Type.Sender, sender.asString())));
-        maybeName.ifPresent(Throwing.consumer(
-            (String name) -> queue.remove(ManageableMailQueue.Type.Name, name)));
-        maybeRecipient.ifPresent(Throwing.consumer(
-            (MailAddress recipient) -> queue.remove(ManageableMailQueue.Type.Recipient, recipient.asString())));
-
-        return Result.COMPLETED;
+        try (ManageableMailQueue queue = factory.create(queueName)) {
+            this.initialCount = Optional.of(getRemainingSize(queue));
+            this.queue = Optional.of(queue);
+            maybeSender.ifPresent(Throwing.consumer(
+                (MailAddress sender) -> queue.remove(ManageableMailQueue.Type.Sender, sender.asString())));
+            maybeName.ifPresent(Throwing.consumer(
+                (String name) -> queue.remove(ManageableMailQueue.Type.Name, name)));
+            maybeRecipient.ifPresent(Throwing.consumer(
+                (MailAddress recipient) -> queue.remove(ManageableMailQueue.Type.Recipient, recipient.asString())));
+
+            this.queue = Optional.empty();
+
+            return Result.COMPLETED;
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    public String getQueueName() {
+        return queueName;
     }
 
     @Override
@@ -134,10 +158,6 @@ public class DeleteMailsFromMailQueueTask implements Task {
         return TYPE;
     }
 
-    ManageableMailQueue getQueue() {
-        return queue;
-    }
-
     Optional<String> getMaybeName() {
         return maybeName;
     }
@@ -152,13 +172,13 @@ public class DeleteMailsFromMailQueueTask implements Task {
 
     @Override
     public Optional<TaskExecutionDetails.AdditionalInformation> details() {
-        return Optional.of(new AdditionalInformation(queue.getName(),
-            initialCount,
-            getRemainingSize(), maybeSender,
+        return this.queue.map(queue -> new AdditionalInformation(queueName,
+            initialCount.get(),
+            getRemainingSize(queue), maybeSender,
             maybeName, maybeRecipient, Clock.systemUTC().instant()));
     }
 
-    public long getRemainingSize() {
+    public long getRemainingSize(ManageableMailQueue queue) {
         try {
             return queue.getSize();
         } catch (MailQueue.MailQueueException e) {
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskDTO.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskDTO.java
index e5dfa9e..12f3485 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskDTO.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskDTO.java
@@ -27,7 +27,7 @@ public class DeleteMailsFromMailQueueTaskDTO implements TaskDTO {
     public static DeleteMailsFromMailQueueTaskDTO toDTO(DeleteMailsFromMailQueueTask domainObject, String typeName) {
         return new DeleteMailsFromMailQueueTaskDTO(
             typeName,
-            domainObject.getQueue().getName(),
+            domainObject.getQueueName(),
             domainObject.getMaybeSender().map(MailAddress::asString),
             domainObject.getMaybeName(),
             domainObject.getMaybeRecipient().map(MailAddress::asString)
@@ -35,18 +35,18 @@ public class DeleteMailsFromMailQueueTaskDTO implements TaskDTO {
     }
 
     private final String type;
-    private final String queue;
+    private final String queueName;
     private final Optional<String> sender;
     private final Optional<String> name;
     private final Optional<String> recipient;
 
     public DeleteMailsFromMailQueueTaskDTO(@JsonProperty("type") String type,
-                                           @JsonProperty("queue") String queue,
+                                           @JsonProperty("queue") String queueName,
                                            @JsonProperty("sender") Optional<String> sender,
                                            @JsonProperty("name") Optional<String> name,
                                            @JsonProperty("recipient") Optional<String> recipient) {
         this.type = type;
-        this.queue = queue;
+        this.queueName = queueName;
         this.sender = sender;
         this.name = name;
         this.recipient = recipient;
@@ -54,7 +54,7 @@ public class DeleteMailsFromMailQueueTaskDTO implements TaskDTO {
 
     public DeleteMailsFromMailQueueTask fromDTO(MailQueueFactory<? extends ManageableMailQueue> mailQueueFactory) {
         return new DeleteMailsFromMailQueueTask(
-            mailQueueFactory.getQueue(queue).orElseThrow(() -> new DeleteMailsFromMailQueueTask.UnknownSerializedQueue(queue)),
+            queueName, name -> mailQueueFactory.getQueue(name).orElseThrow(() -> new DeleteMailsFromMailQueueTask.UnknownSerializedQueue(queueName)),
             sender.map(Throwing.<String, MailAddress>function(MailAddress::new).sneakyThrow()),
             name,
             recipient.map(Throwing.<String, MailAddress>function(MailAddress::new).sneakyThrow())
@@ -67,7 +67,7 @@ public class DeleteMailsFromMailQueueTaskDTO implements TaskDTO {
     }
 
     public String getQueue() {
-        return queue;
+        return queueName;
     }
 
     public Optional<String> getSender() {
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java
index 2cd4f49..f40f1ac 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java
@@ -42,7 +42,6 @@ import org.apache.james.queue.api.Mails;
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
 import org.apache.james.queue.memory.MemoryMailQueueFactory;
-import org.apache.james.queue.memory.MemoryMailQueueFactory.MemoryMailQueue;
 import org.apache.james.task.Hostname;
 import org.apache.james.task.MemoryTaskManager;
 import org.apache.james.task.TaskManager;
@@ -229,7 +228,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void listMailsShouldReturnMailsWhenSome() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
                 queue.enQueue(Mails.defaultMail().name("name").build());
                 queue.enQueue(Mails.defaultMail().name("name").build());
 
@@ -243,7 +242,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void listMailsShouldReturnMailDetailsWhenSome() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
                 FakeMail mail = Mails.defaultMail().name("name").build();
                 queue.enQueue(mail);
 
@@ -265,7 +264,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void listMailsShouldReturnEmptyWhenNoDelayedMailsAndAskFor() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
                 FakeMail mail = Mails.defaultMail().name("name").build();
                 queue.enQueue(mail);
 
@@ -281,7 +280,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void listMailsShouldReturnCurrentMailsWhenMailsAndAskForNotDelayed() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
                 FakeMail mail = Mails.defaultMail().name("name").build();
                 queue.enQueue(mail);
 
@@ -297,7 +296,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void listMailsShouldReturnDelayedMailsWhenAskFor() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
                 FakeMail mail = Mails.defaultMail().name("name").build();
                 queue.enQueue(mail, 10, TimeUnit.MINUTES);
 
@@ -313,7 +312,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void listMailsShouldReturnOneMailWhenMailsAndAskForALimitOfOne() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
                 FakeMail mail = Mails.defaultMail().name("name").build();
                 queue.enQueue(mail);
                 queue.enQueue(mail);
@@ -336,7 +335,7 @@ class MailQueueRoutesTest {
 
         @Test
         public void getMailQueueShouldReturnTheMailQueueDataWhenMailQueueExists() throws Exception {
-            MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+            MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
             queue.enQueue(Mails.defaultMail().name("name").build());
 
             when()
@@ -509,7 +508,7 @@ class MailQueueRoutesTest {
         class SideEffects {
             @Test
             public void forcingDelayedMailsDeliveryShouldActuallyChangePropertyOnMails() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
                 FakeMail mail = Mails.defaultMail().name("name").build();
                 queue.enQueue(mail, 10L, TimeUnit.MINUTES);
                 queue.enQueue(mail, 10L, TimeUnit.MINUTES);
@@ -656,7 +655,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void deleteMailsTasksShouldHaveDetailsWhenSenderIsGiven() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
 
                 queue.enQueue(FakeMail.builder()
                     .name(FAKE_MAIL_NAME_1)
@@ -693,7 +692,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void deleteMailsTasksShouldHaveDetailsWhenNameIsGiven() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
 
                 queue.enQueue(FakeMail.builder()
                     .name(FAKE_MAIL_NAME_1)
@@ -728,7 +727,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void deleteMailsTasksShouldHaveDetailsWhenRecipientIsGiven() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
 
                 queue.enQueue(FakeMail.builder()
                     .name(FAKE_MAIL_NAME_1)
@@ -774,7 +773,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void deleteMailsShouldDeleteMailsWhenSenderIsGiven() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
 
                 queue.enQueue(FakeMail.builder()
                     .name(FAKE_MAIL_NAME_1)
@@ -808,7 +807,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void deleteMailsShouldDeleteMailsWhenNameIsGiven() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
 
                 queue.enQueue(FakeMail.builder()
                     .name(FAKE_MAIL_NAME_1)
@@ -840,7 +839,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void deleteMailsShouldDeleteMailsWhenRecipientIsGiven() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
 
                 queue.enQueue(FakeMail.builder()
                     .name(FAKE_MAIL_NAME_1)
@@ -880,7 +879,7 @@ class MailQueueRoutesTest {
 
             @Test
             public void deleteMailsShouldDeleteMailsWhenTheyAreMatching() throws Exception {
-                MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+                MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
                 String recipient = "recipient@james.org";
                 queue.enQueue(Mails.defaultMail()
                         .name("name")
@@ -945,7 +944,7 @@ class MailQueueRoutesTest {
 
         @Test
         public void clearMailQueueShouldHaveDetailsWhenNoQueryParameters() throws Exception {
-            MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+            MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
 
             queue.enQueue(FakeMail.builder()
                 .name(FAKE_MAIL_NAME_1)
@@ -982,7 +981,7 @@ class MailQueueRoutesTest {
 
         @Test
         void clearMailQueueShouldDeleteAllMailsInQueueWhenNoQueryParameters() throws Exception {
-            MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+            MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
 
             queue.enQueue(FakeMail.builder()
                 .name(FAKE_MAIL_NAME_1)
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/ClearMailQueueTaskTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/ClearMailQueueTaskTest.java
index 6d04e01..94b4432 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/ClearMailQueueTaskTest.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/ClearMailQueueTaskTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.james.webadmin.service;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
@@ -46,23 +47,24 @@ class ClearMailQueueTaskTest {
         when(mockedQueue.getName()).thenReturn(QUEUE_NAME);
         when(mailQueueFactory.getQueue(anyString())).thenAnswer(arg -> Optional.of(mockedQueue));
 
-        ManageableMailQueue queue = mailQueueFactory.getQueue(QUEUE_NAME).get();
-        ClearMailQueueTask task = new ClearMailQueueTask(queue);
+        ClearMailQueueTask.MailQueueFactory factory = queueName -> mailQueueFactory.getQueue(queueName).orElseThrow(RuntimeException::new);
+        ClearMailQueueTask task = new ClearMailQueueTask(QUEUE_NAME, factory);
 
         JsonSerializationVerifier.dtoModule(ClearMailQueueTaskDTO.module(mailQueueFactory))
             .bean(task)
             .json(SERIALIZED)
+            .equalityTester((a, b) -> assertThat(a.getQueueName()).isEqualTo(b.getQueueName()))
             .verify();
     }
 
     @Test
-    void taskShouldThrowWhenDeserializeAnUnknownQueue() {
+    void taskShouldThrowWhenRunOnAnUnknownQueue() {
         MailQueueFactory<ManageableMailQueue> mailQueueFactory = mock(MailQueueFactory.class);
         when(mailQueueFactory.getQueue(anyString())).thenReturn(Optional.empty());
         JsonTaskSerializer testee = JsonTaskSerializer.of(ClearMailQueueTaskDTO.module(mailQueueFactory));
 
         String serializedJson = "{\"type\": \"clear-mail-queue\", \"queue\": \"anyQueue\"}";
-        assertThatThrownBy(() -> testee.deserialize(serializedJson))
+        assertThatThrownBy(() -> testee.deserialize(serializedJson).run())
             .isInstanceOf(ClearMailQueueTask.UnknownSerializedQueue.class);
     }
 
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskTest.java
index c6797c2..6e44829 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskTest.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.james.webadmin.service;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
@@ -53,26 +54,32 @@ class DeleteMailsFromMailQueueTaskTest {
 
     @Test
     void taskShouldBeSerializable() throws Exception {
-        ManageableMailQueue queue = mailQueueFactory.getQueue(queueName).get();
-        DeleteMailsFromMailQueueTask taskSender = new DeleteMailsFromMailQueueTask(queue, Optional.of(new MailAddress("a@b.c")), Optional.empty(), Optional.empty());
-        DeleteMailsFromMailQueueTask taskName = new DeleteMailsFromMailQueueTask(queue, Optional.empty(), Optional.of("name"), Optional.empty());
-        DeleteMailsFromMailQueueTask taskRecipient = new DeleteMailsFromMailQueueTask(queue, Optional.empty(), Optional.empty(),  Optional.of(new MailAddress("d@e.f")));
+        DeleteMailsFromMailQueueTask.MailQueueFactory factory = name -> this.mailQueueFactory.getQueue(name).orElseThrow(RuntimeException::new);
+        DeleteMailsFromMailQueueTask taskSender = new DeleteMailsFromMailQueueTask(queueName, factory, Optional.of(new MailAddress("a@b.c")), Optional.empty(), Optional.empty());
+        DeleteMailsFromMailQueueTask taskName = new DeleteMailsFromMailQueueTask(queueName, factory, Optional.empty(), Optional.of("name"), Optional.empty());
+        DeleteMailsFromMailQueueTask taskRecipient = new DeleteMailsFromMailQueueTask(queueName, factory, Optional.empty(), Optional.empty(), Optional.of(new MailAddress("d@e.f")));
 
-        JsonSerializationVerifier.dtoModule(DeleteMailsFromMailQueueTaskDTO.module(mailQueueFactory))
+        JsonSerializationVerifier.dtoModule(DeleteMailsFromMailQueueTaskDTO.module(this.mailQueueFactory))
             .testCase(taskSender, "{\"type\": \"delete-mails-from-mail-queue\", \"queue\": \"anyQueue\", \"sender\": \"a@b.c\"}")
             .testCase(taskName, "{\"type\": \"delete-mails-from-mail-queue\", \"queue\": \"anyQueue\", \"name\": \"name\"}")
             .testCase(taskRecipient, "{\"type\": \"delete-mails-from-mail-queue\", \"queue\": \"anyQueue\", \"recipient\": \"d@e.f\"}")
+            .equalityTester((a, b) -> {
+                assertThat(a.getQueueName()).isEqualTo(b.getQueueName());
+                assertThat(a.getMaybeName()).isEqualTo(b.getMaybeName());
+                assertThat(a.getMaybeSender()).isEqualTo(b.getMaybeSender());
+                assertThat(a.getMaybeRecipient()).isEqualTo(b.getMaybeRecipient());
+            })
             .verify();
     }
 
     @Test
-    void taskShouldThrowWhenDeserializeAnUnknownQueue() {
+    void taskShouldThrowWhenRunOnAnUnknownQueue() {
         MailQueueFactory<ManageableMailQueue> mailQueueFactory = mock(MailQueueFactory.class);
         when(mailQueueFactory.getQueue(anyString())).thenReturn(Optional.empty());
         JsonTaskSerializer testee = JsonTaskSerializer.of(DeleteMailsFromMailQueueTaskDTO.module(mailQueueFactory));
 
         String serializedJson = "{\"type\": \"delete-mails-from-mail-queue\", \"queue\": \"anyQueue\", \"sender\": \"a@b.c\"}";
-        assertThatThrownBy(() -> testee.deserialize(serializedJson))
+        assertThatThrownBy(() -> testee.deserialize(serializedJson).run())
             .isInstanceOf(DeleteMailsFromMailQueueTask.UnknownSerializedQueue.class);
     }
 
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
index 6ccb045..6171b00 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.webadmin.service;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Optional;
 import java.util.function.Consumer;
 
@@ -35,17 +37,22 @@ import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.util.OptionalUtils;
 import org.apache.james.util.streams.Iterators;
 import org.apache.mailet.Mail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 
 public class ReprocessingService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReprocessingService.class);
+
     public static class MissingKeyException extends RuntimeException {
         MissingKeyException(MailKey key) {
             super(key.asString() + " can not be found");
         }
     }
 
-    static class Reprocessor {
+    static class Reprocessor implements Closeable {
         private final MailQueue mailQueue;
         private final Optional<String> targetProcessor;
 
@@ -63,6 +70,15 @@ public class ReprocessingService {
                 throw new RuntimeException("Error encountered while reprocessing mail " + mail.getName(), e);
             }
         }
+
+        @Override
+        public void close() {
+            try {
+                mailQueue.close();
+            } catch (IOException e) {
+                LOGGER.debug("error closing queue", e);
+            }
+        }
     }
 
     private final MailQueueFactory<?> mailQueueFactory;
@@ -76,30 +92,30 @@ public class ReprocessingService {
     }
 
     public void reprocessAll(MailRepositoryPath path, Optional<String> targetProcessor, String targetQueue, Consumer<MailKey> keyListener) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
-        Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor);
-
-        mailRepositoryStoreService
-            .getRepositories(path)
-            .forEach(Throwing.consumer((MailRepository repository) ->
-                Iterators.toStream(repository.list())
-                    .peek(keyListener)
-                    .map(Throwing.function(key -> Optional.ofNullable(repository.retrieve(key))))
-                    .flatMap(OptionalUtils::toStream)
-                    .forEach(mail -> reprocessor.reprocess(repository, mail))));
+        try (Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor)) {
+            mailRepositoryStoreService
+                .getRepositories(path)
+                .forEach(Throwing.consumer((MailRepository repository) ->
+                    Iterators.toStream(repository.list())
+                        .peek(keyListener)
+                        .map(Throwing.function(key -> Optional.ofNullable(repository.retrieve(key))))
+                        .flatMap(OptionalUtils::toStream)
+                        .forEach(mail -> reprocessor.reprocess(repository, mail))));
+        }
     }
 
     public void reprocess(MailRepositoryPath path, MailKey key, Optional<String> targetProcessor, String targetQueue) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
-        Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor);
-
-        Pair<MailRepository, Mail> mailPair = mailRepositoryStoreService
-            .getRepositories(path)
-            .map(Throwing.function(repository -> Pair.of(repository, Optional.ofNullable(repository.retrieve(key)))))
-            .filter(pair -> pair.getRight().isPresent())
-            .map(pair -> Pair.of(pair.getLeft(), pair.getRight().get()))
-            .findFirst()
-            .orElseThrow(() -> new MissingKeyException(key));
-
-        reprocessor.reprocess(mailPair.getKey(), mailPair.getValue());
+        try (Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor)) {
+            Pair<MailRepository, Mail> mailPair = mailRepositoryStoreService
+                .getRepositories(path)
+                .map(Throwing.function(repository -> Pair.of(repository, Optional.ofNullable(repository.retrieve(key)))))
+                .filter(pair -> pair.getRight().isPresent())
+                .map(pair -> Pair.of(pair.getLeft(), pair.getRight().get()))
+                .findFirst()
+                .orElseThrow(() -> new MissingKeyException(key));
+
+            reprocessor.reprocess(mailPair.getKey(), mailPair.getValue());
+        }
     }
 
     private MailQueue getMailQueue(String targetQueue) {
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
similarity index 91%
rename from server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
rename to server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
index c6e094b..145acd5 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
@@ -44,7 +44,7 @@ import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
-import org.apache.james.queue.jms.JMSMailQueue;
+import org.apache.james.queue.jms.JMSCacheableMailQueue;
 import org.apache.james.server.core.MailImpl;
 import org.apache.james.server.core.MimeMessageCopyOnWriteProxy;
 import org.apache.james.server.core.MimeMessageInputStream;
@@ -89,17 +89,17 @@ import org.slf4j.LoggerFactory;
  * </p>
  * To have a good throughput you should use a caching connection factory. </p>
  */
-public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQMailQueue.class);
+public class ActiveMQCacheableMailQueue extends JMSCacheableMailQueue implements ActiveMQSupport {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQCacheableMailQueue.class);
 
     private final boolean useBlob;
 
     /**
-     * Construct a {@link ActiveMQMailQueue} which only use {@link BlobMessage}
+     * Construct a {@link ActiveMQCacheableMailQueue} which only use {@link BlobMessage}
      * 
      */
-    public ActiveMQMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queuename, MetricFactory metricFactory,
-                             GaugeRegistry gaugeRegistry) {
+    public ActiveMQCacheableMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queuename, MetricFactory metricFactory,
+                                      GaugeRegistry gaugeRegistry) {
         this(connectionFactory, mailQueueItemDecoratorFactory, queuename, true, metricFactory, gaugeRegistry);
     }
 
@@ -110,8 +110,8 @@ public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport {
      * @param queuename
      * @param useBlob
      */
-    public ActiveMQMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queuename, boolean useBlob, MetricFactory metricFactory,
-                             GaugeRegistry gaugeRegistry) {
+    public ActiveMQCacheableMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queuename, boolean useBlob, MetricFactory metricFactory,
+                                      GaugeRegistry gaugeRegistry) {
         super(connectionFactory, mailQueueItemDecoratorFactory, queuename, metricFactory, gaugeRegistry);
         this.useBlob = useBlob;
     }
@@ -268,7 +268,7 @@ public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport {
 
     /**
      * Try to use ActiveMQ StatisticsPlugin to get size and if that fails
-     * fallback to {@link JMSMailQueue#getSize()}
+     * fallback to {@link JMSCacheableMailQueue#getSize()}
      */
     @Override
     public long getSize() throws MailQueueException {
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
index 5008508..5b61ca5 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
@@ -30,7 +30,7 @@ import org.apache.james.queue.jms.JMSMailQueueFactory;
 
 /**
  * {@link MailQueueFactory} implementations which return
- * {@link ActiveMQMailQueue} instances
+ * {@link ActiveMQCacheableMailQueue} instances
  */
 public class ActiveMQMailQueueFactory extends JMSMailQueueFactory {
 
@@ -51,7 +51,7 @@ public class ActiveMQMailQueueFactory extends JMSMailQueueFactory {
     }
 
     @Override
-    protected ManageableMailQueue createMailQueue(String name) {
-        return new ActiveMQMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, useBlob, metricFactory, gaugeRegistry);
+    protected ManageableMailQueue createCacheableMailQueue(String name) {
+        return new ActiveMQCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, useBlob, metricFactory, gaugeRegistry);
     }
 }
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java
index b2c3cea..da22eb7 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java
@@ -59,7 +59,7 @@ public class ActiveMQMailQueueBlobTest implements DelayedManageableMailQueueCont
     static final String BASE_DIR = "file://target/james-test";
     static final boolean USE_BLOB = true;
 
-    ActiveMQMailQueue mailQueue;
+    ActiveMQCacheableMailQueue mailQueue;
     MyFileSystem fileSystem;
 
     @BeforeEach
@@ -78,7 +78,7 @@ public class ActiveMQMailQueueBlobTest implements DelayedManageableMailQueueCont
         MetricFactory metricFactory = metricTestSystem.getMetricFactory();
         GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry();
         String queueName = BrokerExtension.generateRandomQueueName(broker);
-        mailQueue = new ActiveMQMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, USE_BLOB, metricFactory, gaugeRegistry);
+        mailQueue = new ActiveMQCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, USE_BLOB, metricFactory, gaugeRegistry);
     }
 
     @AfterEach
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
index 609cef1..e7a08dd 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
@@ -18,8 +18,6 @@
  ****************************************************************/
 package org.apache.james.queue.activemq;
 
-import javax.jms.ConnectionFactory;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
@@ -48,7 +46,7 @@ public class ActiveMQMailQueueTest implements DelayedManageableMailQueueContract
 
     static final boolean USE_BLOB = true;
 
-    ActiveMQMailQueue mailQueue;
+    ActiveMQCacheableMailQueue mailQueue;
 
     @BeforeEach
     public void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
@@ -60,7 +58,7 @@ public class ActiveMQMailQueueTest implements DelayedManageableMailQueueContract
         MetricFactory metricFactory = metricTestSystem.getMetricFactory();
         GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry();
         String queueName = BrokerExtension.generateRandomQueueName(broker);
-        mailQueue = new ActiveMQMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, !USE_BLOB, metricFactory, gaugeRegistry);
+        mailQueue = new ActiveMQCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, !USE_BLOB, metricFactory, gaugeRegistry);
     }
 
     @AfterEach
diff --git a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
index 559d998..d4e9c3a 100644
--- a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
+++ b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.queue.api;
 
+import java.io.Closeable;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
@@ -58,7 +59,7 @@ import org.threeten.extra.Temporals;
  * </ul>
  * </p>
  */
-public interface MailQueue {
+public interface MailQueue extends Closeable {
 
     String ENQUEUED_METRIC_NAME_PREFIX = "enqueuedMail:";
     String DEQUEUED_METRIC_NAME_PREFIX = "dequeuedMail:";
diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
similarity index 97%
rename from server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
rename to server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
index 4f288b3..6142be8 100644
--- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
+++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
@@ -71,15 +71,15 @@ import reactor.core.publisher.Mono;
 /**
  * {@link ManageableMailQueue} implementation which use the fs to store {@link Mail}'s
  * <p/>
- * On create of the {@link FileMailQueue} the {@link #init()} will get called. This takes care of
+ * On create of the {@link FileCacheableMailQueue} the {@link #init()} will get called. This takes care of
  * loading the needed meta-data into memory for fast access.
  *
  * @deprecated FileMailQueue implementation is unmaintained, incomplete and not thread safe
  * We recommend using embedded ActiveMQMailQueue implementation instead
  */
 @Deprecated
-public class FileMailQueue implements ManageableMailQueue {
-    private static final Logger LOGGER = LoggerFactory.getLogger(FileMailQueue.class);
+public class FileCacheableMailQueue implements ManageableMailQueue {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileCacheableMailQueue.class);
 
     private final Map<String, FileItem> keyMappings = Collections.synchronizedMap(new LinkedHashMap<>());
     private final BlockingQueue<String> inmemoryQueue = new LinkedBlockingQueue<>();
@@ -98,7 +98,7 @@ public class FileMailQueue implements ManageableMailQueue {
     private final String queueName;
     private final Flux<MailQueueItem> flux;
 
-    public FileMailQueue(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, File parentDir, String queuename, boolean sync) throws IOException {
+    public FileCacheableMailQueue(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, File parentDir, String queuename, boolean sync) throws IOException {
         this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
         this.sync = sync;
         this.queueName = queuename;
@@ -111,6 +111,11 @@ public class FileMailQueue implements ManageableMailQueue {
     }
 
     @Override
+    public void close() {
+        //There's no resource to free
+    }
+
+    @Override
     public String getName() {
         return queueName;
     }
diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java
index 87d88a8..623e1e0 100644
--- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java
+++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.inject.Inject;
 
@@ -34,7 +35,7 @@ import org.apache.james.queue.api.ManageableMailQueue;
 import com.google.common.collect.ImmutableSet;
 
 /**
- * {@link MailQueueFactory} implementation which returns {@link FileMailQueue} instances
+ * {@link MailQueueFactory} implementation which returns {@link FileCacheableMailQueue} instances
  *
  * @deprecated FileMailQueue implementation is unmaintained, incomplete and not thread safe
  * We recommend using embedded ActiveMQMailQueue implementation instead
@@ -42,7 +43,7 @@ import com.google.common.collect.ImmutableSet;
 @Deprecated
 public class FileMailQueueFactory implements MailQueueFactory<ManageableMailQueue> {
 
-    private final Map<String, ManageableMailQueue> queues = new HashMap<>();
+    private final Map<String, ManageableMailQueue> queues = new ConcurrentHashMap<>();
     private MailQueueItemDecoratorFactory mailQueueActionItemDecoratorFactory;
     private FileSystem fs;
     private boolean sync = true;
@@ -59,7 +60,7 @@ public class FileMailQueueFactory implements MailQueueFactory<ManageableMailQueu
     }
 
     /**
-     * If <code>true</code> the later created {@link FileMailQueue} will call <code>fsync</code> after each message {@link FileMailQueue#enQueue(org.apache.mailet.Mail)} call. This
+     * If <code>true</code> the later created {@link FileCacheableMailQueue} will call <code>fsync</code> after each message {@link FileCacheableMailQueue#enQueue(org.apache.mailet.Mail)} call. This
      * is needed to be fully RFC conform but gives a performance penalty. If you are brave enough you man set it to <code>false</code>
      * <p/>
      * The default is <code>true</code>
@@ -77,19 +78,13 @@ public class FileMailQueueFactory implements MailQueueFactory<ManageableMailQueu
 
     @Override
     public ManageableMailQueue createQueue(String name) {
-        return getQueue(name).orElseGet(() -> createAndRegisterQueue(name));
-    }
-
-    private ManageableMailQueue createAndRegisterQueue(String name) {
-        synchronized (queues) {
+        return queues.computeIfAbsent(name, mailQueueName -> {
             try {
-                FileMailQueue queue = new FileMailQueue(mailQueueActionItemDecoratorFactory, fs.getFile("file://var/store/queue"), name, sync);
-                queues.put(name, queue);
-                return queue;
+                return new FileCacheableMailQueue(mailQueueActionItemDecoratorFactory, fs.getFile("file://var/store/queue"), mailQueueName, sync);
             } catch (IOException e) {
-                throw new RuntimeException("Unable to access queue " + name, e);
+                throw new RuntimeException("Unable to access queue " + mailQueueName, e);
             }
-        }
+        });
     }
 
 }
diff --git a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueFactoryTest.java b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueFactoryTest.java
similarity index 94%
rename from server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueFactoryTest.java
rename to server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueFactoryTest.java
index 926092f..923a8db 100644
--- a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueFactoryTest.java
+++ b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueFactoryTest.java
@@ -35,7 +35,7 @@ import org.junit.jupiter.api.Disabled;
     " - JAMES-2954 Incomplete browse implementation" +
     " - JAMES-2544 Mixing concurrent operation might lead to a deadlock and missing fields" +
     " - JAMES-2979 dequeue is not thread safe")
-public class FileMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
+public class FileCacheableMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
     private FileMailQueueFactory mailQueueFactory;
     private MockFileSystem fileSystem;
 
diff --git a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueTest.java
similarity index 90%
rename from server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java
rename to server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueTest.java
index f2659d6..2b479e9 100644
--- a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java
+++ b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueTest.java
@@ -34,16 +34,16 @@ import org.junit.rules.TemporaryFolder;
     " - JAMES-2954 Incomplete browse implementation" +
     " - JAMES-2544 Mixing concurrent operation might lead to a deadlock and missing fields" +
     " - JAMES-2979 dequeue is not thread safe")
-public class FileMailQueueTest implements DelayedManageableMailQueueContract {
+public class FileCacheableMailQueueTest implements DelayedManageableMailQueueContract {
     private static final boolean SYNC = true;
 
     private TemporaryFolder temporaryFolder = new TemporaryFolder();
-    private FileMailQueue mailQueue;
+    private FileCacheableMailQueue mailQueue;
 
     @BeforeEach
     public void setUp() throws Exception {
         temporaryFolder.create();
-        mailQueue = new FileMailQueue(new RawMailQueueItemDecoratorFactory(), temporaryFolder.newFolder(), "test", SYNC);
+        mailQueue = new FileCacheableMailQueue(new RawMailQueueItemDecoratorFactory(), temporaryFolder.newFolder(), "test", SYNC);
     }
 
     @AfterEach
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
similarity index 97%
rename from server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
rename to server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
index 2511f49..5171cae 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
@@ -99,7 +99,7 @@ import reactor.core.publisher.Mono;
  * {@link Mail} objects.
  * </p>
  */
-public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPrioritySupport, Disposable {
+public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, MailPrioritySupport, Disposable {
 
     private final Flux<MailQueueItem> flux;
 
@@ -153,7 +153,7 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
         }
     }
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(JMSMailQueue.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(JMSCacheableMailQueue.class);
 
     public static final String FORCE_DELIVERY = "FORCE_DELIVERY";
 
@@ -172,9 +172,9 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
     private final Joiner joiner;
     private final Splitter splitter;
 
-    public JMSMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory,
-                        String queueName, MetricFactory metricFactory,
-                        GaugeRegistry gaugeRegistry) {
+    public JMSCacheableMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory,
+                                 String queueName, MetricFactory metricFactory,
+                                 GaugeRegistry gaugeRegistry) {
         try {
             connection = connectionFactory.createConnection();
             connection.start();
@@ -204,6 +204,14 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
         flux = Mono.defer(this::deQueueOneItem).repeat();
     }
 
+    /**
+     * To allow connection reuse (the queue is cacheable), we don't close the queue
+     * on close(), use {@link JMSCacheableMailQueue#dispose} to release resources
+     */
+    @Override
+    public void close() {
+    }
+
     @Override
     public String getName() {
         return queueName;
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
index 051e3c0..701e0b3 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
@@ -48,8 +48,8 @@ public class JMSMailQueueFactory extends AbstractMailQueueFactory<ManageableMail
     }
 
     @Override
-    protected ManageableMailQueue createMailQueue(String name) {
-        return new JMSMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, metricFactory, gaugeRegistry);
+    protected ManageableMailQueue createCacheableMailQueue(String name) {
+        return new JMSCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, metricFactory, gaugeRegistry);
     }
-    
+
 }
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
index ad10836..d20c642 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
@@ -48,13 +48,13 @@ public class JMSMailQueueItem implements MailQueueItem {
             if (success) {
                 session.commit();
             } else {
-                JMSMailQueue.rollback(session);
+                JMSCacheableMailQueue.rollback(session);
             }
         } catch (JMSException ex) {
             throw new MailQueueException("Unable to commit dequeue operation for mail " + mail.getName(), ex);
         } finally {
-            JMSMailQueue.closeConsumer(consumer);
-            JMSMailQueue.closeSession(session);
+            JMSCacheableMailQueue.closeConsumer(consumer);
+            JMSCacheableMailQueue.closeSession(session);
         }
     }
 
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java
index e1a29dd..fdd4b86 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java
@@ -102,7 +102,7 @@ public abstract class AbstractMailQueueFactory<T extends MailQueue> implements M
     }
 
     private T createAndRegisterQueue(String name) {
-        T queue = createMailQueue(name);
+        T queue = createCacheableMailQueue(name);
         if (useJMX) {
             registerMBean(name, queue);
         }
@@ -111,12 +111,10 @@ public abstract class AbstractMailQueueFactory<T extends MailQueue> implements M
     }
 
     /**
-     * Create a {@link MailQueue} for the given name
-     * 
-     * @param name
-     * @return queue
+     * Create a {@link MailQueue} for the given name that happens to do nothing on close()
+     * to be able to cache the instance
      */
-    protected abstract T createMailQueue(String name);
+    protected abstract T createCacheableMailQueue(String name);
 
     protected synchronized void registerMBean(String queuename, MailQueue queue) {
 
diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueFactoryTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueFactoryTest.java
similarity index 95%
rename from server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueFactoryTest.java
rename to server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueFactoryTest.java
index f98cf9d..5a62538 100644
--- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueFactoryTest.java
+++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueFactoryTest.java
@@ -37,7 +37,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 
 
 @ExtendWith(BrokerExtension.class)
-public class JMSMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
+public class JMSCacheableMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
 
     private JMSMailQueueFactory mailQueueFactory;
 
diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueTest.java
similarity index 92%
rename from server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
rename to server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueTest.java
index f436a1d..061218a 100644
--- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
+++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueTest.java
@@ -19,8 +19,6 @@
 
 package org.apache.james.queue.jms;
 
-import javax.jms.ConnectionFactory;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
@@ -41,10 +39,10 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 @ExtendWith(BrokerExtension.class)
-public class JMSMailQueueTest implements DelayedManageableMailQueueContract, PriorityManageableMailQueueContract, DelayedPriorityMailQueueContract,
+public class JMSCacheableMailQueueTest implements DelayedManageableMailQueueContract, PriorityManageableMailQueueContract, DelayedPriorityMailQueueContract,
     MailQueueMetricContract {
 
-    private JMSMailQueue mailQueue;
+    private JMSCacheableMailQueue mailQueue;
 
     @BeforeEach
     void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
@@ -56,7 +54,7 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri
         MetricFactory metricFactory = metricTestSystem.getMetricFactory();
         GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry();
         String queueName = BrokerExtension.generateRandomQueueName(broker);
-        mailQueue = new JMSMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, metricFactory, gaugeRegistry);
+        mailQueue = new JMSCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, metricFactory, gaugeRegistry);
     }
 
     @AfterEach
diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/library/AbstractMailQueueFactoryTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/library/AbstractMailQueueFactoryTest.java
index 2ae2c50..2f25606 100644
--- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/library/AbstractMailQueueFactoryTest.java
+++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/library/AbstractMailQueueFactoryTest.java
@@ -46,7 +46,7 @@ public class AbstractMailQueueFactoryTest {
         mBeanServer = mock(MBeanServer.class);
         abstractMailQueueFactory = new AbstractMailQueueFactory<ManageableMailQueue>() {
             @Override
-            protected ManageableMailQueue createMailQueue(String name) {
+            protected ManageableMailQueue createCacheableMailQueue(String name) {
                 return mock(ManageableMailQueue.class);
             }
         };
diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 25b80b6..4aa75ec 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.queue.memory;
 
+import java.io.IOException;
 import java.time.DateTimeException;
 import java.time.Duration;
 import java.time.Instant;
@@ -60,7 +61,7 @@ import reactor.core.scheduler.Schedulers;
 
 public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQueue> {
 
-    private final ConcurrentHashMap<String, MemoryMailQueueFactory.MemoryMailQueue> mailQueues;
+    private final ConcurrentHashMap<String, MemoryCacheableMailQueue> mailQueues;
     private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
 
     @Inject
@@ -80,19 +81,17 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
     }
 
     @Override
-    public MemoryMailQueueFactory.MemoryMailQueue createQueue(String name) {
-        MemoryMailQueueFactory.MemoryMailQueue newMailQueue = new MemoryMailQueue(name, mailQueueItemDecoratorFactory);
-        return Optional.ofNullable(mailQueues.putIfAbsent(name, newMailQueue))
-            .orElse(newMailQueue);
+    public MemoryCacheableMailQueue createQueue(String name) {
+        return mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory));
     }
 
-    public static class MemoryMailQueue implements ManageableMailQueue {
+    public static class MemoryCacheableMailQueue implements ManageableMailQueue {
         private final DelayQueue<MemoryMailQueueItem> mailItems;
         private final LinkedBlockingDeque<MemoryMailQueueItem> inProcessingMailItems;
         private final String name;
         private final Flux<MailQueueItem> flux;
 
-        public MemoryMailQueue(String name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+        public MemoryCacheableMailQueue(String name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
             this.mailItems = new DelayQueue<>();
             this.inProcessingMailItems = new LinkedBlockingDeque<>();
             this.name = name;
@@ -105,6 +104,11 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
         }
 
         @Override
+        public void close() {
+            //There's no resource to free
+        }
+
+        @Override
         public String getName() {
             return name;
         }
@@ -244,7 +248,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
                 return false;
             }
 
-            MemoryMailQueue that = (MemoryMailQueue) o;
+            MemoryCacheableMailQueue that = (MemoryCacheableMailQueue) o;
 
             return Objects.equal(this.name, that.name);
         }
@@ -257,10 +261,10 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
 
     public static class MemoryMailQueueItem implements MailQueue.MailQueueItem, Delayed {
         private final Mail mail;
-        private final MemoryMailQueue queue;
+        private final MemoryCacheableMailQueue queue;
         private final ZonedDateTime delivery;
 
-        public MemoryMailQueueItem(Mail mail, MemoryMailQueue queue, ZonedDateTime delivery) {
+        public MemoryMailQueueItem(Mail mail, MemoryCacheableMailQueue queue, ZonedDateTime delivery) {
             this.mail = mail;
             this.queue = queue;
             this.delivery = delivery;
diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueFactoryTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java
similarity index 93%
rename from server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueFactoryTest.java
rename to server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java
index 28002ca..aa2e294 100644
--- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueFactoryTest.java
+++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java
@@ -26,7 +26,7 @@ import org.apache.james.queue.api.ManageableMailQueueFactoryContract;
 import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
 import org.junit.jupiter.api.BeforeEach;
 
-class MemoryMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
+class MemoryCacheableMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
 
     MemoryMailQueueFactory memoryMailQueueFactory;
 
diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
similarity index 91%
rename from server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java
rename to server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
index 085e565..5868995 100644
--- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java
+++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
@@ -29,13 +29,13 @@ import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-public class MemoryMailQueueTest implements DelayedManageableMailQueueContract {
+public class MemoryCacheableMailQueueTest implements DelayedManageableMailQueueContract {
 
-    private MemoryMailQueueFactory.MemoryMailQueue mailQueue;
+    private MemoryMailQueueFactory.MemoryCacheableMailQueue mailQueue;
 
     @BeforeEach
     public void setUp() {
-        mailQueue = new MemoryMailQueueFactory.MemoryMailQueue("test", new RawMailQueueItemDecoratorFactory());
+        mailQueue = new MemoryMailQueueFactory.MemoryCacheableMailQueue("test", new RawMailQueueItemDecoratorFactory());
     }
 
     @Override
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index ece7018..64c0c93 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -21,6 +21,7 @@ package org.apache.james.queue.rabbitmq;
 
 import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -40,13 +41,14 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.ConsumeOptions;
+import reactor.rabbitmq.Receiver;
 
-class Dequeuer {
+class Dequeuer implements Closeable {
     private static final boolean REQUEUE = true;
     private static final int EXECUTION_RATE = 5;
-    private final Flux<AcknowledgableDelivery> flux;
 
     private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
+
         private final Consumer<Boolean> ack;
         private final EnqueueId enqueueId;
         private final Mail mail;
@@ -70,12 +72,15 @@ class Dequeuer {
         public void done(boolean success) {
             ack.accept(success);
         }
+
     }
 
     private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
     private final Metric dequeueMetric;
     private final MailReferenceSerializer mailReferenceSerializer;
     private final MailQueueView mailQueueView;
+    private final Receiver receiver;
+    private final Flux<AcknowledgableDelivery> flux;
 
     Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
              MailReferenceSerializer serializer, MetricFactory metricFactory,
@@ -84,11 +89,17 @@ class Dequeuer {
         this.mailReferenceSerializer = serializer;
         this.mailQueueView = mailQueueView;
         this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
-        this.flux = receiverProvider.createReceiver()
+        this.receiver = receiverProvider.createReceiver();
+        this.flux = this.receiver
             .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE))
             .filter(getResponse -> getResponse.getBody() != null);
     }
 
+    @Override
+    public void close() {
+        receiver.close();
+    }
+
     Flux<? extends MailQueue.MailQueueItem> deQueue() {
         return flux.concatMap(this::loadItem)
             .concatMap(this::filterIfDeleted);
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 2dc7663..b09fe30 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -58,6 +58,11 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
     }
 
     @Override
+    public void close() {
+        dequeuer.close();
+    }
+
+    @Override
     public String getName() {
         return name.asString();
     }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index f771118..57bde2f 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -29,7 +29,6 @@ import static org.apache.james.queue.api.MailQueue.QUEUE_SIZE_METRIC_NAME_PREFIX
 import java.time.Clock;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 
 import javax.inject.Inject;
@@ -46,6 +45,7 @@ import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
 import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import org.apache.james.util.OptionalUtils;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
@@ -119,26 +119,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
         }
     }
 
-    /**
-     * RabbitMQMailQueue should have a single instance in a given JVM for a given MailQueueName.
-     * This class helps at keeping track of previously instantiated MailQueues.
-     */
-    private class RabbitMQMailQueueObjectPool {
-
-        private final ConcurrentHashMap<MailQueueName, RabbitMQMailQueue> instantiatedQueues;
-
-        RabbitMQMailQueueObjectPool() {
-            this.instantiatedQueues = new ConcurrentHashMap<>();
-        }
-
-        RabbitMQMailQueue retrieveInstanceFor(MailQueueName name) {
-            return instantiatedQueues.computeIfAbsent(name, privateFactory::create);
-        }
-    }
-
     private final RabbitMQMailQueueManagement mqManagementApi;
     private final PrivateFactory privateFactory;
-    private final RabbitMQMailQueueObjectPool mailQueueObjectPool;
     private final Sender sender;
 
     @VisibleForTesting
@@ -149,7 +131,6 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
         this.sender = sender;
         this.mqManagementApi = mqManagementApi;
         this.privateFactory = privateFactory;
-        this.mailQueueObjectPool = new RabbitMQMailQueueObjectPool();
     }
 
     @Override
@@ -166,8 +147,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
 
     @Override
     public Set<RabbitMQMailQueue> listCreatedMailQueues() {
+        //TODO: it creates connections and leak them
         return mqManagementApi.listCreatedMailQueueNames()
-            .map(mailQueueObjectPool::retrieveInstanceFor)
+            .flatMap(name -> OptionalUtils.toStream(getQueue(name.asString())))
             .collect(ImmutableSet.toImmutableSet());
     }
 
@@ -188,13 +170,13 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
                 .routingKey(EMPTY_ROUTING_KEY)))
             .then()
             .block();
-        return mailQueueObjectPool.retrieveInstanceFor(mailQueueName);
+        return privateFactory.create(mailQueueName);
     }
 
     private Optional<RabbitMQMailQueue> getQueueFromRabbitServer(MailQueueName name) {
         return mqManagementApi.listCreatedMailQueueNames()
             .filter(name::equals)
-            .map(mailQueueObjectPool::retrieveInstanceFor)
+            .map(privateFactory::create)
             .findFirst();
     }
 }
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index a717476..5651d02 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -91,21 +91,4 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
         return mailQueueFactory;
     }
 
-    @Test
-    void createQueueShouldReturnTheSameInstanceWhenParallelCreateSameQueueName() throws Exception {
-        Set<RabbitMQMailQueue> createdRabbitMQMailQueues =  ConcurrentHashMap.newKeySet();
-
-        ConcurrentTestRunner.builder()
-            .operation((threadNumber, operationNumber) ->
-                createdRabbitMQMailQueues.add(mailQueueFactory.createQueue("spool")))
-            .threadCount(100)
-            .operationCount(10)
-            .runSuccessfullyWithin(Duration.ofMinutes(10));
-
-        assertThat(mailQueueFactory.listCreatedMailQueues())
-            .hasSize(1)
-            .isEqualTo(createdRabbitMQMailQueues)
-            .extracting(RabbitMQMailQueue::getName)
-            .hasOnlyOneElementSatisfying(queueName -> assertThat(queueName).isEqualTo("spool"));
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org