You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/03 07:19:59 UTC

[james-project] 09/12: JAMES-3184 Throttling for reindexing tasks

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 11682778c23e4e8976223cf0e29398bfc193dd52
Author: LanKhuat <kh...@gmail.com>
AuthorDate: Mon May 25 15:32:01 2020 +0700

    JAMES-3184 Throttling for reindexing tasks
---
 .../mailbox/tools/indexer/ReIndexerPerformer.java  | 155 ++++++++++++++-------
 1 file changed, 101 insertions(+), 54 deletions(-)

diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index 433e975..390a705 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -19,6 +19,10 @@
 
 package org.apache.mailbox.tools.indexer;
 
+import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITED;
+
+import java.time.Duration;
+
 import javax.inject.Inject;
 
 import org.apache.james.core.Username;
@@ -27,13 +31,14 @@ import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.indexer.ReIndexer.RunningOptions;
 import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures;
+import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures.ReIndexingFailure;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
-import org.apache.james.mailbox.model.MailboxMetaData;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
@@ -46,15 +51,39 @@ import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
 
 public class ReIndexerPerformer {
+    private static class ReIndexingEntry { // one message queued for re-indexing, bundled with the mailbox and session used to index it
+        private final Mailbox mailbox;
+        private final MailboxSession mailboxSession;
+        private final MailboxMessage message;
+
+        ReIndexingEntry(Mailbox mailbox, MailboxSession mailboxSession, MailboxMessage message) {
+            this.mailbox = mailbox;
+            this.mailboxSession = mailboxSession;
+            this.message = message;
+        }
+
+        public Mailbox getMailbox() {
+            return mailbox;
+        }
+
+        public MailboxMessage getMessage() {
+            return message;
+        }
+
+        public MailboxSession getMailboxSession() {
+            return mailboxSession;
+        }
+    }
+
     private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class);
 
     private static final int SINGLE_MESSAGE = 1;
     private static final String RE_INDEXING = "re-indexing";
     private static final Username RE_INDEXER_PERFORMER_USER = Username.of(RE_INDEXING);
-    private static final int NO_CONCURRENCY = 1;
-    private static final int NO_PREFETCH = 1;
+    private static final Duration DELAY = Duration.ofSeconds(0);
 
     private final MailboxManager mailboxManager;
     private final ListeningMessageSearchIndex messageSearchIndex;
@@ -72,33 +101,41 @@ public class ReIndexerPerformer {
     Mono<Result> reIndexAllMessages(ReprocessingContext reprocessingContext, RunningOptions runningOptions) {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
         LOGGER.info("Starting a full reindex");
-        return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
-            .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox, runningOptions), NO_CONCURRENCY, NO_PREFETCH)
-            .reduce(Task::combine)
-            .switchIfEmpty(Mono.just(Result.COMPLETED))
+
+        Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list() // every mailbox returned by the mapper
+            .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession)); // flatten all mailboxes into a single stream of entries
+
+        return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext) // throttling is applied to the merged stream, not per mailbox
             .doFinally(any -> LOGGER.info("Full reindex finished"));
     }
 
     Mono<Result> reIndexSingleMailbox(MailboxId mailboxId, ReprocessingContext reprocessingContext, RunningOptions runningOptions) {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
 
-        return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
+        Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
             .findMailboxByIdReactive(mailboxId)
-            .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox, runningOptions));
+            .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession)); // expand the looked-up mailbox into its entries
+
+        return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext); // yields COMPLETED when there are no entries to index
     }
 
     Mono<Result> reIndexUserMailboxes(Username username, ReprocessingContext reprocessingContext, RunningOptions runningOptions) {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
+        MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
         LOGGER.info("Starting a reindex for user {}", username.asString());
 
         MailboxQuery mailboxQuery = MailboxQuery.privateMailboxesBuilder(mailboxSession).build();
 
-        return mailboxManager.searchReactive(mailboxQuery, mailboxSession)
-            .map(MailboxMetaData::getId)
-            .flatMap(id -> reIndexSingleMailbox(id, reprocessingContext, runningOptions), NO_CONCURRENCY, NO_PREFETCH)
-            .reduce(Task::combine)
-            .switchIfEmpty(Mono.just(Result.COMPLETED))
-            .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()));
+        try {
+            Flux<ReIndexingEntry> entriesToIndex = mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound()) // mailboxes matching the user-bound private-mailbox query
+                .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
+
+            return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext)
+                .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()));
+        } catch (Exception e) {
+            LOGGER.error("Error fetching mailboxes for user: {}", username.asString(), e); // trailing Throwable makes SLF4J log the cause and stack trace
+            return Mono.just(Result.PARTIAL); // an exception thrown while building the pipeline is reported as a partial result
+        }
     }
 
     Mono<Result> reIndexSingleMessage(MailboxId mailboxId, MessageUid uid, ReprocessingContext reprocessingContext) {
@@ -106,7 +143,9 @@ public class ReIndexerPerformer {
 
         return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
             .findMailboxByIdReactive(mailboxId)
-            .flatMap(mailbox -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext));
+            .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, uid)
+                .flatMap(message -> reIndexMessage(mailboxSession, mailbox, reprocessingContext, message)))
+            .switchIfEmpty(Mono.just(Result.COMPLETED));
     }
 
     Mono<Result> reIndexMessageId(MessageId messageId) {
@@ -124,33 +163,11 @@ public class ReIndexerPerformer {
     }
 
     Mono<Result> reIndexErrors(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions runningOptions) {
-        return Flux.fromIterable(previousReIndexingFailures.failures())
-            .flatMap(previousFailure -> reIndex(reprocessingContext, previousFailure), runningOptions.getMessagesPerSecond())
-            .reduce(Task::combine)
-            .switchIfEmpty(Mono.just(Result.COMPLETED));
-    }
-
-    private Mono<Result> reIndex(ReprocessingContext reprocessingContext, MailboxSession mailboxSession, Mailbox mailbox, RunningOptions runningOptions) {
-        LOGGER.info("Attempt to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize());
-        return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId())
-            .then(mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
-                .listAllMessageUids(mailbox)
-                .flatMap(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext), runningOptions.getMessagesPerSecond())
-                .reduce(Task::combine)
-                .switchIfEmpty(Mono.just(Result.COMPLETED))
-                .doFinally(any -> LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize())));
-    }
-
-    private Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) {
-        MailboxId mailboxId = previousReIndexingFailure.getMailboxId();
-        MessageUid uid = previousReIndexingFailure.getUid();
-
-        return reIndexSingleMessage(mailboxId, uid, reprocessingContext)
-            .onErrorResume(e -> {
-                LOGGER.warn("ReIndexing failed for {} {}", mailboxId, uid, e);
-                reprocessingContext.recordFailureDetailsForMessage(mailboxId, uid);
-                return Mono.just(Result.PARTIAL);
-            });
+        return reIndexMessages(
+            Flux.fromIterable(previousReIndexingFailures.failures())
+                .flatMap(this::createReindexingEntryFromFailure), // a failure whose mailbox or message lookup comes back empty contributes no entry
+            runningOptions,
+            reprocessingContext);
     }
 
     private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) {
@@ -164,21 +181,51 @@ public class ReIndexerPerformer {
             });
     }
 
