You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/12/10 02:30:24 UTC
[james-project] 21/27: JAMES-2979 fix spooler reactor usage
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 20b135341052d2630f46940de1eb7ea679c4ea95
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Thu Nov 21 13:32:59 2019 +0100
JAMES-2979 fix spooler reactor usage
---
.../mailetcontainer/impl/JamesMailSpooler.java | 21 ++++++++++++---------
1 file changed, 12 insertions(+), 9 deletions(-)
diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 2bd7ed0..2fb1799 100644
--- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -78,6 +78,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
private final MailQueueFactory<?> queueFactory;
private reactor.core.Disposable disposable;
private Scheduler spooler;
+ private int parallelismLevel;
@Inject
public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> queueFactory) {
@@ -89,6 +90,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
@Override
public void configure(HierarchicalConfiguration<ImmutableNode> config) {
numThreads = config.getInt("threads", 100);
+ //Reactor helps us run things in parallel but we have to ensure there are always threads available
+ //in the threadpool to avoid starvation.
+ parallelismLevel = Math.max(1, numThreads - 2);
}
/**
@@ -105,19 +109,18 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
private void run() {
LOGGER.info("Queue={}", queue);
-
disposable = Flux.from(queue.deQueue())
- .flatMap(item -> handleOnQueueItem(item).subscribeOn(spooler))
+ .flatMap(item -> handleOnQueueItem(item).subscribeOn(spooler), parallelismLevel)
.onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable))
- .subscribeOn(Schedulers.boundedElastic())
+ .subscribeOn(spooler)
.subscribe();
}
private Mono<Void> handleOnQueueItem(MailQueueItem queueItem) {
TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING);
try {
- processingActive.incrementAndGet();
- return processMail(queueItem)
+ return Mono.fromCallable(processingActive::incrementAndGet)
+ .flatMap(ignore -> processMail(queueItem))
.doOnSuccess(any -> timeMetric.stopAndPublish())
.doOnSuccess(any -> processingActive.decrementAndGet());
} catch (Throwable e) {
@@ -126,14 +129,14 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
}
private Mono<Void> processMail(MailQueueItem queueItem) {
- Mail mail = queueItem.getMail();
- return Mono.fromRunnable(() -> LOGGER.debug("==== Begin processing mail {} ====", mail.getName()))
- .subscribeOn(Schedulers.boundedElastic())
+ Mono<Mail> mailPublisher = Mono.fromSupplier(queueItem::getMail);
+ return mailPublisher.flatMap(mail -> Mono.fromRunnable(() -> LOGGER.debug("==== Begin processing mail {} ====", mail.getName()))
+ .subscribeOn(spooler)
.then(Mono.fromCallable(() -> performProcessMail(mail)))
.flatMap(any -> acknowledgeItem(queueItem, true))
.onErrorResume(any -> acknowledgeItem(queueItem, false))
.then(Mono.fromRunnable(() -> LOGGER.debug("==== End processing mail {} ====", mail.getName())))
- .then(Mono.fromRunnable(() -> LifecycleUtil.dispose(mail)));
+ .then(Mono.fromRunnable(() -> LifecycleUtil.dispose(mail))));
}
private Mono<Void> acknowledgeItem(MailQueueItem queueItem, boolean success) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org