You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/20 12:15:51 UTC
[james-project] 08/08: JAMES-3105 Limit concurrency of mailbox
counters recomputation
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f49b5b06a7a01af46d921a276b95beb334289c14
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Mar 20 10:53:46 2020 +0700
JAMES-3105 Limit concurrency of mailbox counters recomputation
---
.../cassandra/mail/task/RecomputeMailboxCountersService.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
index b057c93..e14d16e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
@@ -52,6 +52,9 @@ public class RecomputeMailboxCountersService {
private static final Logger LOGGER = LoggerFactory.getLogger(RecomputeMailboxCountersService.class);
+ private static final int MAILBOX_CONCURRENCY = 2;
+ private static final int MESSAGE_CONCURRENCY = 8;
+
private static class Counter {
private final CassandraId mailboxId;
private final AtomicLong total;
@@ -162,7 +165,7 @@ public class RecomputeMailboxCountersService {
Mono<Result> recomputeMailboxCounters(Context context) {
return mailboxDAO.retrieveAllMailboxes()
- .flatMap(mailbox -> recomputeMailboxCounter(context, mailbox))
+ .flatMap(mailbox -> recomputeMailboxCounter(context, mailbox), MAILBOX_CONCURRENCY)
.reduce(Result.COMPLETED, Task::combine)
.onErrorResume(e -> {
LOGGER.error("Error listing mailboxes", e);
@@ -175,7 +178,7 @@ public class RecomputeMailboxCountersService {
Counter counter = new Counter(mailboxId);
return imapUidToMessageIdDAO.retrieveMessages(mailboxId, MessageRange.all())
- .flatMap(message -> latestMetadata(mailboxId, message))
+ .flatMap(message -> latestMetadata(mailboxId, message), MESSAGE_CONCURRENCY)
.doOnNext(counter::process)
.then(Mono.defer(() -> counterDAO.resetCounters(counter.snapshot())))
.then(Mono.just(Result.COMPLETED))
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org