You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/20 12:15:51 UTC

[james-project] 08/08: JAMES-3105 Limit concurrency of mailbox counters recomputation

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f49b5b06a7a01af46d921a276b95beb334289c14
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Mar 20 10:53:46 2020 +0700

    JAMES-3105 Limit concurrency of mailbox counters recomputation
---
 .../cassandra/mail/task/RecomputeMailboxCountersService.java       | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
index b057c93..e14d16e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
@@ -52,6 +52,9 @@ public class RecomputeMailboxCountersService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RecomputeMailboxCountersService.class);
 
+    private static final int MAILBOX_CONCURRENCY = 2;
+    private static final int MESSAGE_CONCURRENCY = 8;
+
     private static class Counter {
         private final CassandraId mailboxId;
         private final AtomicLong total;
@@ -162,7 +165,7 @@ public class RecomputeMailboxCountersService {
 
     Mono<Result> recomputeMailboxCounters(Context context) {
         return mailboxDAO.retrieveAllMailboxes()
-            .flatMap(mailbox -> recomputeMailboxCounter(context, mailbox))
+            .flatMap(mailbox -> recomputeMailboxCounter(context, mailbox), MAILBOX_CONCURRENCY)
             .reduce(Result.COMPLETED, Task::combine)
             .onErrorResume(e -> {
                 LOGGER.error("Error listing mailboxes", e);
@@ -175,7 +178,7 @@ public class RecomputeMailboxCountersService {
         Counter counter = new Counter(mailboxId);
 
         return imapUidToMessageIdDAO.retrieveMessages(mailboxId, MessageRange.all())
-            .flatMap(message -> latestMetadata(mailboxId, message))
+            .flatMap(message -> latestMetadata(mailboxId, message), MESSAGE_CONCURRENCY)
             .doOnNext(counter::process)
             .then(Mono.defer(() -> counterDAO.resetCounters(counter.snapshot())))
             .then(Mono.just(Result.COMPLETED))


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org