You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2020/03/17 09:35:34 UTC
[james-project] 01/04: JAMES-3070 Don't use cache for RabbitMQ
Mailqueue Consumer
This is an automated email from the ASF dual-hosted git repository.
matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 84454ded73591f899b7166cd036da6e306f260ef
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Wed Feb 19 16:10:23 2020 +0100
JAMES-3070 Don't use cache for RabbitMQ Mailqueue Consumer
Make MailQueue Closeable to free resources when using non-cached
implementations.
For that we needed to take care about MailQueue lifecycle at every
usage place.
---
.../mailetcontainer/impl/JamesMailSpooler.java | 9 ++-
.../mailetcontainer/impl/JamesMailetContext.java | 15 ++++-
.../james/transport/mailets/RemoteDelivery.java | 6 ++
.../org/apache/james/fetchmail/FetchScheduler.java | 7 ++-
.../SetMessagesOutboxFlagUpdateTest.java | 4 ++
.../apache/james/jmap/draft/send/MailSpool.java | 19 ++++++-
.../apache/james/smtpserver/SendMailHandler.java | 11 ++++
.../apache/james/smtpserver/SMTPServerTest.java | 2 +-
.../james/webadmin/routes/MailQueueRoutes.java | 45 +++++++++------
.../james/webadmin/service/ClearMailQueueTask.java | 47 ++++++++++++----
.../webadmin/service/ClearMailQueueTaskDTO.java | 15 +++--
.../service/DeleteMailsFromMailQueueTask.java | 64 ++++++++++++++--------
.../service/DeleteMailsFromMailQueueTaskDTO.java | 12 ++--
.../james/webadmin/routes/MailQueueRoutesTest.java | 35 ++++++------
.../webadmin/service/ClearMailQueueTaskTest.java | 10 ++--
.../service/DeleteMailsFromMailQueueTaskTest.java | 21 ++++---
.../webadmin/service/ReprocessingService.java | 60 ++++++++++++--------
...lQueue.java => ActiveMQCacheableMailQueue.java} | 18 +++---
.../queue/activemq/ActiveMQMailQueueFactory.java | 6 +-
.../queue/activemq/ActiveMQMailQueueBlobTest.java | 4 +-
.../queue/activemq/ActiveMQMailQueueTest.java | 6 +-
.../java/org/apache/james/queue/api/MailQueue.java | 3 +-
...eMailQueue.java => FileCacheableMailQueue.java} | 13 +++--
.../james/queue/file/FileMailQueueFactory.java | 21 +++----
...java => FileCacheableMailQueueFactoryTest.java} | 2 +-
...ueTest.java => FileCacheableMailQueueTest.java} | 6 +-
...MSMailQueue.java => JMSCacheableMailQueue.java} | 18 ++++--
.../james/queue/jms/JMSMailQueueFactory.java | 6 +-
.../apache/james/queue/jms/JMSMailQueueItem.java | 6 +-
.../queue/library/AbstractMailQueueFactory.java | 10 ++--
....java => JMSCacheableMailQueueFactoryTest.java} | 2 +-
...eueTest.java => JMSCacheableMailQueueTest.java} | 8 +--
.../library/AbstractMailQueueFactoryTest.java | 2 +-
.../james/queue/memory/MemoryMailQueueFactory.java | 24 ++++----
...va => MemoryCacheableMailQueueFactoryTest.java} | 2 +-
...Test.java => MemoryCacheableMailQueueTest.java} | 6 +-
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 17 +++++-
.../james/queue/rabbitmq/RabbitMQMailQueue.java | 5 ++
.../queue/rabbitmq/RabbitMQMailQueueFactory.java | 28 ++--------
.../rabbitmq/RabbitMqMailQueueFactoryTest.java | 17 ------
40 files changed, 370 insertions(+), 242 deletions(-)
diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 29d5395..28383cd 100644
--- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -21,6 +21,7 @@ package org.apache.james.mailetcontainer.impl;
import static org.apache.james.metrics.api.TimeMetric.ExecutionResult.DEFAULT_100_MS_THRESHOLD;
+import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -59,7 +60,6 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class);
public static final String SPOOL_PROCESSING = "spoolProcessing";
- private MailQueue queue;
/**
* The number of threads used to move mail through the spool.
@@ -79,6 +79,8 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
private reactor.core.Disposable disposable;
private Scheduler spooler;
private int parallelismLevel;
+ private MailQueue queue;
+
@Inject
public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> queueFactory) {
@@ -173,6 +175,11 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
LOGGER.info("start dispose() ...");
disposable.dispose();
spooler.dispose();
+ try {
+ queue.close();
+ } catch (IOException e) {
+ LOGGER.debug("error closing queue", e);
+ }
LOGGER.info("thread shutdown completed.");
}
diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailetContext.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailetContext.java
index bceea55..e1c3483 100644
--- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailetContext.java
+++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailetContext.java
@@ -19,6 +19,7 @@
package org.apache.james.mailetcontainer.impl;
+import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
@@ -30,6 +31,7 @@ import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.mail.Address;
import javax.mail.Message;
@@ -50,6 +52,7 @@ import org.apache.james.dnsservice.library.MXHostAddressIterator;
import org.apache.james.domainlist.api.DomainList;
import org.apache.james.domainlist.api.DomainListException;
import org.apache.james.lifecycle.api.Configurable;
+import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
@@ -66,7 +69,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
-public class JamesMailetContext implements MailetContext, Configurable {
+public class JamesMailetContext implements MailetContext, Configurable, Disposable {
private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailetContext.class);
/**
@@ -89,6 +92,16 @@ public class JamesMailetContext implements MailetContext, Configurable {
this.mailQueueFactory = mailQueueFactory;
}
+ @PreDestroy
+ @Override
+ public void dispose() {
+ try {
+ rootMailQueue.close();
+ } catch (IOException e) {
+ LOGGER.debug("error closing queue", e);
+ }
+ }
+
@Override
public Collection<String> getMailServers(Domain host) {
try {
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
index ab597e9..5cb3889 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
@@ -19,6 +19,7 @@
package org.apache.james.transport.mailets;
+import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Map;
@@ -258,6 +259,11 @@ public class RemoteDelivery extends GenericMailet {
if (startThreads == ThreadState.START_THREADS) {
deliveryRunnable.dispose();
}
+ try {
+ queue.close();
+ } catch (IOException e) {
+ LOGGER.debug("error closing queue", e);
+ }
}
}
diff --git a/server/protocols/fetchmail/src/main/java/org/apache/james/fetchmail/FetchScheduler.java b/server/protocols/fetchmail/src/main/java/org/apache/james/fetchmail/FetchScheduler.java
index a8175b6..869e5c1 100644
--- a/server/protocols/fetchmail/src/main/java/org/apache/james/fetchmail/FetchScheduler.java
+++ b/server/protocols/fetchmail/src/main/java/org/apache/james/fetchmail/FetchScheduler.java
@@ -19,6 +19,7 @@
package org.apache.james.fetchmail;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
@@ -67,6 +68,7 @@ public class FetchScheduler implements FetchSchedulerMBean, Configurable {
private MailQueueFactory<?> queueFactory;
private DomainList domainList;
+ private MailQueue queue;
@Inject
public void setMailQueueFactory(MailQueueFactory<?> queueFactory) {
@@ -105,7 +107,7 @@ public class FetchScheduler implements FetchSchedulerMBean, Configurable {
The scheduler service that is used to trigger fetch tasks.
*/
ScheduledExecutorService scheduler = new JMXEnabledScheduledThreadPoolExecutor(numThreads, jmxPath, "scheduler");
- MailQueue queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
+ queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
List<HierarchicalConfiguration<ImmutableNode>> fetchConfs = conf.configurationsAt("fetch");
for (HierarchicalConfiguration<ImmutableNode> fetchConf : fetchConfs) {
@@ -132,9 +134,10 @@ public class FetchScheduler implements FetchSchedulerMBean, Configurable {
}
@PreDestroy
- public void dispose() {
+ public void dispose() throws IOException {
if (enabled) {
LOGGER.info("FetchMail dispose...");
+ queue.close();
for (ScheduledFuture<?> scheduler1 : schedulers) {
scheduler1.cancel(false);
}
diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
index e9141ea..db8ea1e 100644
--- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
+++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
@@ -80,6 +80,10 @@ public abstract class SetMessagesOutboxFlagUpdateTest {
public MailQueue createQueue(String name) {
return new MailQueue() {
@Override
+ public void close() throws IOException {
+ }
+
+ @Override
public String getName() {
return name;
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
index c652bc9..afda197 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/MailSpool.java
@@ -19,8 +19,12 @@
package org.apache.james.jmap.draft.send;
+import java.io.IOException;
+
+import javax.annotation.PreDestroy;
import javax.inject.Inject;
+import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueue.MailQueueException;
@@ -28,10 +32,14 @@ import org.apache.james.queue.api.MailQueueFactory;
import org.apache.mailet.Attribute;
import org.apache.mailet.AttributeValue;
import org.apache.mailet.Mail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-public class MailSpool implements Startable {
+public class MailSpool implements Startable, Disposable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MailSpool.class);
private final MailQueueFactory<?> queueFactory;
private MailQueue queue;
@@ -45,6 +53,15 @@ public class MailSpool implements Startable {
queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
}
+ @PreDestroy
+ public void dispose() {
+ try {
+ queue.close();
+ } catch (IOException e) {
+ LOGGER.debug("error closing queue", e);
+ }
+ }
+
public void send(Mail mail, MailMetadata metadata) throws MailQueueException {
mail.setAttribute(new Attribute(MailMetadata.MAIL_METADATA_MESSAGE_ID_ATTRIBUTE, AttributeValue.of(metadata.getMessageId().serialize())));
mail.setAttribute(new Attribute(MailMetadata.MAIL_METADATA_USERNAME_ATTRIBUTE, AttributeValue.of(metadata.getUsername())));
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/SendMailHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/SendMailHandler.java
index 4ef8201..ea82d37 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/SendMailHandler.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/SendMailHandler.java
@@ -19,6 +19,8 @@
package org.apache.james.smtpserver;
+import java.io.IOException;
+
import javax.inject.Inject;
import javax.mail.MessagingException;
@@ -52,6 +54,15 @@ public class SendMailHandler implements JamesMessageHook {
queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
}
+ @Override
+ public void destroy() {
+ try {
+ queue.close();
+ } catch (IOException e) {
+ LOGGER.debug("error close queue", e);
+ }
+ }
+
/**
* Adds header to the message
*/
diff --git a/server/protocols/protocols-smtp/src/test/java/org/apache/james/smtpserver/SMTPServerTest.java b/server/protocols/protocols-smtp/src/test/java/org/apache/james/smtpserver/SMTPServerTest.java
index 8b4b19d..e3ae55e 100644
--- a/server/protocols/protocols-smtp/src/test/java/org/apache/james/smtpserver/SMTPServerTest.java
+++ b/server/protocols/protocols-smtp/src/test/java/org/apache/james/smtpserver/SMTPServerTest.java
@@ -199,7 +199,7 @@ public class SMTPServerTest {
protected Configuration configuration;
protected MockProtocolHandlerLoader chain;
protected MemoryMailQueueFactory queueFactory;
- protected MemoryMailQueueFactory.MemoryMailQueue queue;
+ protected MemoryMailQueueFactory.MemoryCacheableMailQueue queue;
protected MemoryRecipientRewriteTable rewriteTable;
private AliasReverseResolver aliasReverseResolver;
protected CanSendFrom canSendFrom;
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/routes/MailQueueRoutes.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/routes/MailQueueRoutes.java
index c2f6064..0338071 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/routes/MailQueueRoutes.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/routes/MailQueueRoutes.java
@@ -21,6 +21,8 @@ package org.apache.james.webadmin.routes;
import static org.apache.james.webadmin.Constants.SEPARATOR;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
@@ -32,6 +34,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import org.apache.james.core.MailAddress;
+import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueue.MailQueueException;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.ManageableMailQueue;
@@ -223,7 +226,7 @@ public class MailQueueRoutes implements Routes {
private List<MailQueueItemDTO> listMails(Request request) {
String mailQueueName = request.params(MAIL_QUEUE_NAME);
return mailQueueFactory.getQueue(mailQueueName)
- .map(name -> listMails(name, isDelayed(request.queryParams(DELAYED_QUERY_PARAM)), ParametersExtractor.extractLimit(request)))
+ .map(queue -> listMails(queue, isDelayed(request.queryParams(DELAYED_QUERY_PARAM)), ParametersExtractor.extractLimit(request)))
.orElseThrow(
() -> ErrorResponder.builder()
.message("%s can not be found", mailQueueName)
@@ -238,7 +241,7 @@ public class MailQueueRoutes implements Routes {
}
private List<MailQueueItemDTO> listMails(ManageableMailQueue queue, Optional<Boolean> isDelayed, Limit limit) {
- try {
+ try (MailQueue closeable = queue) {
return limit.applyOnStream(Iterators.toStream(queue.browse()))
.map(Throwing.function(MailQueueItemDTO::from).sneakyThrow())
.filter(item -> filter(item, isDelayed))
@@ -250,6 +253,8 @@ public class MailQueueRoutes implements Routes {
.message("Invalid request for listing the mails from the mail queue %s", queue)
.cause(e)
.haltError();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
}
@@ -304,17 +309,10 @@ public class MailQueueRoutes implements Routes {
private Task deleteMails(Request request) {
String mailQueueName = request.params(MAIL_QUEUE_NAME);
- return mailQueueFactory.getQueue(mailQueueName)
- .map(name -> deleteMailsTask(name,
+ return deleteMailsTask(mailQueueName,
sender(request.queryParams(SENDER_QUERY_PARAM)),
name(request.queryParams(NAME_QUERY_PARAM)),
- recipient(request.queryParams(RECIPIENT_QUERY_PARAM))))
- .orElseThrow(
- () -> ErrorResponder.builder()
- .message("%s can not be found", mailQueueName)
- .statusCode(HttpStatus.NOT_FOUND_404)
- .type(ErrorResponder.ErrorType.NOT_FOUND)
- .haltError());
+ recipient(request.queryParams(RECIPIENT_QUERY_PARAM)));
}
private Optional<MailAddress> sender(String senderAsString) throws HaltException {
@@ -383,10 +381,11 @@ public class MailQueueRoutes implements Routes {
private String forceDelayedMailsDelivery(Request request, Response response) throws JsonExtractException, MailQueueException {
assertDelayedParamIsTrue(request);
assertPayloadContainsDelayedEntry(request);
- ManageableMailQueue mailQueue = assertMailQueueExists(request);
-
- mailQueue.flush();
-
+ try (ManageableMailQueue mailQueue = assertMailQueueExists(request)) {
+ mailQueue.flush();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
return Responses.returnNoContent(response);
}
@@ -421,13 +420,13 @@ public class MailQueueRoutes implements Routes {
}
}
- private Task deleteMailsTask(ManageableMailQueue queue, Optional<MailAddress> maybeSender, Optional<String> maybeName, Optional<MailAddress> maybeRecipient) {
+ private Task deleteMailsTask(String queueName, Optional<MailAddress> maybeSender, Optional<String> maybeName, Optional<MailAddress> maybeRecipient) {
int paramCount = Booleans.countTrue(maybeSender.isPresent(), maybeName.isPresent(), maybeRecipient.isPresent());
switch (paramCount) {
case 0:
- return new ClearMailQueueTask(queue);
+ return new ClearMailQueueTask(queueName, this::getQueueOrThrow);
case 1:
- return new DeleteMailsFromMailQueueTask(queue, maybeSender, maybeName, maybeRecipient);
+ return new DeleteMailsFromMailQueueTask(queueName, this::getQueueOrThrow, maybeSender, maybeName, maybeRecipient);
default:
throw ErrorResponder.builder()
.statusCode(HttpStatus.BAD_REQUEST_400)
@@ -438,6 +437,16 @@ public class MailQueueRoutes implements Routes {
}
}
+ private ManageableMailQueue getQueueOrThrow(String queueName) {
+ return mailQueueFactory.getQueue(queueName)
+ .orElseThrow(
+ () -> ErrorResponder.builder()
+ .message("%s can not be found", queueName)
+ .statusCode(HttpStatus.NOT_FOUND_404)
+ .type(ErrorResponder.ErrorType.NOT_FOUND)
+ .haltError());
+ }
+
private void assertDelayedParamIsTrue(Request request) {
if (!isDelayed(request.queryParams(DELAYED_QUERY_PARAM)).orElse(false)) {
throw ErrorResponder.builder()
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTask.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTask.java
index ff4d777..ab9e25b 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTask.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTask.java
@@ -19,6 +19,7 @@
package org.apache.james.webadmin.service;
+import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
@@ -31,9 +32,12 @@ import org.apache.james.task.TaskType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
public class ClearMailQueueTask implements Task {
public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+
private final String mailQueueName;
private final long initialCount;
private final long remainingCount;
@@ -62,30 +66,44 @@ public class ClearMailQueueTask implements Task {
public Instant timestamp() {
return timestamp;
}
+
}
public static class UnknownSerializedQueue extends RuntimeException {
+
public UnknownSerializedQueue(String queueName) {
super("Unable to retrieve '" + queueName + "' queue");
}
}
+ @FunctionalInterface
+ public interface MailQueueFactory {
+ ManageableMailQueue create(String mailQueueName);
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(ClearMailQueueTask.class);
public static final TaskType TYPE = TaskType.of("clear-mail-queue");
- private final ManageableMailQueue queue;
- private final long initialCount;
+ private final String queueName;
+ private final ClearMailQueueTask.MailQueueFactory factory;
+ private Optional<Long> initialCount;
+ private Optional<ManageableMailQueue> queue;
- public ClearMailQueueTask(ManageableMailQueue queue) {
- this.queue = queue;
- initialCount = getRemainingSize();
+ public ClearMailQueueTask(String queueName, ClearMailQueueTask.MailQueueFactory factory) {
+ this.queueName = queueName;
+ this.factory = factory;
+ this.initialCount = Optional.empty();
+ this.queue = Optional.empty();
}
@Override
public Result run() {
- try {
+ try (ManageableMailQueue queue = factory.create(queueName)) {
+ this.initialCount = Optional.of(getRemainingSize(queue));
+ this.queue = Optional.of(queue);
queue.clear();
- } catch (MailQueue.MailQueueException e) {
+ this.queue = Optional.empty();
+ } catch (MailQueue.MailQueueException | IOException e) {
LOGGER.error("Clear MailQueue got an exception", e);
return Result.PARTIAL;
}
@@ -100,16 +118,21 @@ public class ClearMailQueueTask implements Task {
@Override
public Optional<TaskExecutionDetails.AdditionalInformation> details() {
- return Optional.of(new AdditionalInformation(queue.getName(), initialCount, getRemainingSize(), Clock.systemUTC().instant()));
+ return queue.map(q -> new AdditionalInformation(queueName, initialCount.get(), getRemainingSize(q), Clock.systemUTC().instant()));
+ }
+
+ String getQueueName() {
+ return queueName;
}
- ManageableMailQueue getQueue() {
- return queue;
+ @VisibleForTesting
+ Optional<Long> initialCount() {
+ return initialCount;
}
- private long getRemainingSize() {
+ private long getRemainingSize(ManageableMailQueue mailQueue) {
try {
- return queue.getSize();
+ return mailQueue.getSize();
} catch (MailQueue.MailQueueException e) {
throw new RuntimeException(e);
}
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTaskDTO.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTaskDTO.java
index 8ba59f4..f4af298 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTaskDTO.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/ClearMailQueueTaskDTO.java
@@ -39,19 +39,22 @@ public class ClearMailQueueTaskDTO implements TaskDTO {
}
public static ClearMailQueueTaskDTO toDTO(ClearMailQueueTask domainObject, String typeName) {
- return new ClearMailQueueTaskDTO(typeName, domainObject.getQueue().getName());
+ return new ClearMailQueueTaskDTO(typeName, domainObject.getQueueName());
}
private final String type;
- private final String queue;
+ private final String queueName;
- public ClearMailQueueTaskDTO(@JsonProperty("type") String type, @JsonProperty("queue") String queue) {
+ public ClearMailQueueTaskDTO(@JsonProperty("type") String type, @JsonProperty("queue") String queueName) {
this.type = type;
- this.queue = queue;
+ this.queueName = queueName;
}
public ClearMailQueueTask fromDTO(MailQueueFactory<? extends ManageableMailQueue> mailQueueFactory) {
- return new ClearMailQueueTask(mailQueueFactory.getQueue(queue).orElseThrow(() -> new ClearMailQueueTask.UnknownSerializedQueue(queue)));
+ return new ClearMailQueueTask(queueName,
+ name -> mailQueueFactory
+ .getQueue(name)
+ .orElseThrow(() -> new ClearMailQueueTask.UnknownSerializedQueue(queueName)));
}
@Override
@@ -60,6 +63,6 @@ public class ClearMailQueueTaskDTO implements TaskDTO {
}
public String getQueue() {
- return queue;
+ return queueName;
}
}
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTask.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTask.java
index e55b7d6..0a1a184 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTask.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTask.java
@@ -19,6 +19,8 @@
package org.apache.james.webadmin.service;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
@@ -95,38 +97,60 @@ public class DeleteMailsFromMailQueueTask implements Task {
}
}
+ @FunctionalInterface
+ public interface MailQueueFactory {
+ ManageableMailQueue create(String mailQueueName);
+ }
+
public static final TaskType TYPE = TaskType.of("delete-mails-from-mail-queue");
- private final ManageableMailQueue queue;
private final Optional<MailAddress> maybeSender;
private final Optional<String> maybeName;
private final Optional<MailAddress> maybeRecipient;
+ private final MailQueueFactory factory;
+ private final String queueName;
+ private Optional<Long> initialCount;
+ private Optional<ManageableMailQueue> queue;
- private final long initialCount;
-
- public DeleteMailsFromMailQueueTask(ManageableMailQueue queue, Optional<MailAddress> maybeSender,
+ public DeleteMailsFromMailQueueTask(String queueName, MailQueueFactory factory,
+ Optional<MailAddress> maybeSender,
Optional<String> maybeName, Optional<MailAddress> maybeRecipient) {
+ this.factory = factory;
+ this.queueName = queueName;
Preconditions.checkArgument(
Booleans.countTrue(maybeSender.isPresent(), maybeName.isPresent(), maybeRecipient.isPresent()) == 1,
"You should provide one and only one of the query parameters 'sender', 'name' or 'recipient'.");
- this.queue = queue;
this.maybeSender = maybeSender;
this.maybeName = maybeName;
this.maybeRecipient = maybeRecipient;
- this.initialCount = getRemainingSize();
+ this.initialCount = Optional.empty();
+ this.queue = Optional.empty();
+
}
@Override
public Result run() {
- maybeSender.ifPresent(Throwing.consumer(
- (MailAddress sender) -> queue.remove(ManageableMailQueue.Type.Sender, sender.asString())));
- maybeName.ifPresent(Throwing.consumer(
- (String name) -> queue.remove(ManageableMailQueue.Type.Name, name)));
- maybeRecipient.ifPresent(Throwing.consumer(
- (MailAddress recipient) -> queue.remove(ManageableMailQueue.Type.Recipient, recipient.asString())));
-
- return Result.COMPLETED;
+ try (ManageableMailQueue queue = factory.create(queueName)) {
+ this.initialCount = Optional.of(getRemainingSize(queue));
+ this.queue = Optional.of(queue);
+ maybeSender.ifPresent(Throwing.consumer(
+ (MailAddress sender) -> queue.remove(ManageableMailQueue.Type.Sender, sender.asString())));
+ maybeName.ifPresent(Throwing.consumer(
+ (String name) -> queue.remove(ManageableMailQueue.Type.Name, name)));
+ maybeRecipient.ifPresent(Throwing.consumer(
+ (MailAddress recipient) -> queue.remove(ManageableMailQueue.Type.Recipient, recipient.asString())));
+
+ this.queue = Optional.empty();
+
+ return Result.COMPLETED;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public String getQueueName() {
+ return queueName;
}
@Override
@@ -134,10 +158,6 @@ public class DeleteMailsFromMailQueueTask implements Task {
return TYPE;
}
- ManageableMailQueue getQueue() {
- return queue;
- }
-
Optional<String> getMaybeName() {
return maybeName;
}
@@ -152,13 +172,13 @@ public class DeleteMailsFromMailQueueTask implements Task {
@Override
public Optional<TaskExecutionDetails.AdditionalInformation> details() {
- return Optional.of(new AdditionalInformation(queue.getName(),
- initialCount,
- getRemainingSize(), maybeSender,
+ return this.queue.map(queue -> new AdditionalInformation(queueName,
+ initialCount.get(),
+ getRemainingSize(queue), maybeSender,
maybeName, maybeRecipient, Clock.systemUTC().instant()));
}
- public long getRemainingSize() {
+ public long getRemainingSize(ManageableMailQueue queue) {
try {
return queue.getSize();
} catch (MailQueue.MailQueueException e) {
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskDTO.java b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskDTO.java
index e5dfa9e..12f3485 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskDTO.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/main/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskDTO.java
@@ -27,7 +27,7 @@ public class DeleteMailsFromMailQueueTaskDTO implements TaskDTO {
public static DeleteMailsFromMailQueueTaskDTO toDTO(DeleteMailsFromMailQueueTask domainObject, String typeName) {
return new DeleteMailsFromMailQueueTaskDTO(
typeName,
- domainObject.getQueue().getName(),
+ domainObject.getQueueName(),
domainObject.getMaybeSender().map(MailAddress::asString),
domainObject.getMaybeName(),
domainObject.getMaybeRecipient().map(MailAddress::asString)
@@ -35,18 +35,18 @@ public class DeleteMailsFromMailQueueTaskDTO implements TaskDTO {
}
private final String type;
- private final String queue;
+ private final String queueName;
private final Optional<String> sender;
private final Optional<String> name;
private final Optional<String> recipient;
public DeleteMailsFromMailQueueTaskDTO(@JsonProperty("type") String type,
- @JsonProperty("queue") String queue,
+ @JsonProperty("queue") String queueName,
@JsonProperty("sender") Optional<String> sender,
@JsonProperty("name") Optional<String> name,
@JsonProperty("recipient") Optional<String> recipient) {
this.type = type;
- this.queue = queue;
+ this.queueName = queueName;
this.sender = sender;
this.name = name;
this.recipient = recipient;
@@ -54,7 +54,7 @@ public class DeleteMailsFromMailQueueTaskDTO implements TaskDTO {
public DeleteMailsFromMailQueueTask fromDTO(MailQueueFactory<? extends ManageableMailQueue> mailQueueFactory) {
return new DeleteMailsFromMailQueueTask(
- mailQueueFactory.getQueue(queue).orElseThrow(() -> new DeleteMailsFromMailQueueTask.UnknownSerializedQueue(queue)),
+ queueName, name -> mailQueueFactory.getQueue(name).orElseThrow(() -> new DeleteMailsFromMailQueueTask.UnknownSerializedQueue(queueName)),
sender.map(Throwing.<String, MailAddress>function(MailAddress::new).sneakyThrow()),
name,
recipient.map(Throwing.<String, MailAddress>function(MailAddress::new).sneakyThrow())
@@ -67,7 +67,7 @@ public class DeleteMailsFromMailQueueTaskDTO implements TaskDTO {
}
public String getQueue() {
- return queue;
+ return queueName;
}
public Optional<String> getSender() {
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java
index 2cd4f49..f40f1ac 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java
@@ -42,7 +42,6 @@ import org.apache.james.queue.api.Mails;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.apache.james.queue.memory.MemoryMailQueueFactory;
-import org.apache.james.queue.memory.MemoryMailQueueFactory.MemoryMailQueue;
import org.apache.james.task.Hostname;
import org.apache.james.task.MemoryTaskManager;
import org.apache.james.task.TaskManager;
@@ -229,7 +228,7 @@ class MailQueueRoutesTest {
@Test
public void listMailsShouldReturnMailsWhenSome() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
queue.enQueue(Mails.defaultMail().name("name").build());
queue.enQueue(Mails.defaultMail().name("name").build());
@@ -243,7 +242,7 @@ class MailQueueRoutesTest {
@Test
public void listMailsShouldReturnMailDetailsWhenSome() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
FakeMail mail = Mails.defaultMail().name("name").build();
queue.enQueue(mail);
@@ -265,7 +264,7 @@ class MailQueueRoutesTest {
@Test
public void listMailsShouldReturnEmptyWhenNoDelayedMailsAndAskFor() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
FakeMail mail = Mails.defaultMail().name("name").build();
queue.enQueue(mail);
@@ -281,7 +280,7 @@ class MailQueueRoutesTest {
@Test
public void listMailsShouldReturnCurrentMailsWhenMailsAndAskForNotDelayed() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
FakeMail mail = Mails.defaultMail().name("name").build();
queue.enQueue(mail);
@@ -297,7 +296,7 @@ class MailQueueRoutesTest {
@Test
public void listMailsShouldReturnDelayedMailsWhenAskFor() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
FakeMail mail = Mails.defaultMail().name("name").build();
queue.enQueue(mail, 10, TimeUnit.MINUTES);
@@ -313,7 +312,7 @@ class MailQueueRoutesTest {
@Test
public void listMailsShouldReturnOneMailWhenMailsAndAskForALimitOfOne() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
FakeMail mail = Mails.defaultMail().name("name").build();
queue.enQueue(mail);
queue.enQueue(mail);
@@ -336,7 +335,7 @@ class MailQueueRoutesTest {
@Test
public void getMailQueueShouldReturnTheMailQueueDataWhenMailQueueExists() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
queue.enQueue(Mails.defaultMail().name("name").build());
when()
@@ -509,7 +508,7 @@ class MailQueueRoutesTest {
class SideEffects {
@Test
public void forcingDelayedMailsDeliveryShouldActuallyChangePropertyOnMails() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
FakeMail mail = Mails.defaultMail().name("name").build();
queue.enQueue(mail, 10L, TimeUnit.MINUTES);
queue.enQueue(mail, 10L, TimeUnit.MINUTES);
@@ -656,7 +655,7 @@ class MailQueueRoutesTest {
@Test
public void deleteMailsTasksShouldHaveDetailsWhenSenderIsGiven() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
queue.enQueue(FakeMail.builder()
.name(FAKE_MAIL_NAME_1)
@@ -693,7 +692,7 @@ class MailQueueRoutesTest {
@Test
public void deleteMailsTasksShouldHaveDetailsWhenNameIsGiven() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
queue.enQueue(FakeMail.builder()
.name(FAKE_MAIL_NAME_1)
@@ -728,7 +727,7 @@ class MailQueueRoutesTest {
@Test
public void deleteMailsTasksShouldHaveDetailsWhenRecipientIsGiven() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
queue.enQueue(FakeMail.builder()
.name(FAKE_MAIL_NAME_1)
@@ -774,7 +773,7 @@ class MailQueueRoutesTest {
@Test
public void deleteMailsShouldDeleteMailsWhenSenderIsGiven() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
queue.enQueue(FakeMail.builder()
.name(FAKE_MAIL_NAME_1)
@@ -808,7 +807,7 @@ class MailQueueRoutesTest {
@Test
public void deleteMailsShouldDeleteMailsWhenNameIsGiven() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
queue.enQueue(FakeMail.builder()
.name(FAKE_MAIL_NAME_1)
@@ -840,7 +839,7 @@ class MailQueueRoutesTest {
@Test
public void deleteMailsShouldDeleteMailsWhenRecipientIsGiven() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
queue.enQueue(FakeMail.builder()
.name(FAKE_MAIL_NAME_1)
@@ -880,7 +879,7 @@ class MailQueueRoutesTest {
@Test
public void deleteMailsShouldDeleteMailsWhenTheyAreMatching() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
String recipient = "recipient@james.org";
queue.enQueue(Mails.defaultMail()
.name("name")
@@ -945,7 +944,7 @@ class MailQueueRoutesTest {
@Test
public void clearMailQueueShouldHaveDetailsWhenNoQueryParameters() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
queue.enQueue(FakeMail.builder()
.name(FAKE_MAIL_NAME_1)
@@ -982,7 +981,7 @@ class MailQueueRoutesTest {
@Test
void clearMailQueueShouldDeleteAllMailsInQueueWhenNoQueryParameters() throws Exception {
- MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
+ MemoryMailQueueFactory.MemoryCacheableMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
queue.enQueue(FakeMail.builder()
.name(FAKE_MAIL_NAME_1)
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/ClearMailQueueTaskTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/ClearMailQueueTaskTest.java
index 6d04e01..94b4432 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/ClearMailQueueTaskTest.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/ClearMailQueueTaskTest.java
@@ -17,6 +17,7 @@
package org.apache.james.webadmin.service;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
@@ -46,23 +47,24 @@ class ClearMailQueueTaskTest {
when(mockedQueue.getName()).thenReturn(QUEUE_NAME);
when(mailQueueFactory.getQueue(anyString())).thenAnswer(arg -> Optional.of(mockedQueue));
- ManageableMailQueue queue = mailQueueFactory.getQueue(QUEUE_NAME).get();
- ClearMailQueueTask task = new ClearMailQueueTask(queue);
+ ClearMailQueueTask.MailQueueFactory factory = queueName -> mailQueueFactory.getQueue(queueName).orElseThrow(RuntimeException::new);
+ ClearMailQueueTask task = new ClearMailQueueTask(QUEUE_NAME, factory);
JsonSerializationVerifier.dtoModule(ClearMailQueueTaskDTO.module(mailQueueFactory))
.bean(task)
.json(SERIALIZED)
+ .equalityTester((a, b) -> assertThat(a.getQueueName()).isEqualTo(b.getQueueName()))
.verify();
}
@Test
- void taskShouldThrowWhenDeserializeAnUnknownQueue() {
+ void taskShouldThrowWhenRunOnAnUnknownQueue() {
MailQueueFactory<ManageableMailQueue> mailQueueFactory = mock(MailQueueFactory.class);
when(mailQueueFactory.getQueue(anyString())).thenReturn(Optional.empty());
JsonTaskSerializer testee = JsonTaskSerializer.of(ClearMailQueueTaskDTO.module(mailQueueFactory));
String serializedJson = "{\"type\": \"clear-mail-queue\", \"queue\": \"anyQueue\"}";
- assertThatThrownBy(() -> testee.deserialize(serializedJson))
+ assertThatThrownBy(() -> testee.deserialize(serializedJson).run())
.isInstanceOf(ClearMailQueueTask.UnknownSerializedQueue.class);
}
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskTest.java
index c6797c2..6e44829 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskTest.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/service/DeleteMailsFromMailQueueTaskTest.java
@@ -17,6 +17,7 @@
package org.apache.james.webadmin.service;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
@@ -53,26 +54,32 @@ class DeleteMailsFromMailQueueTaskTest {
@Test
void taskShouldBeSerializable() throws Exception {
- ManageableMailQueue queue = mailQueueFactory.getQueue(queueName).get();
- DeleteMailsFromMailQueueTask taskSender = new DeleteMailsFromMailQueueTask(queue, Optional.of(new MailAddress("a@b.c")), Optional.empty(), Optional.empty());
- DeleteMailsFromMailQueueTask taskName = new DeleteMailsFromMailQueueTask(queue, Optional.empty(), Optional.of("name"), Optional.empty());
- DeleteMailsFromMailQueueTask taskRecipient = new DeleteMailsFromMailQueueTask(queue, Optional.empty(), Optional.empty(), Optional.of(new MailAddress("d@e.f")));
+ DeleteMailsFromMailQueueTask.MailQueueFactory factory = name -> this.mailQueueFactory.getQueue(name).orElseThrow(RuntimeException::new);
+ DeleteMailsFromMailQueueTask taskSender = new DeleteMailsFromMailQueueTask(queueName, factory, Optional.of(new MailAddress("a@b.c")), Optional.empty(), Optional.empty());
+ DeleteMailsFromMailQueueTask taskName = new DeleteMailsFromMailQueueTask(queueName, factory, Optional.empty(), Optional.of("name"), Optional.empty());
+ DeleteMailsFromMailQueueTask taskRecipient = new DeleteMailsFromMailQueueTask(queueName, factory, Optional.empty(), Optional.empty(), Optional.of(new MailAddress("d@e.f")));
- JsonSerializationVerifier.dtoModule(DeleteMailsFromMailQueueTaskDTO.module(mailQueueFactory))
+ JsonSerializationVerifier.dtoModule(DeleteMailsFromMailQueueTaskDTO.module(this.mailQueueFactory))
.testCase(taskSender, "{\"type\": \"delete-mails-from-mail-queue\", \"queue\": \"anyQueue\", \"sender\": \"a@b.c\"}")
.testCase(taskName, "{\"type\": \"delete-mails-from-mail-queue\", \"queue\": \"anyQueue\", \"name\": \"name\"}")
.testCase(taskRecipient, "{\"type\": \"delete-mails-from-mail-queue\", \"queue\": \"anyQueue\", \"recipient\": \"d@e.f\"}")
+ .equalityTester((a, b) -> {
+ assertThat(a.getQueueName()).isEqualTo(b.getQueueName());
+ assertThat(a.getMaybeName()).isEqualTo(b.getMaybeName());
+ assertThat(a.getMaybeSender()).isEqualTo(b.getMaybeSender());
+ assertThat(a.getMaybeRecipient()).isEqualTo(b.getMaybeRecipient());
+ })
.verify();
}
@Test
- void taskShouldThrowWhenDeserializeAnUnknownQueue() {
+ void taskShouldThrowWhenRunOnAnUnknownQueue() {
MailQueueFactory<ManageableMailQueue> mailQueueFactory = mock(MailQueueFactory.class);
when(mailQueueFactory.getQueue(anyString())).thenReturn(Optional.empty());
JsonTaskSerializer testee = JsonTaskSerializer.of(DeleteMailsFromMailQueueTaskDTO.module(mailQueueFactory));
String serializedJson = "{\"type\": \"delete-mails-from-mail-queue\", \"queue\": \"anyQueue\", \"sender\": \"a@b.c\"}";
- assertThatThrownBy(() -> testee.deserialize(serializedJson))
+ assertThatThrownBy(() -> testee.deserialize(serializedJson).run())
.isInstanceOf(DeleteMailsFromMailQueueTask.UnknownSerializedQueue.class);
}
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
index 6ccb045..6171b00 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
@@ -19,6 +19,8 @@
package org.apache.james.webadmin.service;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.Optional;
import java.util.function.Consumer;
@@ -35,17 +37,22 @@ import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.util.OptionalUtils;
import org.apache.james.util.streams.Iterators;
import org.apache.mailet.Mail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
public class ReprocessingService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReprocessingService.class);
+
public static class MissingKeyException extends RuntimeException {
MissingKeyException(MailKey key) {
super(key.asString() + " can not be found");
}
}
- static class Reprocessor {
+ static class Reprocessor implements Closeable {
private final MailQueue mailQueue;
private final Optional<String> targetProcessor;
@@ -63,6 +70,15 @@ public class ReprocessingService {
throw new RuntimeException("Error encountered while reprocessing mail " + mail.getName(), e);
}
}
+
+ @Override
+ public void close() {
+ try {
+ mailQueue.close();
+ } catch (IOException e) {
+ LOGGER.debug("error closing queue", e);
+ }
+ }
}
private final MailQueueFactory<?> mailQueueFactory;
@@ -76,30 +92,30 @@ public class ReprocessingService {
}
public void reprocessAll(MailRepositoryPath path, Optional<String> targetProcessor, String targetQueue, Consumer<MailKey> keyListener) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
- Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor);
-
- mailRepositoryStoreService
- .getRepositories(path)
- .forEach(Throwing.consumer((MailRepository repository) ->
- Iterators.toStream(repository.list())
- .peek(keyListener)
- .map(Throwing.function(key -> Optional.ofNullable(repository.retrieve(key))))
- .flatMap(OptionalUtils::toStream)
- .forEach(mail -> reprocessor.reprocess(repository, mail))));
+ try (Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor)) {
+ mailRepositoryStoreService
+ .getRepositories(path)
+ .forEach(Throwing.consumer((MailRepository repository) ->
+ Iterators.toStream(repository.list())
+ .peek(keyListener)
+ .map(Throwing.function(key -> Optional.ofNullable(repository.retrieve(key))))
+ .flatMap(OptionalUtils::toStream)
+ .forEach(mail -> reprocessor.reprocess(repository, mail))));
+ }
}
public void reprocess(MailRepositoryPath path, MailKey key, Optional<String> targetProcessor, String targetQueue) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
- Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor);
-
- Pair<MailRepository, Mail> mailPair = mailRepositoryStoreService
- .getRepositories(path)
- .map(Throwing.function(repository -> Pair.of(repository, Optional.ofNullable(repository.retrieve(key)))))
- .filter(pair -> pair.getRight().isPresent())
- .map(pair -> Pair.of(pair.getLeft(), pair.getRight().get()))
- .findFirst()
- .orElseThrow(() -> new MissingKeyException(key));
-
- reprocessor.reprocess(mailPair.getKey(), mailPair.getValue());
+ try (Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor)) {
+ Pair<MailRepository, Mail> mailPair = mailRepositoryStoreService
+ .getRepositories(path)
+ .map(Throwing.function(repository -> Pair.of(repository, Optional.ofNullable(repository.retrieve(key)))))
+ .filter(pair -> pair.getRight().isPresent())
+ .map(pair -> Pair.of(pair.getLeft(), pair.getRight().get()))
+ .findFirst()
+ .orElseThrow(() -> new MissingKeyException(key));
+
+ reprocessor.reprocess(mailPair.getKey(), mailPair.getValue());
+ }
}
private MailQueue getMailQueue(String targetQueue) {
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
similarity index 91%
rename from server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
rename to server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
index c6e094b..145acd5 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
@@ -44,7 +44,7 @@ import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
-import org.apache.james.queue.jms.JMSMailQueue;
+import org.apache.james.queue.jms.JMSCacheableMailQueue;
import org.apache.james.server.core.MailImpl;
import org.apache.james.server.core.MimeMessageCopyOnWriteProxy;
import org.apache.james.server.core.MimeMessageInputStream;
@@ -89,17 +89,17 @@ import org.slf4j.LoggerFactory;
* </p>
* To have a good throughput you should use a caching connection factory. </p>
*/
-public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport {
- private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQMailQueue.class);
+public class ActiveMQCacheableMailQueue extends JMSCacheableMailQueue implements ActiveMQSupport {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQCacheableMailQueue.class);
private final boolean useBlob;
/**
- * Construct a {@link ActiveMQMailQueue} which only use {@link BlobMessage}
+ * Construct a {@link ActiveMQCacheableMailQueue} which only use {@link BlobMessage}
*
*/
- public ActiveMQMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queuename, MetricFactory metricFactory,
- GaugeRegistry gaugeRegistry) {
+ public ActiveMQCacheableMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queuename, MetricFactory metricFactory,
+ GaugeRegistry gaugeRegistry) {
this(connectionFactory, mailQueueItemDecoratorFactory, queuename, true, metricFactory, gaugeRegistry);
}
@@ -110,8 +110,8 @@ public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport {
* @param queuename
* @param useBlob
*/
- public ActiveMQMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queuename, boolean useBlob, MetricFactory metricFactory,
- GaugeRegistry gaugeRegistry) {
+ public ActiveMQCacheableMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queuename, boolean useBlob, MetricFactory metricFactory,
+ GaugeRegistry gaugeRegistry) {
super(connectionFactory, mailQueueItemDecoratorFactory, queuename, metricFactory, gaugeRegistry);
this.useBlob = useBlob;
}
@@ -268,7 +268,7 @@ public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport {
/**
* Try to use ActiveMQ StatisticsPlugin to get size and if that fails
- * fallback to {@link JMSMailQueue#getSize()}
+ * fallback to {@link JMSCacheableMailQueue#getSize()}
*/
@Override
public long getSize() throws MailQueueException {
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
index 5008508..5b61ca5 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
@@ -30,7 +30,7 @@ import org.apache.james.queue.jms.JMSMailQueueFactory;
/**
* {@link MailQueueFactory} implementations which return
- * {@link ActiveMQMailQueue} instances
+ * {@link ActiveMQCacheableMailQueue} instances
*/
public class ActiveMQMailQueueFactory extends JMSMailQueueFactory {
@@ -51,7 +51,7 @@ public class ActiveMQMailQueueFactory extends JMSMailQueueFactory {
}
@Override
- protected ManageableMailQueue createMailQueue(String name) {
- return new ActiveMQMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, useBlob, metricFactory, gaugeRegistry);
+ protected ManageableMailQueue createCacheableMailQueue(String name) {
+ return new ActiveMQCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, useBlob, metricFactory, gaugeRegistry);
}
}
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java
index b2c3cea..da22eb7 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java
@@ -59,7 +59,7 @@ public class ActiveMQMailQueueBlobTest implements DelayedManageableMailQueueCont
static final String BASE_DIR = "file://target/james-test";
static final boolean USE_BLOB = true;
- ActiveMQMailQueue mailQueue;
+ ActiveMQCacheableMailQueue mailQueue;
MyFileSystem fileSystem;
@BeforeEach
@@ -78,7 +78,7 @@ public class ActiveMQMailQueueBlobTest implements DelayedManageableMailQueueCont
MetricFactory metricFactory = metricTestSystem.getMetricFactory();
GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry();
String queueName = BrokerExtension.generateRandomQueueName(broker);
- mailQueue = new ActiveMQMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, USE_BLOB, metricFactory, gaugeRegistry);
+ mailQueue = new ActiveMQCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, USE_BLOB, metricFactory, gaugeRegistry);
}
@AfterEach
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
index 609cef1..e7a08dd 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
@@ -18,8 +18,6 @@
****************************************************************/
package org.apache.james.queue.activemq;
-import javax.jms.ConnectionFactory;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
@@ -48,7 +46,7 @@ public class ActiveMQMailQueueTest implements DelayedManageableMailQueueContract
static final boolean USE_BLOB = true;
- ActiveMQMailQueue mailQueue;
+ ActiveMQCacheableMailQueue mailQueue;
@BeforeEach
public void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
@@ -60,7 +58,7 @@ public class ActiveMQMailQueueTest implements DelayedManageableMailQueueContract
MetricFactory metricFactory = metricTestSystem.getMetricFactory();
GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry();
String queueName = BrokerExtension.generateRandomQueueName(broker);
- mailQueue = new ActiveMQMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, !USE_BLOB, metricFactory, gaugeRegistry);
+ mailQueue = new ActiveMQCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, !USE_BLOB, metricFactory, gaugeRegistry);
}
@AfterEach
diff --git a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
index 559d998..d4e9c3a 100644
--- a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
+++ b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
@@ -19,6 +19,7 @@
package org.apache.james.queue.api;
+import java.io.Closeable;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
@@ -58,7 +59,7 @@ import org.threeten.extra.Temporals;
* </ul>
* </p>
*/
-public interface MailQueue {
+public interface MailQueue extends Closeable {
String ENQUEUED_METRIC_NAME_PREFIX = "enqueuedMail:";
String DEQUEUED_METRIC_NAME_PREFIX = "dequeuedMail:";
diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
similarity index 97%
rename from server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
rename to server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
index 4f288b3..6142be8 100644
--- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
+++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
@@ -71,15 +71,15 @@ import reactor.core.publisher.Mono;
/**
* {@link ManageableMailQueue} implementation which use the fs to store {@link Mail}'s
* <p/>
- * On create of the {@link FileMailQueue} the {@link #init()} will get called. This takes care of
+ * On create of the {@link FileCacheableMailQueue} the {@link #init()} will get called. This takes care of
* loading the needed meta-data into memory for fast access.
*
* @deprecated FileMailQueue implementation is unmaintained, incomplete and not thread safe
* We recommend using embedded ActiveMQMailQueue implementation instead
*/
@Deprecated
-public class FileMailQueue implements ManageableMailQueue {
- private static final Logger LOGGER = LoggerFactory.getLogger(FileMailQueue.class);
+public class FileCacheableMailQueue implements ManageableMailQueue {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileCacheableMailQueue.class);
private final Map<String, FileItem> keyMappings = Collections.synchronizedMap(new LinkedHashMap<>());
private final BlockingQueue<String> inmemoryQueue = new LinkedBlockingQueue<>();
@@ -98,7 +98,7 @@ public class FileMailQueue implements ManageableMailQueue {
private final String queueName;
private final Flux<MailQueueItem> flux;
- public FileMailQueue(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, File parentDir, String queuename, boolean sync) throws IOException {
+ public FileCacheableMailQueue(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, File parentDir, String queuename, boolean sync) throws IOException {
this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
this.sync = sync;
this.queueName = queuename;
@@ -111,6 +111,11 @@ public class FileMailQueue implements ManageableMailQueue {
}
@Override
+ public void close() {
+ //There's no resource to free
+ }
+
+ @Override
public String getName() {
return queueName;
}
diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java
index 87d88a8..623e1e0 100644
--- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java
+++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Inject;
@@ -34,7 +35,7 @@ import org.apache.james.queue.api.ManageableMailQueue;
import com.google.common.collect.ImmutableSet;
/**
- * {@link MailQueueFactory} implementation which returns {@link FileMailQueue} instances
+ * {@link MailQueueFactory} implementation which returns {@link FileCacheableMailQueue} instances
*
* @deprecated FileMailQueue implementation is unmaintained, incomplete and not thread safe
* We recommend using embedded ActiveMQMailQueue implementation instead
@@ -42,7 +43,7 @@ import com.google.common.collect.ImmutableSet;
@Deprecated
public class FileMailQueueFactory implements MailQueueFactory<ManageableMailQueue> {
- private final Map<String, ManageableMailQueue> queues = new HashMap<>();
+ private final Map<String, ManageableMailQueue> queues = new ConcurrentHashMap<>();
private MailQueueItemDecoratorFactory mailQueueActionItemDecoratorFactory;
private FileSystem fs;
private boolean sync = true;
@@ -59,7 +60,7 @@ public class FileMailQueueFactory implements MailQueueFactory<ManageableMailQueu
}
/**
- * If <code>true</code> the later created {@link FileMailQueue} will call <code>fsync</code> after each message {@link FileMailQueue#enQueue(org.apache.mailet.Mail)} call. This
+ * If <code>true</code> the later created {@link FileCacheableMailQueue} will call <code>fsync</code> after each message {@link FileCacheableMailQueue#enQueue(org.apache.mailet.Mail)} call. This
* is needed to be fully RFC conform but gives a performance penalty. If you are brave enough you man set it to <code>false</code>
* <p/>
* The default is <code>true</code>
@@ -77,19 +78,13 @@ public class FileMailQueueFactory implements MailQueueFactory<ManageableMailQueu
@Override
public ManageableMailQueue createQueue(String name) {
- return getQueue(name).orElseGet(() -> createAndRegisterQueue(name));
- }
-
- private ManageableMailQueue createAndRegisterQueue(String name) {
- synchronized (queues) {
+ return queues.computeIfAbsent(name, mailQueueName -> {
try {
- FileMailQueue queue = new FileMailQueue(mailQueueActionItemDecoratorFactory, fs.getFile("file://var/store/queue"), name, sync);
- queues.put(name, queue);
- return queue;
+ return new FileCacheableMailQueue(mailQueueActionItemDecoratorFactory, fs.getFile("file://var/store/queue"), mailQueueName, sync);
} catch (IOException e) {
- throw new RuntimeException("Unable to access queue " + name, e);
+ throw new RuntimeException("Unable to access queue " + mailQueueName, e);
}
- }
+ });
}
}
diff --git a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueFactoryTest.java b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueFactoryTest.java
similarity index 94%
rename from server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueFactoryTest.java
rename to server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueFactoryTest.java
index 926092f..923a8db 100644
--- a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueFactoryTest.java
+++ b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueFactoryTest.java
@@ -35,7 +35,7 @@ import org.junit.jupiter.api.Disabled;
" - JAMES-2954 Incomplete browse implementation" +
" - JAMES-2544 Mixing concurrent operation might lead to a deadlock and missing fields" +
" - JAMES-2979 dequeue is not thread safe")
-public class FileMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
+public class FileCacheableMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
private FileMailQueueFactory mailQueueFactory;
private MockFileSystem fileSystem;
diff --git a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueTest.java
similarity index 90%
rename from server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java
rename to server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueTest.java
index f2659d6..2b479e9 100644
--- a/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileMailQueueTest.java
+++ b/server/queue/queue-file/src/test/java/org/apache/james/queue/file/FileCacheableMailQueueTest.java
@@ -34,16 +34,16 @@ import org.junit.rules.TemporaryFolder;
" - JAMES-2954 Incomplete browse implementation" +
" - JAMES-2544 Mixing concurrent operation might lead to a deadlock and missing fields" +
" - JAMES-2979 dequeue is not thread safe")
-public class FileMailQueueTest implements DelayedManageableMailQueueContract {
+public class FileCacheableMailQueueTest implements DelayedManageableMailQueueContract {
private static final boolean SYNC = true;
private TemporaryFolder temporaryFolder = new TemporaryFolder();
- private FileMailQueue mailQueue;
+ private FileCacheableMailQueue mailQueue;
@BeforeEach
public void setUp() throws Exception {
temporaryFolder.create();
- mailQueue = new FileMailQueue(new RawMailQueueItemDecoratorFactory(), temporaryFolder.newFolder(), "test", SYNC);
+ mailQueue = new FileCacheableMailQueue(new RawMailQueueItemDecoratorFactory(), temporaryFolder.newFolder(), "test", SYNC);
}
@AfterEach
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
similarity index 97%
rename from server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
rename to server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
index 2511f49..5171cae 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
@@ -99,7 +99,7 @@ import reactor.core.publisher.Mono;
* {@link Mail} objects.
* </p>
*/
-public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPrioritySupport, Disposable {
+public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, MailPrioritySupport, Disposable {
private final Flux<MailQueueItem> flux;
@@ -153,7 +153,7 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
}
}
- private static final Logger LOGGER = LoggerFactory.getLogger(JMSMailQueue.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(JMSCacheableMailQueue.class);
public static final String FORCE_DELIVERY = "FORCE_DELIVERY";
@@ -172,9 +172,9 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
private final Joiner joiner;
private final Splitter splitter;
- public JMSMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory,
- String queueName, MetricFactory metricFactory,
- GaugeRegistry gaugeRegistry) {
+ public JMSCacheableMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory,
+ String queueName, MetricFactory metricFactory,
+ GaugeRegistry gaugeRegistry) {
try {
connection = connectionFactory.createConnection();
connection.start();
@@ -204,6 +204,14 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
flux = Mono.defer(this::deQueueOneItem).repeat();
}
+ /**
+ * To allow connection reuse (the queue is cacheable), we don't close the queue
+ * on close(), use {@link JMSCacheableMailQueue#dispose} to release resources
+ */
+ @Override
+ public void close() {
+ }
+
@Override
public String getName() {
return queueName;
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
index 051e3c0..701e0b3 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueFactory.java
@@ -48,8 +48,8 @@ public class JMSMailQueueFactory extends AbstractMailQueueFactory<ManageableMail
}
@Override
- protected ManageableMailQueue createMailQueue(String name) {
- return new JMSMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, metricFactory, gaugeRegistry);
+ protected ManageableMailQueue createCacheableMailQueue(String name) {
+ return new JMSCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, metricFactory, gaugeRegistry);
}
-
+
}
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
index ad10836..d20c642 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
@@ -48,13 +48,13 @@ public class JMSMailQueueItem implements MailQueueItem {
if (success) {
session.commit();
} else {
- JMSMailQueue.rollback(session);
+ JMSCacheableMailQueue.rollback(session);
}
} catch (JMSException ex) {
throw new MailQueueException("Unable to commit dequeue operation for mail " + mail.getName(), ex);
} finally {
- JMSMailQueue.closeConsumer(consumer);
- JMSMailQueue.closeSession(session);
+ JMSCacheableMailQueue.closeConsumer(consumer);
+ JMSCacheableMailQueue.closeSession(session);
}
}
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java
index e1a29dd..fdd4b86 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java
@@ -102,7 +102,7 @@ public abstract class AbstractMailQueueFactory<T extends MailQueue> implements M
}
private T createAndRegisterQueue(String name) {
- T queue = createMailQueue(name);
+ T queue = createCacheableMailQueue(name);
if (useJMX) {
registerMBean(name, queue);
}
@@ -111,12 +111,10 @@ public abstract class AbstractMailQueueFactory<T extends MailQueue> implements M
}
/**
- * Create a {@link MailQueue} for the given name
- *
- * @param name
- * @return queue
+ * Create a {@link MailQueue} for the given name that happens to do nothing on close()
+ * to be able to cache the instance
*/
- protected abstract T createMailQueue(String name);
+ protected abstract T createCacheableMailQueue(String name);
protected synchronized void registerMBean(String queuename, MailQueue queue) {
diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueFactoryTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueFactoryTest.java
similarity index 95%
rename from server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueFactoryTest.java
rename to server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueFactoryTest.java
index f98cf9d..5a62538 100644
--- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueFactoryTest.java
+++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueFactoryTest.java
@@ -37,7 +37,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(BrokerExtension.class)
-public class JMSMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
+public class JMSCacheableMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
private JMSMailQueueFactory mailQueueFactory;
diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueTest.java
similarity index 92%
rename from server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
rename to server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueTest.java
index f436a1d..061218a 100644
--- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
+++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSCacheableMailQueueTest.java
@@ -19,8 +19,6 @@
package org.apache.james.queue.jms;
-import javax.jms.ConnectionFactory;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
@@ -41,10 +39,10 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(BrokerExtension.class)
-public class JMSMailQueueTest implements DelayedManageableMailQueueContract, PriorityManageableMailQueueContract, DelayedPriorityMailQueueContract,
+public class JMSCacheableMailQueueTest implements DelayedManageableMailQueueContract, PriorityManageableMailQueueContract, DelayedPriorityMailQueueContract,
MailQueueMetricContract {
- private JMSMailQueue mailQueue;
+ private JMSCacheableMailQueue mailQueue;
@BeforeEach
void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
@@ -56,7 +54,7 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri
MetricFactory metricFactory = metricTestSystem.getMetricFactory();
GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry();
String queueName = BrokerExtension.generateRandomQueueName(broker);
- mailQueue = new JMSMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, metricFactory, gaugeRegistry);
+ mailQueue = new JMSCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, queueName, metricFactory, gaugeRegistry);
}
@AfterEach
diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/library/AbstractMailQueueFactoryTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/library/AbstractMailQueueFactoryTest.java
index 2ae2c50..2f25606 100644
--- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/library/AbstractMailQueueFactoryTest.java
+++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/library/AbstractMailQueueFactoryTest.java
@@ -46,7 +46,7 @@ public class AbstractMailQueueFactoryTest {
mBeanServer = mock(MBeanServer.class);
abstractMailQueueFactory = new AbstractMailQueueFactory<ManageableMailQueue>() {
@Override
- protected ManageableMailQueue createMailQueue(String name) {
+ protected ManageableMailQueue createCacheableMailQueue(String name) {
return mock(ManageableMailQueue.class);
}
};
diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 25b80b6..4aa75ec 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -19,6 +19,7 @@
package org.apache.james.queue.memory;
+import java.io.IOException;
import java.time.DateTimeException;
import java.time.Duration;
import java.time.Instant;
@@ -60,7 +61,7 @@ import reactor.core.scheduler.Schedulers;
public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQueue> {
- private final ConcurrentHashMap<String, MemoryMailQueueFactory.MemoryMailQueue> mailQueues;
+ private final ConcurrentHashMap<String, MemoryCacheableMailQueue> mailQueues;
private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
@Inject
@@ -80,19 +81,17 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
}
@Override
- public MemoryMailQueueFactory.MemoryMailQueue createQueue(String name) {
- MemoryMailQueueFactory.MemoryMailQueue newMailQueue = new MemoryMailQueue(name, mailQueueItemDecoratorFactory);
- return Optional.ofNullable(mailQueues.putIfAbsent(name, newMailQueue))
- .orElse(newMailQueue);
+ public MemoryCacheableMailQueue createQueue(String name) {
+ return mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory));
}
- public static class MemoryMailQueue implements ManageableMailQueue {
+ public static class MemoryCacheableMailQueue implements ManageableMailQueue {
private final DelayQueue<MemoryMailQueueItem> mailItems;
private final LinkedBlockingDeque<MemoryMailQueueItem> inProcessingMailItems;
private final String name;
private final Flux<MailQueueItem> flux;
- public MemoryMailQueue(String name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+ public MemoryCacheableMailQueue(String name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
this.mailItems = new DelayQueue<>();
this.inProcessingMailItems = new LinkedBlockingDeque<>();
this.name = name;
@@ -105,6 +104,11 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
}
@Override
+ public void close() {
+ //There's no resource to free
+ }
+
+ @Override
public String getName() {
return name;
}
@@ -244,7 +248,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
return false;
}
- MemoryMailQueue that = (MemoryMailQueue) o;
+ MemoryCacheableMailQueue that = (MemoryCacheableMailQueue) o;
return Objects.equal(this.name, that.name);
}
@@ -257,10 +261,10 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
public static class MemoryMailQueueItem implements MailQueue.MailQueueItem, Delayed {
private final Mail mail;
- private final MemoryMailQueue queue;
+ private final MemoryCacheableMailQueue queue;
private final ZonedDateTime delivery;
- public MemoryMailQueueItem(Mail mail, MemoryMailQueue queue, ZonedDateTime delivery) {
+ public MemoryMailQueueItem(Mail mail, MemoryCacheableMailQueue queue, ZonedDateTime delivery) {
this.mail = mail;
this.queue = queue;
this.delivery = delivery;
diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueFactoryTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java
similarity index 93%
rename from server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueFactoryTest.java
rename to server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java
index 28002ca..aa2e294 100644
--- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueFactoryTest.java
+++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java
@@ -26,7 +26,7 @@ import org.apache.james.queue.api.ManageableMailQueueFactoryContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.junit.jupiter.api.BeforeEach;
-class MemoryMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
+class MemoryCacheableMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
MemoryMailQueueFactory memoryMailQueueFactory;
diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
similarity index 91%
rename from server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java
rename to server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
index 085e565..5868995 100644
--- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java
+++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
@@ -29,13 +29,13 @@ import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-public class MemoryMailQueueTest implements DelayedManageableMailQueueContract {
+public class MemoryCacheableMailQueueTest implements DelayedManageableMailQueueContract {
- private MemoryMailQueueFactory.MemoryMailQueue mailQueue;
+ private MemoryMailQueueFactory.MemoryCacheableMailQueue mailQueue;
@BeforeEach
public void setUp() {
- mailQueue = new MemoryMailQueueFactory.MemoryMailQueue("test", new RawMailQueueItemDecoratorFactory());
+ mailQueue = new MemoryMailQueueFactory.MemoryCacheableMailQueue("test", new RawMailQueueItemDecoratorFactory());
}
@Override
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index ece7018..64c0c93 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -21,6 +21,7 @@ package org.apache.james.queue.rabbitmq;
import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
+import java.io.Closeable;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -40,13 +41,14 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
+import reactor.rabbitmq.Receiver;
-class Dequeuer {
+class Dequeuer implements Closeable {
private static final boolean REQUEUE = true;
private static final int EXECUTION_RATE = 5;
- private final Flux<AcknowledgableDelivery> flux;
private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
+
private final Consumer<Boolean> ack;
private final EnqueueId enqueueId;
private final Mail mail;
@@ -70,12 +72,15 @@ class Dequeuer {
public void done(boolean success) {
ack.accept(success);
}
+
}
private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
private final Metric dequeueMetric;
private final MailReferenceSerializer mailReferenceSerializer;
private final MailQueueView mailQueueView;
+ private final Receiver receiver;
+ private final Flux<AcknowledgableDelivery> flux;
Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
MailReferenceSerializer serializer, MetricFactory metricFactory,
@@ -84,11 +89,17 @@ class Dequeuer {
this.mailReferenceSerializer = serializer;
this.mailQueueView = mailQueueView;
this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
- this.flux = receiverProvider.createReceiver()
+ this.receiver = receiverProvider.createReceiver();
+ this.flux = this.receiver
.consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE))
.filter(getResponse -> getResponse.getBody() != null);
}
+ @Override
+ public void close() {
+ receiver.close();
+ }
+
Flux<? extends MailQueue.MailQueueItem> deQueue() {
return flux.concatMap(this::loadItem)
.concatMap(this::filterIfDeleted);
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 2dc7663..b09fe30 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -58,6 +58,11 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
}
@Override
+ public void close() {
+ dequeuer.close();
+ }
+
+ @Override
public String getName() {
return name.asString();
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index f771118..57bde2f 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -29,7 +29,6 @@ import static org.apache.james.queue.api.MailQueue.QUEUE_SIZE_METRIC_NAME_PREFIX
import java.time.Clock;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import javax.inject.Inject;
@@ -46,6 +45,7 @@ import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import org.apache.james.util.OptionalUtils;
import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
@@ -119,26 +119,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
}
}
- /**
- * RabbitMQMailQueue should have a single instance in a given JVM for a given MailQueueName.
- * This class helps at keeping track of previously instantiated MailQueues.
- */
- private class RabbitMQMailQueueObjectPool {
-
- private final ConcurrentHashMap<MailQueueName, RabbitMQMailQueue> instantiatedQueues;
-
- RabbitMQMailQueueObjectPool() {
- this.instantiatedQueues = new ConcurrentHashMap<>();
- }
-
- RabbitMQMailQueue retrieveInstanceFor(MailQueueName name) {
- return instantiatedQueues.computeIfAbsent(name, privateFactory::create);
- }
- }
-
private final RabbitMQMailQueueManagement mqManagementApi;
private final PrivateFactory privateFactory;
- private final RabbitMQMailQueueObjectPool mailQueueObjectPool;
private final Sender sender;
@VisibleForTesting
@@ -149,7 +131,6 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
this.sender = sender;
this.mqManagementApi = mqManagementApi;
this.privateFactory = privateFactory;
- this.mailQueueObjectPool = new RabbitMQMailQueueObjectPool();
}
@Override
@@ -166,8 +147,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
@Override
public Set<RabbitMQMailQueue> listCreatedMailQueues() {
+ //TODO: it creates connections and leak them
return mqManagementApi.listCreatedMailQueueNames()
- .map(mailQueueObjectPool::retrieveInstanceFor)
+ .flatMap(name -> OptionalUtils.toStream(getQueue(name.asString())))
.collect(ImmutableSet.toImmutableSet());
}
@@ -188,13 +170,13 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
.routingKey(EMPTY_ROUTING_KEY)))
.then()
.block();
- return mailQueueObjectPool.retrieveInstanceFor(mailQueueName);
+ return privateFactory.create(mailQueueName);
}
private Optional<RabbitMQMailQueue> getQueueFromRabbitServer(MailQueueName name) {
return mqManagementApi.listCreatedMailQueueNames()
.filter(name::equals)
- .map(mailQueueObjectPool::retrieveInstanceFor)
+ .map(privateFactory::create)
.findFirst();
}
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index a717476..5651d02 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -91,21 +91,4 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
return mailQueueFactory;
}
- @Test
- void createQueueShouldReturnTheSameInstanceWhenParallelCreateSameQueueName() throws Exception {
- Set<RabbitMQMailQueue> createdRabbitMQMailQueues = ConcurrentHashMap.newKeySet();
-
- ConcurrentTestRunner.builder()
- .operation((threadNumber, operationNumber) ->
- createdRabbitMQMailQueues.add(mailQueueFactory.createQueue("spool")))
- .threadCount(100)
- .operationCount(10)
- .runSuccessfullyWithin(Duration.ofMinutes(10));
-
- assertThat(mailQueueFactory.listCreatedMailQueues())
- .hasSize(1)
- .isEqualTo(createdRabbitMQMailQueues)
- .extracting(RabbitMQMailQueue::getName)
- .hasOnlyOneElementSatisfying(queueName -> assertThat(queueName).isEqualTo("spool"));
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org