You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/05/20 01:58:10 UTC
[james-project] 01/11: JAMES-3184
MessageFastViewProjectionCorrector concurrency control: RunningOptions
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 86bd9d6a6143fa6e94c4e3f5617dc46a6b7d86c4
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon May 18 10:30:23 2020 +0700
JAMES-3184 MessageFastViewProjectionCorrector concurrency control: RunningOptions
Fix message processing rate via throttling
---
.../jmap/MessageFastViewProjectionCorrector.java | 125 ++++++++++++++++-----
.../RecomputeAllFastViewProjectionItemsTask.java | 3 +-
.../RecomputeUserFastViewProjectionItemsTask.java | 7 +-
3 files changed, 104 insertions(+), 31 deletions(-)
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
index 2ba423c..27c65ab 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
@@ -20,6 +20,7 @@
package org.apache.james.webadmin.data.jmap;
import java.io.IOException;
+import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
@@ -31,6 +32,7 @@ import org.apache.james.jmap.api.projections.MessageFastViewProjection;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.FetchGroup;
import org.apache.james.mailbox.model.MailboxMetaData;
@@ -45,13 +47,61 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
+import com.google.common.base.Preconditions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
public class MessageFastViewProjectionCorrector {
-
private static final Logger LOGGER = LoggerFactory.getLogger(MessageFastViewProjectionCorrector.class);
+
+ private static final Duration DELAY = Duration.ZERO;
+ private static final Duration PERIOD = Duration.ofSeconds(1);
+
+ /**
+  * Tuning options for a projection correction run.
+  *
+  * Carries the number of messages per second the corrector is asked to process.
+  */
+ public static class RunningOptions {
+     /**
+      * Static factory delegating to the constructor.
+      *
+      * @param messageRatePerSecond messages per second, must be strictly positive
+      * @return options carrying the given rate
+      * @throws IllegalArgumentException when {@code messageRatePerSecond} is zero or negative
+      */
+     public static RunningOptions withMessageRatePerSecond(int messageRatePerSecond) {
+         return new RunningOptions(messageRatePerSecond);
+     }
+
+     /** Options with a rate of 10 messages per second. Final so that callers cannot reassign the shared instance. */
+     public static final RunningOptions DEFAULT = new RunningOptions(10);
+
+     private final int messageRatePerSecond;
+
+     /**
+      * @param messageRatePerSecond messages per second, must be strictly positive
+      * @throws IllegalArgumentException when {@code messageRatePerSecond} is zero or negative
+      */
+     public RunningOptions(int messageRatePerSecond) {
+         // The message names the argument that is actually validated.
+         Preconditions.checkArgument(messageRatePerSecond > 0, "'messageRatePerSecond' must be strictly positive");
+
+         this.messageRatePerSecond = messageRatePerSecond;
+     }
+
+     /**
+      * @return the configured number of messages per second, always strictly positive
+      */
+     public int getMessageRatePerSecond() {
+         return messageRatePerSecond;
+     }
+ }
+
+ /**
+  * Immutable holder for one message to recompute: the message manager it is read through,
+  * the uid of the message, and the session used for the access.
+  */
+ private static class ProjectionEntry {
+     private final MessageManager messageManager;
+     private final MessageUid uid;
+     private final MailboxSession session;
+
+     private ProjectionEntry(MessageManager manager, MessageUid messageUid, MailboxSession mailboxSession) {
+         messageManager = manager;
+         uid = messageUid;
+         session = mailboxSession;
+     }
+
+     /** @return the message manager this entry was created with */
+     private MessageManager getMessageManager() {
+         return this.messageManager;
+     }
+
+     /** @return the uid of the message this entry designates */
+     private MessageUid getUid() {
+         return this.uid;
+     }
+
+     /** @return the session this entry was created with */
+     private MailboxSession getSession() {
+         return this.session;
+     }
+ }
static class Progress {
private final AtomicLong processedUserCount;
@@ -102,48 +152,66 @@ public class MessageFastViewProjectionCorrector {
this.projectionItemFactory = projectionItemFactory;
}
- Mono<Void> correctAllProjectionItems(Progress progress) {
+ /**
+  * Corrects the projection for every entry produced by {@code listAllMailboxMessages}.
+  *
+  * @param progress counters updated while the run advances
+  * @param runningOptions options forwarded to the throttled correction pipeline
+  * @return a {@code Mono} obtained from the correction pipeline
+  */
+ Mono<Void> correctAllProjectionItems(Progress progress, RunningOptions runningOptions) {
+     Flux<ProjectionEntry> allEntries = listAllMailboxMessages(progress);
+     return correctProjection(allEntries, runningOptions, progress);
+ }
+
+ /**
+  * Corrects the projection for the entries listed for a single user.
+  *
+  * @param progress counters updated while the run advances
+  * @param username user for whom a system session is created
+  * @param runningOptions options forwarded to the throttled correction pipeline
+  * @return a {@code Mono} obtained from the correction pipeline
+  */
+ Mono<Void> correctUsersProjectionItems(Progress progress, Username username, RunningOptions runningOptions) {
+     return correctProjection(
+         listUserMailboxMessages(progress, mailboxManager.createSystemSession(username)),
+         runningOptions,
+         progress);
+ }
+
+ private Flux<ProjectionEntry> listAllMailboxMessages(Progress progress) {
try {
return Iterators.toFlux(usersRepository.list())
- .concatMap(username -> correctUsersProjectionItems(progress, username))
- .then();
+ .map(mailboxManager::createSystemSession)
+ .doOnNext(any -> progress.processedUserCount.incrementAndGet())
+ .flatMap(session -> listUserMailboxMessages(progress, session));
} catch (UsersRepositoryException e) {
- return Mono.error(e);
+ return Flux.error(e);
}
}
- Mono<Void> correctUsersProjectionItems(Progress progress, Username username) {
+ private Flux<ProjectionEntry> listUserMailboxMessages(Progress progress, MailboxSession session) {
try {
- MailboxSession session = mailboxManager.createSystemSession(username);
return listUsersMailboxes(session)
- .concatMap(mailboxMetadata -> retrieveMailbox(session, mailboxMetadata))
- .concatMap(Throwing.function(messageManager -> correctMailboxProjectionItems(progress, messageManager, session)))
- .doOnComplete(progress.processedUserCount::incrementAndGet)
- .onErrorContinue((error, o) -> {
- LOGGER.error("JMAP fastview re-computation aborted for {}", username, error);
- progress.failedUserCount.incrementAndGet();
- })
- .then();
+ .flatMap(mailboxMetadata -> retrieveMailbox(session, mailboxMetadata))
+ .flatMap(Throwing.function(messageManager -> listAllMailboxMessages(messageManager, session)
+ .map(message -> new ProjectionEntry(messageManager, message.getUid(), session))));
} catch (MailboxException e) {
- LOGGER.error("JMAP fastview re-computation aborted for {} as we failed listing user mailboxes", username, e);
+ LOGGER.error("JMAP fastview re-computation aborted for {} as we failed listing user mailboxes", session.getUser(), e);
progress.failedUserCount.incrementAndGet();
- return Mono.empty();
+ return Flux.empty();
}
}
- private Mono<Void> correctMailboxProjectionItems(Progress progress, MessageManager messageManager, MailboxSession session) throws MailboxException {
- return listAllMailboxMessages(messageManager, session)
- .concatMap(messageResult -> retrieveContent(messageManager, session, messageResult))
+ private Mono<Void> correctProjection(ProjectionEntry entry, Progress progress) {
+ return retrieveContent(entry.getMessageManager(), entry.getSession(), entry.getUid())
.map(this::computeProjectionEntry)
- .concatMap(pair -> storeProjectionEntry(pair)
- .doOnSuccess(any -> progress.processedMessageCount.incrementAndGet()))
- .onErrorContinue((error, triggeringValue) -> {
- LOGGER.error("JMAP fastview re-computation aborted for {} - {}", session.getUser(), triggeringValue, error);
+ .flatMap(this::storeProjectionEntry)
+ .doOnSuccess(any -> progress.processedMessageCount.incrementAndGet())
+ .onErrorResume(e -> {
+ LOGGER.error("JMAP fastview re-computation aborted for {} - {} - {}",
+ entry.getSession().getUser(),
+ entry.getMessageManager().getId(),
+ entry.getUid(), e);
progress.failedMessageCount.incrementAndGet();
- })
+ return Mono.empty();
+ });
+ }
+
+ /**
+  * Throttles the given entries, corrects each of them, and discards the individual results.
+  *
+  * @param entries entries to correct
+  * @param runningOptions options handed to {@code throttleWithRate}
+  * @param progress counters handed to the per-entry correction
+  * @return the {@code Mono} produced by calling {@code then()} on the pipeline
+  */
+ private Mono<Void> correctProjection(Flux<ProjectionEntry> entries, RunningOptions runningOptions, Progress progress) {
+     Flux<ProjectionEntry> throttledEntries = throttleWithRate(entries, runningOptions);
+     return throttledEntries
+         .flatMap(projectionEntry -> correctProjection(projectionEntry, progress))
+         .then();
+ }
+ /**
+  * Limits how fast entries flow downstream.
+  *
+  * Entries are grouped with {@code windowTimeout} using the configured rate as the size
+  * and {@code PERIOD} as the timeout. Each window is zipped with one tick of an interval
+  * that starts after {@code DELAY} and repeats every {@code PERIOD}, then the windows
+  * are flattened back into a single stream.
+  *
+  * @param entries entries to throttle
+  * @param runningOptions supplies the per-window size via {@code getMessageRatePerSecond()}
+  * @return the flattened entries of the zipped windows
+  */
+ private Flux<ProjectionEntry> throttleWithRate(Flux<ProjectionEntry> entries, RunningOptions runningOptions) {
+     // Window timeout and ticker share PERIOD so the two durations cannot drift apart.
+     return entries.windowTimeout(runningOptions.getMessageRatePerSecond(), PERIOD)
+         .zipWith(Flux.interval(DELAY, PERIOD))
+         .flatMap(Tuple2::getT1);
+ }
+
private Flux<MailboxMetaData> listUsersMailboxes(MailboxSession session) throws MailboxException {
return Flux.fromIterable(mailboxManager.search(MailboxQuery.privateMailboxesBuilder(session).build(), session));
}
@@ -156,11 +224,12 @@ public class MessageFastViewProjectionCorrector {
return Iterators.toFlux(messageManager.getMessages(MessageRange.all(), FetchGroup.MINIMAL, session));
}
- private Flux<MessageResult> retrieveContent(MessageManager messageManager, MailboxSession session, MessageResult messageResult) {
+ private Mono<MessageResult> retrieveContent(MessageManager messageManager, MailboxSession session, MessageUid uid) {
try {
- return Iterators.toFlux(messageManager.getMessages(MessageRange.one(messageResult.getUid()), FetchGroup.FULL_CONTENT, session));
+ return Iterators.toFlux(messageManager.getMessages(MessageRange.one(uid), FetchGroup.FULL_CONTENT, session))
+ .next();
} catch (MailboxException e) {
- return Flux.error(e);
+ return Mono.error(e);
}
}
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeAllFastViewProjectionItemsTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeAllFastViewProjectionItemsTask.java
index 4ef9365..3556756 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeAllFastViewProjectionItemsTask.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeAllFastViewProjectionItemsTask.java
@@ -29,6 +29,7 @@ import org.apache.james.server.task.json.dto.TaskDTOModule;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskType;
+import org.apache.james.webadmin.data.jmap.MessageFastViewProjectionCorrector.RunningOptions;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -116,7 +117,7 @@ public class RecomputeAllFastViewProjectionItemsTask implements Task {
@Override
public Result run() {
- corrector.correctAllProjectionItems(progress)
+ corrector.correctAllProjectionItems(progress, RunningOptions.DEFAULT)
.subscribeOn(Schedulers.elastic())
.block();
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeUserFastViewProjectionItemsTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeUserFastViewProjectionItemsTask.java
index d29435a..439f3df 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeUserFastViewProjectionItemsTask.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeUserFastViewProjectionItemsTask.java
@@ -19,6 +19,9 @@
package org.apache.james.webadmin.data.jmap;
+import static org.apache.james.webadmin.data.jmap.MessageFastViewProjectionCorrector.Progress;
+import static org.apache.james.webadmin.data.jmap.MessageFastViewProjectionCorrector.RunningOptions;
+
import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
@@ -39,7 +42,7 @@ public class RecomputeUserFastViewProjectionItemsTask implements Task {
static final TaskType TASK_TYPE = TaskType.of("RecomputeUserFastViewProjectionItemsTask");
public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
- private static AdditionalInformation from(MessageFastViewProjectionCorrector.Progress progress, Username username) {
+ private static AdditionalInformation from(Progress progress, Username username) {
return new AdditionalInformation(username,
progress.getProcessedMessageCount(),
progress.getFailedMessageCount(),
@@ -119,7 +122,7 @@ public class RecomputeUserFastViewProjectionItemsTask implements Task {
@Override
public Result run() {
- corrector.correctUsersProjectionItems(progress, username)
+ corrector.correctUsersProjectionItems(progress, username, RunningOptions.DEFAULT)
.subscribeOn(Schedulers.elastic())
.block();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org