You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/22 15:59:39 UTC

[james-project] 01/04: [BUILD] MemoryMailQueue should use its dedicated scheduler

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7edac2f6e6826240878c63485022f6b63d05c312
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Aug 15 15:51:07 2022 +0700

    [BUILD] MemoryMailQueue should use its dedicated scheduler
    
    The underlying flux was never disposed of. Because it relied on boundedElastic, after running 80 tests
    the memory mail queue would starve the underlying thread pool.
    
    Running a distinct scheduler allows repeated test runs
    (10,000 iterations) and made it possible to reproduce some
    CI failures.
---
 .../java/org/apache/james/queue/memory/MemoryMailQueueFactory.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 1c994edef3..68b04b8a6e 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -59,6 +59,7 @@ import com.google.common.collect.Iterables;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueFactory.MemoryCacheableMailQueue> {
@@ -95,14 +96,16 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         private final LinkedBlockingDeque<MemoryMailQueueItem> inProcessingMailItems;
         private final MailQueueName name;
         private final Flux<MailQueueItem> flux;
+        private final Scheduler scheduler;
 
         public MemoryCacheableMailQueue(MailQueueName name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
             this.mailItems = new DelayQueue<>();
             this.inProcessingMailItems = new LinkedBlockingDeque<>();
             this.name = name;
+            this.scheduler = Schedulers.newSingle("memory-mail-queue");
             this.flux = Mono.fromCallable(mailItems::take)
                 .repeat()
-                .subscribeOn(Schedulers.boundedElastic())
+                .subscribeOn(scheduler)
                 .flatMap(item ->
                     Mono.fromRunnable(() -> inProcessingMailItems.add(item)).thenReturn(item), DEFAULT_CONCURRENCY)
                 .map(item -> mailQueueItemDecoratorFactory.decorate(item, name));
@@ -110,7 +113,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
 
         @Override
         public void close() {
-            //There's no resource to free
+            this.scheduler.dispose();
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org