You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/03 07:19:59 UTC
[james-project] 09/12: JAMES-3184 Throttling for reindexing tasks
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 11682778c23e4e8976223cf0e29398bfc193dd52
Author: LanKhuat <kh...@gmail.com>
AuthorDate: Mon May 25 15:32:01 2020 +0700
JAMES-3184 Throttling for reindexing tasks
---
.../mailbox/tools/indexer/ReIndexerPerformer.java | 155 ++++++++++++++-------
1 file changed, 101 insertions(+), 54 deletions(-)
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index 433e975..390a705 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -19,6 +19,10 @@
package org.apache.mailbox.tools.indexer;
+import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITED;
+
+import java.time.Duration;
+
import javax.inject.Inject;
import org.apache.james.core.Username;
@@ -27,13 +31,14 @@ import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.indexer.ReIndexer.RunningOptions;
import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures;
+import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures.ReIndexingFailure;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxId;
-import org.apache.james.mailbox.model.MailboxMetaData;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
@@ -46,15 +51,39 @@ import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
public class ReIndexerPerformer {
+ /**
+ * Immutable unit of re-indexing work: one message together with the mailbox
+ * it was read from and the session used to index it.
+ */
+ private static class ReIndexingEntry {
+ private final Mailbox mailbox;
+ private final MailboxSession mailboxSession;
+ private final MailboxMessage message;
+
+ ReIndexingEntry(Mailbox owningMailbox, MailboxSession session, MailboxMessage mailboxMessage) {
+ mailbox = owningMailbox;
+ mailboxSession = session;
+ message = mailboxMessage;
+ }
+
+ /** @return the mailbox the message was read from */
+ public Mailbox getMailbox() {
+ return mailbox;
+ }
+
+ /** @return the session to use when indexing the message */
+ public MailboxSession getMailboxSession() {
+ return mailboxSession;
+ }
+
+ /** @return the message to index */
+ public MailboxMessage getMessage() {
+ return message;
+ }
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class);
private static final int SINGLE_MESSAGE = 1;
private static final String RE_INDEXING = "re-indexing";
private static final Username RE_INDEXER_PERFORMER_USER = Username.of(RE_INDEXING);
- private static final int NO_CONCURRENCY = 1;
- private static final int NO_PREFETCH = 1;
+ private static final Duration DELAY = Duration.ofSeconds(0);
private final MailboxManager mailboxManager;
private final ListeningMessageSearchIndex messageSearchIndex;
@@ -72,33 +101,41 @@ public class ReIndexerPerformer {
Mono<Result> reIndexAllMessages(ReprocessingContext reprocessingContext, RunningOptions runningOptions) {
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
LOGGER.info("Starting a full reindex");
- return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
- .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox, runningOptions), NO_CONCURRENCY, NO_PREFETCH)
- .reduce(Task::combine)
- .switchIfEmpty(Mono.just(Result.COMPLETED))
+ // Full reindex: flatten every mailbox listed by the mapper into one stream of per-message entries.
+ Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
+ .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
+ // Index all entries through the shared pipeline, which throttles according to runningOptions.
+ return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext)
.doFinally(any -> LOGGER.info("Full reindex finished"));
}
Mono<Result> reIndexSingleMailbox(MailboxId mailboxId, ReprocessingContext reprocessingContext, RunningOptions runningOptions) {
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
- return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
+ Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) // look the mailbox up by id
.findMailboxByIdReactive(mailboxId)
- .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox, runningOptions));
+ .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession)); // expand the mailbox into one entry per message
+ // Index the entries through the shared pipeline, which throttles according to runningOptions.
+ return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext);
}
Mono<Result> reIndexUserMailboxes(Username username, ReprocessingContext reprocessingContext, RunningOptions runningOptions) {
MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
+ MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
LOGGER.info("Starting a reindex for user {}", username.asString());
MailboxQuery mailboxQuery = MailboxQuery.privateMailboxesBuilder(mailboxSession).build();
- return mailboxManager.searchReactive(mailboxQuery, mailboxSession)
- .map(MailboxMetaData::getId)
- .flatMap(id -> reIndexSingleMailbox(id, reprocessingContext, runningOptions), NO_CONCURRENCY, NO_PREFETCH)
- .reduce(Task::combine)
- .switchIfEmpty(Mono.just(Result.COMPLETED))
- .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()));
+ try {
+ Flux<ReIndexingEntry> entriesToIndex = mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound())
+ .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
+
+ return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext)
+ .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()));
+ } catch (Exception e) {
+ LOGGER.error("Error fetching mailboxes for user: {}", username.asString());
+ return Mono.just(Result.PARTIAL);
+ }
}
Mono<Result> reIndexSingleMessage(MailboxId mailboxId, MessageUid uid, ReprocessingContext reprocessingContext) {
@@ -106,7 +143,9 @@ public class ReIndexerPerformer {
return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
.findMailboxByIdReactive(mailboxId)
- .flatMap(mailbox -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext));
+ .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, uid)
+ .flatMap(message -> reIndexMessage(mailboxSession, mailbox, reprocessingContext, message)))
+ .switchIfEmpty(Mono.just(Result.COMPLETED));
}
Mono<Result> reIndexMessageId(MessageId messageId) {
@@ -124,33 +163,11 @@ public class ReIndexerPerformer {
}
Mono<Result> reIndexErrors(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions runningOptions) {
- return Flux.fromIterable(previousReIndexingFailures.failures())
- .flatMap(previousFailure -> reIndex(reprocessingContext, previousFailure), runningOptions.getMessagesPerSecond())
- .reduce(Task::combine)
- .switchIfEmpty(Mono.just(Result.COMPLETED));
- }
-
- private Mono<Result> reIndex(ReprocessingContext reprocessingContext, MailboxSession mailboxSession, Mailbox mailbox, RunningOptions runningOptions) {
- LOGGER.info("Attempt to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize());
- return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId())
- .then(mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
- .listAllMessageUids(mailbox)
- .flatMap(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext), runningOptions.getMessagesPerSecond())
- .reduce(Task::combine)
- .switchIfEmpty(Mono.just(Result.COMPLETED))
- .doFinally(any -> LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize())));
- }
-
- private Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) {
- MailboxId mailboxId = previousReIndexingFailure.getMailboxId();
- MessageUid uid = previousReIndexingFailure.getUid();
-
- return reIndexSingleMessage(mailboxId, uid, reprocessingContext)
- .onErrorResume(e -> {
- LOGGER.warn("ReIndexing failed for {} {}", mailboxId, uid, e);
- reprocessingContext.recordFailureDetailsForMessage(mailboxId, uid);
- return Mono.just(Result.PARTIAL);
- });
+ return reIndexMessages( // retry previously failed messages through the shared pipeline
+ Flux.fromIterable(previousReIndexingFailures.failures())
+ .flatMap(this::createReindexingEntryFromFailure), // re-read the mailbox and message of each recorded failure
+ runningOptions,
+ reprocessingContext);
}
private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) {
@@ -164,21 +181,51 @@ public class ReIndexerPerformer {
});
}
- private Mono<Result> handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid, ReprocessingContext reprocessingContext) {
- return fullyReadMessage(mailboxSession, mailbox, uid)
- .flatMap(message -> messageSearchIndex.add(mailboxSession, mailbox, message))
- .thenReturn(Result.COMPLETED)
+ /**
+ * Reads one message with {@code FetchType.Full} from the given mailbox.
+ *
+ * @param mailboxSession session used to obtain the message mapper
+ * @param mailbox mailbox to read from
+ * @param mUid uid of the message to read
+ * @return the first message the mapper returns for that uid, or an empty Mono when it returns none
+ */
+ private Mono<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) {
+ MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(mailboxSession);
+ MessageRange singleUid = MessageRange.one(mUid);
+
+ return messageMapper.findInMailboxReactive(mailbox, singleUid, MessageMapper.FetchType.Full, SINGLE_MESSAGE)
+ .next();
+ }
+
+ /**
+ * Rebuilds a re-indexing entry from a recorded failure by reading the mailbox
+ * and the message again under a fresh system session.
+ *
+ * @param failure previously recorded failure, identifying a mailbox id and a message uid
+ * @return the entry to index, or an empty Mono when either lookup yields nothing
+ */
+ private Mono<ReIndexingEntry> createReindexingEntryFromFailure(ReIndexingFailure failure) {
+ MailboxSession systemSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
+ MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(systemSession);
+
+ return mailboxMapper.findMailboxByIdReactive(failure.getMailboxId())
+ .flatMap(mailbox -> fullyReadMessage(systemSession, mailbox, failure.getUid())
+ .map(message -> new ReIndexingEntry(mailbox, systemSession, message)));
+ }
+
+ /**
+ * Clears the search index of a mailbox, then produces one entry for each of its messages.
+ *
+ * @param mailbox mailbox whose messages are to be re-indexed
+ * @param mailboxSession session used for the index deletion and the message reads
+ * @return one entry per message read from the mailbox, emitted once the index deletion has completed
+ */
+ private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession) {
+ MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(mailboxSession);
+ MailboxId mailboxId = mailbox.getMailboxId();
+
+ return messageSearchIndex.deleteAll(mailboxSession, mailboxId)
+ .thenMany(messageMapper.listAllMessageUids(mailbox))
+ .flatMap(uid -> messageMapper
+ .findInMailboxReactive(mailbox, MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED)
+ .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message)));
+ }
+
+ /**
+ * Indexes the given entries at a throttled rate and folds the individual outcomes into one result.
+ *
+ * @param entriesToIndex messages to index
+ * @param runningOptions supplies the messages-per-second rate, used both as the window size
+ * of the throttle and as the indexing concurrency
+ * @param reprocessingContext receives the success and failure records
+ * @return the combined result, or {@code COMPLETED} when there was nothing to index
+ */
+ private Mono<Task.Result> reIndexMessages(Flux<ReIndexingEntry> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) {
+ int messagesPerSecond = runningOptions.getMessagesPerSecond();
+ Flux<ReIndexingEntry> throttledEntries = throttle(entriesToIndex, Duration.ofSeconds(1), messagesPerSecond);
+
+ return throttledEntries
+ .flatMap(entry -> reIndexMessage(entry.getMailboxSession(), entry.getMailbox(), reprocessingContext, entry.getMessage()), messagesPerSecond)
+ .reduce(Task::combine)
+ .switchIfEmpty(Mono.just(Result.COMPLETED));
+ }
+
+ /**
+ * Adds one message to the search index and reports the outcome to the reprocessing context.
+ *
+ * @param mailboxSession session used for the index update
+ * @param mailbox mailbox holding the message
+ * @param reprocessingContext receives a success record, or the failure details for the message
+ * @param message message to index
+ * @return {@code COMPLETED} when indexing succeeds, {@code PARTIAL} when it fails
+ */
+ private Mono<Task.Result> reIndexMessage(MailboxSession mailboxSession, Mailbox mailbox, ReprocessingContext reprocessingContext, MailboxMessage message) {
+ // Chain directly on the Mono returned by add(). Wrapping the call in Mono.fromCallable
+ // only emitted that inner Mono as a value without subscribing to it, so nothing was
+ // indexed while success was still recorded.
+ return messageSearchIndex.add(mailboxSession, mailbox, message)
+ // Emit a value once indexing completes so that the success hook below fires.
+ .thenReturn(Result.COMPLETED)
.doOnNext(any -> reprocessingContext.recordSuccess())
.onErrorResume(e -> {
- LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e);
- reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid);
+ LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), message.getUid(), e);
+ reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), message.getUid());
return Mono.just(Result.PARTIAL);
});
}
- private Mono<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) {
- return mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
- .findInMailboxReactive(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE)
- .next();
+ /**
+ * Limits the emission rate of a flux: the source is cut into windows of at most
+ * {@code windowMaxSize} elements, and one window is released per tick of a timer
+ * firing every {@code windowDuration}.
+ *
+ * @param flux source to throttle
+ * @param windowDuration period of the timer, also the timeout after which a window is closed
+ * @param windowMaxSize maximum number of elements per window
+ * @return the same elements, released window by window
+ */
+ private <V> Flux<V> throttle(Flux<V> flux, Duration windowDuration, int windowMaxSize) {
+ return flux.windowTimeout(windowMaxSize, windowDuration)
+ // Ticks are pure pacing signals. Drop the ones the zip has no demand for, otherwise
+ // the interval signals an overflow error when downstream is slower than the tick rate.
+ .zipWith(Flux.interval(DELAY, windowDuration).onBackpressureDrop())
+ .flatMap(Tuple2::getT1);
+ }
-}
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org