You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/12/10 02:30:24 UTC

[james-project] 21/27: JAMES-2979 fix spooler reactor usage

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 20b135341052d2630f46940de1eb7ea679c4ea95
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Thu Nov 21 13:32:59 2019 +0100

    JAMES-2979 fix spooler reactor usage
---
 .../mailetcontainer/impl/JamesMailSpooler.java      | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 2bd7ed0..2fb1799 100644
--- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -78,6 +78,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
     private final MailQueueFactory<?> queueFactory;
     private reactor.core.Disposable disposable;
     private Scheduler spooler;
+    private int parallelismLevel;
 
     @Inject
     public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> queueFactory) {
@@ -89,6 +90,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
     @Override
     public void configure(HierarchicalConfiguration<ImmutableNode> config) {
         numThreads = config.getInt("threads", 100);
+        //Reactor helps us run things in parallel but we have to ensure there are always threads available
+        //in the threadpool to avoid starvation.
+        parallelismLevel = Math.max(1, numThreads - 2);
     }
 
     /**
@@ -105,19 +109,18 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
 
     private void run() {
         LOGGER.info("Queue={}", queue);
-
         disposable = Flux.from(queue.deQueue())
-            .flatMap(item -> handleOnQueueItem(item).subscribeOn(spooler))
+            .flatMap(item -> handleOnQueueItem(item).subscribeOn(spooler), parallelismLevel)
             .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable))
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(spooler)
             .subscribe();
     }
 
     private Mono<Void> handleOnQueueItem(MailQueueItem queueItem) {
         TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING);
         try {
-            processingActive.incrementAndGet();
-            return processMail(queueItem)
+            return Mono.fromCallable(processingActive::incrementAndGet)
+                .flatMap(ignore -> processMail(queueItem))
                 .doOnSuccess(any -> timeMetric.stopAndPublish())
                 .doOnSuccess(any -> processingActive.decrementAndGet());
         } catch (Throwable e) {
@@ -126,14 +129,14 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
     }
 
     private Mono<Void> processMail(MailQueueItem queueItem) {
-        Mail mail = queueItem.getMail();
-        return Mono.fromRunnable(() -> LOGGER.debug("==== Begin processing mail {} ====", mail.getName()))
-            .subscribeOn(Schedulers.boundedElastic())
+        Mono<Mail> mailPublisher = Mono.fromSupplier(queueItem::getMail);
+        return mailPublisher.flatMap(mail -> Mono.fromRunnable(() -> LOGGER.debug("==== Begin processing mail {} ====", mail.getName()))
+            .subscribeOn(spooler)
             .then(Mono.fromCallable(() -> performProcessMail(mail)))
             .flatMap(any -> acknowledgeItem(queueItem, true))
             .onErrorResume(any -> acknowledgeItem(queueItem, false))
             .then(Mono.fromRunnable(() -> LOGGER.debug("==== End processing mail {} ====", mail.getName())))
-            .then(Mono.fromRunnable(() -> LifecycleUtil.dispose(mail)));
+            .then(Mono.fromRunnable(() -> LifecycleUtil.dispose(mail))));
     }
 
     private Mono<Void> acknowledgeItem(MailQueueItem queueItem, boolean success) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org