You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/05/02 10:48:36 UTC
[james-project] 02/02: JAMES-3822 MemoryCacheableMailQueue should use a clock to manage delays
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit aa10262880e8081d1b8b2650f015c970fafd9409
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 14 09:58:11 2023 +0700
JAMES-3822 MemoryCacheableMailQueue should use a clock to manage delays
This enables writing tests on the delay logic.
---
.../james/queue/memory/MemoryMailQueueFactory.java | 38 ++++++++++++++--------
.../queue/memory/MemoryCacheableMailQueueTest.java | 4 ++-
2 files changed, 28 insertions(+), 14 deletions(-)
diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 73bf35f1d7..b11aaab97a 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -21,6 +21,7 @@ package org.apache.james.queue.memory;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+import java.time.Clock;
import java.time.DateTimeException;
import java.time.Duration;
import java.time.Instant;
@@ -69,11 +70,17 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
private final ConcurrentHashMap<MailQueueName, MemoryCacheableMailQueue> mailQueues;
private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
+ private final Clock clock;
@Inject
- public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+ public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, Clock clock) {
this.mailQueues = new ConcurrentHashMap<>();
this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
+ this.clock = clock;
+ }
+
+ public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+ this(mailQueueItemDecoratorFactory, Clock.systemUTC());
}
@PreDestroy
@@ -98,7 +105,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
@Override
public MemoryCacheableMailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) {
- MemoryCacheableMailQueue queue = mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory));
+ MemoryCacheableMailQueue queue = mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory, clock));
queue.reference();
return queue;
}
@@ -110,8 +117,10 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
private final MailQueueName name;
private final Flux<MailQueueItem> flux;
private final Scheduler scheduler;
+ private final Clock clock;
- public MemoryCacheableMailQueue(MailQueueName name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+ public MemoryCacheableMailQueue(MailQueueName name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, Clock clock) {
+ this.clock = clock;
this.mailItems = new DelayQueue<>();
this.inProcessingMailItems = new LinkedBlockingDeque<>();
this.name = name;
@@ -120,6 +129,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
try {
sink.success(mailItems.take());
} catch (InterruptedException e) {
+ sink.error(e);
Thread.currentThread().interrupt();
}
})
@@ -155,7 +165,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
public void enQueue(Mail mail, Duration delay) throws MailQueueException {
ZonedDateTime nextDelivery = calculateNextDelivery(delay);
try {
- mailItems.put(new MemoryMailQueueItem(cloneMail(mail), this, nextDelivery));
+ mailItems.put(new MemoryMailQueueItem(cloneMail(mail), this, clock, nextDelivery));
} catch (MessagingException e) {
throw new MailQueueException("Error while copying mail " + mail.getName(), e);
}
@@ -169,13 +179,13 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
private ZonedDateTime calculateNextDelivery(Duration delay) {
if (!delay.isNegative()) {
try {
- return ZonedDateTime.now().plus(delay);
+ return ZonedDateTime.now(clock).plus(delay);
} catch (DateTimeException | ArithmeticException e) {
return Instant.ofEpochMilli(Long.MAX_VALUE).atZone(ZoneId.of("UTC"));
}
}
- return ZonedDateTime.now();
+ return ZonedDateTime.now(clock);
}
@Override
@@ -198,7 +208,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
return flux;
}
- public Mail getLastMail() throws MailQueueException, InterruptedException {
+ public Mail getLastMail() {
MemoryMailQueueItem maybeItem = Iterables.getLast(mailItems, null);
if (maybeItem == null) {
return null;
@@ -207,7 +217,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
}
@Override
- public long getSize() throws MailQueueException {
+ public long getSize() {
return mailItems.size() + inProcessingMailItems.size();
}
@@ -224,14 +234,14 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
}
@Override
- public long clear() throws MailQueueException {
+ public long clear() {
int size = mailItems.size();
mailItems.clear();
return size;
}
@Override
- public long remove(Type type, String value) throws MailQueueException {
+ public long remove(Type type, String value) {
ImmutableList<MemoryMailQueueItem> toBeRemoved = mailItems.stream()
.filter(item -> shouldRemove(item, type, value))
.collect(ImmutableList.toImmutableList());
@@ -261,7 +271,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
}
@Override
- public MailQueueIterator browse() throws MailQueueException {
+ public MailQueueIterator browse() {
Iterator<DefaultMailQueueItemView> underlying = ImmutableList.copyOf(mailItems)
.stream()
.map(item -> new DefaultMailQueueItemView(item.getMail(), item.delivery))
@@ -305,11 +315,13 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
public static class MemoryMailQueueItem implements MailQueue.MailQueueItem, Delayed {
private final Mail mail;
private final MemoryCacheableMailQueue queue;
+ private final Clock clock;
private final ZonedDateTime delivery;
- public MemoryMailQueueItem(Mail mail, MemoryCacheableMailQueue queue, ZonedDateTime delivery) {
+ public MemoryMailQueueItem(Mail mail, MemoryCacheableMailQueue queue, Clock clock, ZonedDateTime delivery) {
this.mail = mail;
this.queue = queue;
+ this.clock = clock;
this.delivery = delivery;
}
@@ -329,7 +341,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
@Override
public long getDelay(TimeUnit unit) {
try {
- return ZonedDateTime.now().until(delivery, Temporals.chronoUnit(unit));
+ return ZonedDateTime.now(clock).until(delivery, Temporals.chronoUnit(unit));
} catch (ArithmeticException e) {
return Long.MAX_VALUE;
}
diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
index a2fb680ab6..218d28d954 100644
--- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
+++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
@@ -22,6 +22,8 @@ package org.apache.james.queue.memory;
import static org.apache.james.queue.api.Mails.defaultMail;
import static org.assertj.core.api.Assertions.assertThat;
+import java.time.Clock;
+
import org.apache.james.queue.api.DelayedManageableMailQueueContract;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueName;
@@ -37,7 +39,7 @@ public class MemoryCacheableMailQueueTest implements DelayedManageableMailQueueC
@BeforeEach
public void setUp() {
- mailQueue = new MemoryMailQueueFactory.MemoryCacheableMailQueue(MailQueueName.of("test"), new RawMailQueueItemDecoratorFactory());
+ mailQueue = new MemoryMailQueueFactory.MemoryCacheableMailQueue(MailQueueName.of("test"), new RawMailQueueItemDecoratorFactory(), Clock.systemUTC());
}
@AfterEach
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org