You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/05/20 01:58:10 UTC

[james-project] 01/11: JAMES-3184 MessageFastViewProjectionCorrector concurrency control: RunningOptions

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 86bd9d6a6143fa6e94c4e3f5617dc46a6b7d86c4
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon May 18 10:30:23 2020 +0700

    JAMES-3184 MessageFastViewProjectionCorrector concurrency control: RunningOptions
    
    Fix message processing rate via throttling
---
 .../jmap/MessageFastViewProjectionCorrector.java   | 125 ++++++++++++++++-----
 .../RecomputeAllFastViewProjectionItemsTask.java   |   3 +-
 .../RecomputeUserFastViewProjectionItemsTask.java  |   7 +-
 3 files changed, 104 insertions(+), 31 deletions(-)

diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
index 2ba423c..27c65ab 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
@@ -20,6 +20,7 @@
 package org.apache.james.webadmin.data.jmap;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
@@ -31,6 +32,7 @@ import org.apache.james.jmap.api.projections.MessageFastViewProjection;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.MailboxMetaData;
@@ -45,13 +47,61 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
+import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
 
 public class MessageFastViewProjectionCorrector {
-
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageFastViewProjectionCorrector.class);
+    
+    private static final Duration DELAY = Duration.ZERO;
+    private static final Duration PERIOD = Duration.ofSeconds(1);
+
+    public static class RunningOptions {
+        public static RunningOptions withMessageRatePerSecond(int messageRatePerSecond) {
+            return new RunningOptions(messageRatePerSecond);
+        }
+
+        public static RunningOptions DEFAULT = new RunningOptions(10);
+
+        private final int messageRatePerSecond;
+
+        public RunningOptions(int messageRatePerSecond) {
+            Preconditions.checkArgument(messageRatePerSecond > 0, "'messageParallelism' must be strictly positive");
+
+            this.messageRatePerSecond = messageRatePerSecond;
+        }
+
+        public int getMessageRatePerSecond() {
+            return messageRatePerSecond;
+        }
+    }
+
+    private static class ProjectionEntry {
+        private final MessageManager messageManager;
+        private final MessageUid uid;
+        private final MailboxSession session;
+
+        private ProjectionEntry(MessageManager messageManager, MessageUid uid, MailboxSession session) {
+            this.messageManager = messageManager;
+            this.uid = uid;
+            this.session = session;
+        }
+
+        private MessageManager getMessageManager() {
+            return messageManager;
+        }
+
+        private MessageUid getUid() {
+            return uid;
+        }
+
+        private MailboxSession getSession() {
+            return session;
+        }
+    }
 
     static class Progress {
         private final AtomicLong processedUserCount;
@@ -102,48 +152,66 @@ public class MessageFastViewProjectionCorrector {
         this.projectionItemFactory = projectionItemFactory;
     }
 
-    Mono<Void> correctAllProjectionItems(Progress progress) {
+    Mono<Void> correctAllProjectionItems(Progress progress, RunningOptions runningOptions) {
+        return correctProjection(listAllMailboxMessages(progress), runningOptions, progress);
+    }
+
+    Mono<Void> correctUsersProjectionItems(Progress progress, Username username, RunningOptions runningOptions) {
+        MailboxSession session = mailboxManager.createSystemSession(username);
+        return correctProjection(listUserMailboxMessages(progress, session), runningOptions, progress);
+    }
+
+    private Flux<ProjectionEntry> listAllMailboxMessages(Progress progress) {
         try {
             return Iterators.toFlux(usersRepository.list())
-                .concatMap(username -> correctUsersProjectionItems(progress, username))
-                .then();
+                .map(mailboxManager::createSystemSession)
+                .doOnNext(any -> progress.processedUserCount.incrementAndGet())
+                .flatMap(session -> listUserMailboxMessages(progress, session));
         } catch (UsersRepositoryException e) {
-            return Mono.error(e);
+            return Flux.error(e);
         }
     }
 
-    Mono<Void> correctUsersProjectionItems(Progress progress, Username username) {
+    private Flux<ProjectionEntry> listUserMailboxMessages(Progress progress, MailboxSession session) {
         try {
-            MailboxSession session = mailboxManager.createSystemSession(username);
             return listUsersMailboxes(session)
-                .concatMap(mailboxMetadata -> retrieveMailbox(session, mailboxMetadata))
-                .concatMap(Throwing.function(messageManager -> correctMailboxProjectionItems(progress, messageManager, session)))
-                .doOnComplete(progress.processedUserCount::incrementAndGet)
-                .onErrorContinue((error, o) -> {
-                    LOGGER.error("JMAP fastview re-computation aborted for {}", username, error);
-                    progress.failedUserCount.incrementAndGet();
-                })
-                .then();
+                .flatMap(mailboxMetadata -> retrieveMailbox(session, mailboxMetadata))
+                .flatMap(Throwing.function(messageManager -> listAllMailboxMessages(messageManager, session)
+                    .map(message -> new ProjectionEntry(messageManager, message.getUid(), session))));
         } catch (MailboxException e) {
-            LOGGER.error("JMAP fastview re-computation aborted for {} as we failed listing user mailboxes", username, e);
+            LOGGER.error("JMAP fastview re-computation aborted for {} as we failed listing user mailboxes", session.getUser(), e);
             progress.failedUserCount.incrementAndGet();
-            return Mono.empty();
+            return Flux.empty();
         }
     }
 
-    private Mono<Void> correctMailboxProjectionItems(Progress progress, MessageManager messageManager, MailboxSession session) throws MailboxException {
-        return listAllMailboxMessages(messageManager, session)
-            .concatMap(messageResult -> retrieveContent(messageManager, session, messageResult))
+    private Mono<Void> correctProjection(ProjectionEntry entry, Progress progress) {
+        return retrieveContent(entry.getMessageManager(), entry.getSession(), entry.getUid())
             .map(this::computeProjectionEntry)
-            .concatMap(pair -> storeProjectionEntry(pair)
-                .doOnSuccess(any -> progress.processedMessageCount.incrementAndGet()))
-            .onErrorContinue((error, triggeringValue) -> {
-                LOGGER.error("JMAP fastview re-computation aborted for {} - {}", session.getUser(), triggeringValue, error);
+            .flatMap(this::storeProjectionEntry)
+            .doOnSuccess(any -> progress.processedMessageCount.incrementAndGet())
+            .onErrorResume(e -> {
+                LOGGER.error("JMAP fastview re-computation aborted for {} - {} - {}",
+                    entry.getSession().getUser(),
+                    entry.getMessageManager().getId(),
+                    entry.getUid(), e);
                 progress.failedMessageCount.incrementAndGet();
-            })
+                return Mono.empty();
+            });
+    }
+
+    private Mono<Void> correctProjection(Flux<ProjectionEntry> entries, RunningOptions runningOptions, Progress progress) {
+        return throttleWithRate(entries, runningOptions)
+            .flatMap(entry -> correctProjection(entry, progress))
             .then();
     }
 
+    private Flux<ProjectionEntry> throttleWithRate(Flux<ProjectionEntry> entries, RunningOptions runningOptions) {
+        return entries.windowTimeout(runningOptions.getMessageRatePerSecond(), Duration.ofSeconds(1))
+            .zipWith(Flux.interval(DELAY, PERIOD))
+            .flatMap(Tuple2::getT1);
+    }
+
     private Flux<MailboxMetaData> listUsersMailboxes(MailboxSession session) throws MailboxException {
         return Flux.fromIterable(mailboxManager.search(MailboxQuery.privateMailboxesBuilder(session).build(), session));
     }
@@ -156,11 +224,12 @@ public class MessageFastViewProjectionCorrector {
         return Iterators.toFlux(messageManager.getMessages(MessageRange.all(), FetchGroup.MINIMAL, session));
     }
 
-    private Flux<MessageResult> retrieveContent(MessageManager messageManager, MailboxSession session, MessageResult messageResult) {
+    private Mono<MessageResult> retrieveContent(MessageManager messageManager, MailboxSession session, MessageUid uid) {
         try {
-            return Iterators.toFlux(messageManager.getMessages(MessageRange.one(messageResult.getUid()), FetchGroup.FULL_CONTENT, session));
+            return Iterators.toFlux(messageManager.getMessages(MessageRange.one(uid), FetchGroup.FULL_CONTENT, session))
+                .next();
         } catch (MailboxException e) {
-            return Flux.error(e);
+            return Mono.error(e);
         }
     }
 
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeAllFastViewProjectionItemsTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeAllFastViewProjectionItemsTask.java
index 4ef9365..3556756 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeAllFastViewProjectionItemsTask.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeAllFastViewProjectionItemsTask.java
@@ -29,6 +29,7 @@ import org.apache.james.server.task.json.dto.TaskDTOModule;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskType;
+import org.apache.james.webadmin.data.jmap.MessageFastViewProjectionCorrector.RunningOptions;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -116,7 +117,7 @@ public class RecomputeAllFastViewProjectionItemsTask implements Task {
 
     @Override
     public Result run() {
-        corrector.correctAllProjectionItems(progress)
+        corrector.correctAllProjectionItems(progress, RunningOptions.DEFAULT)
             .subscribeOn(Schedulers.elastic())
             .block();
 
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeUserFastViewProjectionItemsTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeUserFastViewProjectionItemsTask.java
index d29435a..439f3df 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeUserFastViewProjectionItemsTask.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeUserFastViewProjectionItemsTask.java
@@ -19,6 +19,9 @@
 
 package org.apache.james.webadmin.data.jmap;
 
+import static org.apache.james.webadmin.data.jmap.MessageFastViewProjectionCorrector.Progress;
+import static org.apache.james.webadmin.data.jmap.MessageFastViewProjectionCorrector.RunningOptions;
+
 import java.time.Clock;
 import java.time.Instant;
 import java.util.Optional;
@@ -39,7 +42,7 @@ public class RecomputeUserFastViewProjectionItemsTask implements Task {
     static final TaskType TASK_TYPE = TaskType.of("RecomputeUserFastViewProjectionItemsTask");
 
     public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
-        private static AdditionalInformation from(MessageFastViewProjectionCorrector.Progress progress, Username username) {
+        private static AdditionalInformation from(Progress progress, Username username) {
             return new AdditionalInformation(username,
                 progress.getProcessedMessageCount(),
                 progress.getFailedMessageCount(),
@@ -119,7 +122,7 @@ public class RecomputeUserFastViewProjectionItemsTask implements Task {
 
     @Override
     public Result run() {
-        corrector.correctUsersProjectionItems(progress, username)
+        corrector.correctUsersProjectionItems(progress, username, RunningOptions.DEFAULT)
             .subscribeOn(Schedulers.elastic())
             .block();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org