-    private Mono<Result> handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid, ReprocessingContext reprocessingContext) {
-        return fullyReadMessage(mailboxSession, mailbox, uid)
-            .flatMap(message -> messageSearchIndex.add(mailboxSession, mailbox, message))
-            .thenReturn(Result.COMPLETED)
+    private Mono<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) { // reads the message with the given UID using FetchType.Full
+        return mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
+            .findInMailboxReactive(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE)
+            .next(); // keep the first result only; the Mono is empty when nothing is found
+    }
+
+    private Mono<ReIndexingEntry> createReindexingEntryFromFailure(ReIndexingFailure failure) { // rebuilds an entry from the mailbox id and uid of a recorded failure
+        MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
+
+        return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
+            .findMailboxByIdReactive(failure.getMailboxId())
+            .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, failure.getUid())
+                .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message))); // stays empty if the mailbox lookup or the message read is empty
+    }
+
+    private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession) { // clears the mailbox's index entries, then emits an entry for each message read back
+        MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(mailboxSession);
+
+        return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId())
+            .thenMany(messageMapper.listAllMessageUids(mailbox)) // the UID listing is subscribed only after deleteAll completes
+            .flatMap(uid -> messageMapper.findInMailboxReactive(mailbox, MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED)) // NOTE(review): default flatMap concurrency, and full reads happen upstream of throttle() — confirm memory impact
+            .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message));
+    }
+
+    private Mono<Task.Result> reIndexMessages(Flux<ReIndexingEntry> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) { // throttles the entries, indexes each one and folds the per-message results into a single Result
+        return throttle(entriesToIndex, Duration.ofSeconds(1), runningOptions.getMessagesPerSecond()) // windows of at most messagesPerSecond entries with a one-second window duration
+            .flatMap(entry -> reIndexMessage(entry.getMailboxSession(), entry.getMailbox(), reprocessingContext, entry.getMessage()), runningOptions.getMessagesPerSecond()) // messagesPerSecond is also used as the flatMap concurrency
+            .reduce(Task::combine)
+            .switchIfEmpty(Mono.just(Result.COMPLETED)); // no entries at all is reported as COMPLETED
+    }
+
+    private Mono<Task.Result> reIndexMessage(MailboxSession mailboxSession, Mailbox mailbox, ReprocessingContext reprocessingContext, MailboxMessage message) { // indexes one message: COMPLETED on success, PARTIAL (with the failure recorded) on error
+        return Mono.defer(() -> messageSearchIndex.add(mailboxSession, mailbox, message)) // defer subscribes to the Mono returned by add(); fromCallable merely emitted that Mono, so indexing never ran
+            .thenReturn(Result.COMPLETED) // emitted only once add() has completed, so the success count below reflects real indexing
             .doOnNext(any -> reprocessingContext.recordSuccess())
             .onErrorResume(e -> {
-                LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e);
-                reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid);
+                LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), message.getUid(), e);
+                reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), message.getUid());
                 return Mono.just(Result.PARTIAL);
             });
     }
 
-    private Mono<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) {
-        return mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
-            .findInMailboxReactive(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE)
-            .next();
+    private <V> Flux<V> throttle(Flux<V> flux, Duration windowDuration, int windowMaxSize) { // paces a flux by pairing each window of elements with one interval tick
+        return flux.windowTimeout(windowMaxSize, windowDuration) // a window closes at windowMaxSize elements or after windowDuration
+            .zipWith(Flux.interval(DELAY, windowDuration)) // one tick per window; the first tick is delayed by DELAY (zero seconds)
+            .flatMap(Tuple2::getT1); // discard the tick and flatten the window's elements
+    }
-}
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org