You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/05/02 10:48:36 UTC

[james-project] 02/02: JAMES-3822 MemoryCacheableMailQueue should use a clock to manage delays

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit aa10262880e8081d1b8b2650f015c970fafd9409
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 14 09:58:11 2023 +0700

    JAMES-3822 MemoryCacheableMailQueue should use a clock to manage delays
    
    This enables writing tests for the delay logic.
---
 .../james/queue/memory/MemoryMailQueueFactory.java | 38 ++++++++++++++--------
 .../queue/memory/MemoryCacheableMailQueueTest.java |  4 ++-
 2 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 73bf35f1d7..b11aaab97a 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -21,6 +21,7 @@ package org.apache.james.queue.memory;
 
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
+import java.time.Clock;
 import java.time.DateTimeException;
 import java.time.Duration;
 import java.time.Instant;
@@ -69,11 +70,17 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
 
     private final ConcurrentHashMap<MailQueueName, MemoryCacheableMailQueue> mailQueues;
     private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
+    private final Clock clock;
 
     @Inject
-    public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+    public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, Clock clock) {
         this.mailQueues = new ConcurrentHashMap<>();
         this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
+        this.clock = clock;
+    }
+
+    public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+        this(mailQueueItemDecoratorFactory, Clock.systemUTC());
     }
 
     @PreDestroy
@@ -98,7 +105,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
 
     @Override
     public MemoryCacheableMailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) {
-        MemoryCacheableMailQueue queue = mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory));
+        MemoryCacheableMailQueue queue = mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory, clock));
         queue.reference();
         return queue;
     }
@@ -110,8 +117,10 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         private final MailQueueName name;
         private final Flux<MailQueueItem> flux;
         private final Scheduler scheduler;
+        private final Clock clock;
 
-        public MemoryCacheableMailQueue(MailQueueName name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+        public MemoryCacheableMailQueue(MailQueueName name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, Clock clock) {
+            this.clock = clock;
             this.mailItems = new DelayQueue<>();
             this.inProcessingMailItems = new LinkedBlockingDeque<>();
             this.name = name;
@@ -120,6 +129,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
                 try {
                     sink.success(mailItems.take());
                 } catch (InterruptedException e) {
+                    sink.error(e);
                     Thread.currentThread().interrupt();
                 }
             })
@@ -155,7 +165,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         public void enQueue(Mail mail, Duration delay) throws MailQueueException {
             ZonedDateTime nextDelivery = calculateNextDelivery(delay);
             try {
-                mailItems.put(new MemoryMailQueueItem(cloneMail(mail), this, nextDelivery));
+                mailItems.put(new MemoryMailQueueItem(cloneMail(mail), this, clock, nextDelivery));
             } catch (MessagingException e) {
                 throw new MailQueueException("Error while copying mail " + mail.getName(), e);
             }
@@ -169,13 +179,13 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         private ZonedDateTime calculateNextDelivery(Duration delay) {
             if (!delay.isNegative()) {
                 try {
-                    return ZonedDateTime.now().plus(delay);
+                    return ZonedDateTime.now(clock).plus(delay);
                 } catch (DateTimeException | ArithmeticException e) {
                     return Instant.ofEpochMilli(Long.MAX_VALUE).atZone(ZoneId.of("UTC"));
                 }
             }
 
-            return ZonedDateTime.now();
+            return ZonedDateTime.now(clock);
         }
 
         @Override
@@ -198,7 +208,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
             return flux;
         }
 
-        public Mail getLastMail() throws MailQueueException, InterruptedException {
+        public Mail getLastMail() {
             MemoryMailQueueItem maybeItem = Iterables.getLast(mailItems, null);
             if (maybeItem == null) {
                 return null;
@@ -207,7 +217,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         }
 
         @Override
-        public long getSize() throws MailQueueException {
+        public long getSize() {
             return mailItems.size() + inProcessingMailItems.size();
         }
 
@@ -224,14 +234,14 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         }
 
         @Override
-        public long clear() throws MailQueueException {
+        public long clear() {
             int size = mailItems.size();
             mailItems.clear();
             return size;
         }
 
         @Override
-        public long remove(Type type, String value) throws MailQueueException {
+        public long remove(Type type, String value) {
             ImmutableList<MemoryMailQueueItem> toBeRemoved = mailItems.stream()
                 .filter(item -> shouldRemove(item, type, value))
                 .collect(ImmutableList.toImmutableList());
@@ -261,7 +271,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         }
 
         @Override
-        public MailQueueIterator browse() throws MailQueueException {
+        public MailQueueIterator browse() {
             Iterator<DefaultMailQueueItemView> underlying = ImmutableList.copyOf(mailItems)
                 .stream()
                 .map(item -> new DefaultMailQueueItemView(item.getMail(), item.delivery))
@@ -305,11 +315,13 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
     public static class MemoryMailQueueItem implements MailQueue.MailQueueItem, Delayed {
         private final Mail mail;
         private final MemoryCacheableMailQueue queue;
+        private final Clock clock;
         private final ZonedDateTime delivery;
 
-        public MemoryMailQueueItem(Mail mail, MemoryCacheableMailQueue queue, ZonedDateTime delivery) {
+        public MemoryMailQueueItem(Mail mail, MemoryCacheableMailQueue queue, Clock clock, ZonedDateTime delivery) {
             this.mail = mail;
             this.queue = queue;
+            this.clock = clock;
             this.delivery = delivery;
         }
 
@@ -329,7 +341,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         @Override
         public long getDelay(TimeUnit unit) {
             try {
-                return ZonedDateTime.now().until(delivery, Temporals.chronoUnit(unit));
+                return ZonedDateTime.now(clock).until(delivery, Temporals.chronoUnit(unit));
             } catch (ArithmeticException e) {
                 return Long.MAX_VALUE;
             }
diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
index a2fb680ab6..218d28d954 100644
--- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
+++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
@@ -22,6 +22,8 @@ package org.apache.james.queue.memory;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Clock;
+
 import org.apache.james.queue.api.DelayedManageableMailQueueContract;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueName;
@@ -37,7 +39,7 @@ public class MemoryCacheableMailQueueTest implements DelayedManageableMailQueueC
 
     @BeforeEach
     public void setUp() {
-        mailQueue = new MemoryMailQueueFactory.MemoryCacheableMailQueue(MailQueueName.of("test"), new RawMailQueueItemDecoratorFactory());
+        mailQueue = new MemoryMailQueueFactory.MemoryCacheableMailQueue(MailQueueName.of("test"), new RawMailQueueItemDecoratorFactory(), Clock.systemUTC());
     }
 
     @AfterEach


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org