You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/05/20 01:58:11 UTC
[james-project] 02/11: JAMES-3184 MessageFastViewProjectionCorrector: remove temporal coupling
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e33ce51702eafba71082cf64ac186c91b0ef5a8d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon May 18 10:32:27 2020 +0700
JAMES-3184 MessageFastViewProjectionCorrector: remove temporal coupling
Tasks should not rely on the Progress (which may
not have been updated yet) to know whether
a task succeeded or not.
---
.../jmap/MessageFastViewProjectionCorrector.java | 20 ++++++++++----------
.../RecomputeAllFastViewProjectionItemsTask.java | 7 +------
.../RecomputeUserFastViewProjectionItemsTask.java | 7 +------
3 files changed, 12 insertions(+), 22 deletions(-)
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
index 27c65ab..a692aea 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
@@ -40,6 +40,8 @@ import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.model.MessageResult;
import org.apache.james.mailbox.model.search.MailboxQuery;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
import org.apache.james.user.api.UsersRepository;
import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.util.streams.Iterators;
@@ -131,10 +133,6 @@ public class MessageFastViewProjectionCorrector {
long getFailedMessageCount() {
return failedMessageCount.get();
}
-
- boolean failed() {
- return failedMessageCount.get() > 0 || failedUserCount.get() > 0;
- }
}
private final UsersRepository usersRepository;
@@ -152,11 +150,11 @@ public class MessageFastViewProjectionCorrector {
this.projectionItemFactory = projectionItemFactory;
}
- Mono<Void> correctAllProjectionItems(Progress progress, RunningOptions runningOptions) {
+ Mono<Result> correctAllProjectionItems(Progress progress, RunningOptions runningOptions) {
return correctProjection(listAllMailboxMessages(progress), runningOptions, progress);
}
- Mono<Void> correctUsersProjectionItems(Progress progress, Username username, RunningOptions runningOptions) {
+ Mono<Result> correctUsersProjectionItems(Progress progress, Username username, RunningOptions runningOptions) {
MailboxSession session = mailboxManager.createSystemSession(username);
return correctProjection(listUserMailboxMessages(progress, session), runningOptions, progress);
}
@@ -185,25 +183,27 @@ public class MessageFastViewProjectionCorrector {
}
}
- private Mono<Void> correctProjection(ProjectionEntry entry, Progress progress) {
+ private Mono<Result> correctProjection(ProjectionEntry entry, Progress progress) {
return retrieveContent(entry.getMessageManager(), entry.getSession(), entry.getUid())
.map(this::computeProjectionEntry)
.flatMap(this::storeProjectionEntry)
.doOnSuccess(any -> progress.processedMessageCount.incrementAndGet())
+ .thenReturn(Result.COMPLETED)
.onErrorResume(e -> {
LOGGER.error("JMAP fastview re-computation aborted for {} - {} - {}",
entry.getSession().getUser(),
entry.getMessageManager().getId(),
entry.getUid(), e);
progress.failedMessageCount.incrementAndGet();
- return Mono.empty();
+ return Mono.just(Result.PARTIAL);
});
}
- private Mono<Void> correctProjection(Flux<ProjectionEntry> entries, RunningOptions runningOptions, Progress progress) {
+ private Mono<Result> correctProjection(Flux<ProjectionEntry> entries, RunningOptions runningOptions, Progress progress) {
return throttleWithRate(entries, runningOptions)
.flatMap(entry -> correctProjection(entry, progress))
- .then();
+ .reduce(Task::combine)
+ .switchIfEmpty(Mono.just(Result.COMPLETED));
}
private Flux<ProjectionEntry> throttleWithRate(Flux<ProjectionEntry> entries, RunningOptions runningOptions) {
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeAllFastViewProjectionItemsTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeAllFastViewProjectionItemsTask.java
index 3556756..6acba49 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeAllFastViewProjectionItemsTask.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeAllFastViewProjectionItemsTask.java
@@ -117,14 +117,9 @@ public class RecomputeAllFastViewProjectionItemsTask implements Task {
@Override
public Result run() {
- corrector.correctAllProjectionItems(progress, RunningOptions.DEFAULT)
+ return corrector.correctAllProjectionItems(progress, RunningOptions.DEFAULT)
.subscribeOn(Schedulers.elastic())
.block();
-
- if (progress.failed()) {
- return Result.PARTIAL;
- }
- return Result.COMPLETED;
}
@Override
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeUserFastViewProjectionItemsTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeUserFastViewProjectionItemsTask.java
index 439f3df..3e44238 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeUserFastViewProjectionItemsTask.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/RecomputeUserFastViewProjectionItemsTask.java
@@ -122,14 +122,9 @@ public class RecomputeUserFastViewProjectionItemsTask implements Task {
@Override
public Result run() {
- corrector.correctUsersProjectionItems(progress, username, RunningOptions.DEFAULT)
+ return corrector.correctUsersProjectionItems(progress, username, RunningOptions.DEFAULT)
.subscribeOn(Schedulers.elastic())
.block();
-
- if (progress.failed()) {
- return Result.PARTIAL;
- }
- return Result.COMPLETED;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org