You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/06/10 04:10:43 UTC
[james-project] 07/08: JAMES-3201 Error handling using
functionaljava Either
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0b01bc71a9b88ee123e64088baaa33941bfe7a84
Author: LanKhuat <kh...@gmail.com>
AuthorDate: Mon Jun 8 12:07:44 2020 +0700
JAMES-3201 Error handling using functionaljava Either
---
mailbox/tools/indexer/pom.xml | 5 +
.../mailbox/tools/indexer/ReIndexerPerformer.java | 162 ++++++++++++++-------
.../james/webadmin/routes/MailboxesRoutesTest.java | 11 +-
.../webadmin/routes/UserMailboxesRoutesTest.java | 2 +-
4 files changed, 119 insertions(+), 61 deletions(-)
diff --git a/mailbox/tools/indexer/pom.xml b/mailbox/tools/indexer/pom.xml
index 528c4b1..e572be5 100644
--- a/mailbox/tools/indexer/pom.xml
+++ b/mailbox/tools/indexer/pom.xml
@@ -124,6 +124,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.functionaljava</groupId>
+ <artifactId>functionaljava-java8</artifactId>
+ <version>4.8.1</version>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index 9dd74cb..f9e2c5e 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
+import fj.data.Either;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -78,6 +79,50 @@ public class ReIndexerPerformer {
}
}
+ /**
+ * A failed re-indexing step, carried as the left side of an {@code Either}
+ * instead of being written to the context at the place the error is caught.
+ * {@code recordFailure} reports the failure to the given {@link ReprocessingContext}.
+ */ private interface Failure {
+ void recordFailure(ReprocessingContext context);
+ }
+
+ /**
+ * Failure covering a whole mailbox, identified by its {@link MailboxId}.
+ */
+ private static class MailboxFailure implements Failure {
+     private final MailboxId mailboxId;
+
+     private MailboxFailure(MailboxId failedMailboxId) {
+         this.mailboxId = failedMailboxId;
+     }
+
+     /** Reports the mailbox as failed through {@code recordMailboxFailure}. */
+     @Override
+     public void recordFailure(ReprocessingContext context) {
+         context.recordMailboxFailure(this.mailboxId);
+     }
+
+     /** @return the id of the mailbox this failure refers to */
+     public MailboxId getMailboxId() {
+         return this.mailboxId;
+     }
+ }
+
+ /**
+ * Failure covering a single message, identified by its mailbox id and uid.
+ */
+ private static class MessageFailure implements Failure {
+     private final MailboxId mailboxId;
+     private final MessageUid uid;
+
+     private MessageFailure(MailboxId failedMailboxId, MessageUid failedUid) {
+         this.mailboxId = failedMailboxId;
+         this.uid = failedUid;
+     }
+
+     /** Reports the message as failed through {@code recordFailureDetailsForMessage}. */
+     @Override
+     public void recordFailure(ReprocessingContext context) {
+         context.recordFailureDetailsForMessage(this.mailboxId, this.uid);
+     }
+
+     /** @return the id of the mailbox holding the message */
+     public MailboxId getMailboxId() {
+         return this.mailboxId;
+     }
+
+     /** @return the uid of the message this failure refers to */
+     public MessageUid getUid() {
+         return this.uid;
+     }
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class);
private static final int SINGLE_MESSAGE = 1;
@@ -101,8 +146,8 @@ public class ReIndexerPerformer {
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
LOGGER.info("Starting a full reindex");
- Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
- .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext));
+ Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
+ .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext)
.doFinally(any -> LOGGER.info("Full reindex finished"));
@@ -111,9 +156,9 @@ public class ReIndexerPerformer {
Mono<Result> reIndexSingleMailbox(MailboxId mailboxId, ReprocessingContext reprocessingContext, RunningOptions runningOptions) {
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
- Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
+ Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
.findMailboxByIdReactive(mailboxId)
- .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext));
+ .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext);
}
@@ -126,8 +171,8 @@ public class ReIndexerPerformer {
MailboxQuery mailboxQuery = MailboxQuery.privateMailboxesBuilder(mailboxSession).build();
try {
- Flux<ReIndexingEntry> entriesToIndex = mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound())
- .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext));
+ Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound())
+ .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext)
.doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()));
@@ -143,7 +188,8 @@ public class ReIndexerPerformer {
return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
.findMailboxByIdReactive(mailboxId)
.flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, uid)
- .flatMap(message -> reIndexMessage(mailboxSession, mailbox, reprocessingContext, message)))
+ .map(message -> Either.<Failure, ReIndexingEntry>right(new ReIndexingEntry(mailbox, mailboxSession, message)))
+ .flatMap(entryOrFailure -> reIndexMessage(entryOrFailure, reprocessingContext)))
.switchIfEmpty(Mono.just(Result.COMPLETED));
}
@@ -165,24 +211,18 @@ public class ReIndexerPerformer {
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
- return Flux.merge(
- reIndexMessages(Flux.fromIterable(previousReIndexingFailures.messageFailures())
- .flatMap(failure -> createReindexingEntryFromFailure(failure, reprocessingContext)),
- runningOptions,
- reprocessingContext),
- reIndexMailboxes(Flux.fromIterable(previousReIndexingFailures.mailboxFailures())
- .flatMap(mapper::findMailboxByIdReactive),
- mailboxSession,
- reprocessingContext,
- runningOptions))
- .reduce(Task::combine);
- }
-
- private Mono<Result> reIndexMailboxes(Flux<Mailbox> mailboxes, MailboxSession session, ReprocessingContext reprocessingContext, RunningOptions runningOptions) {
- Flux<ReIndexingEntry> entries = mailboxes
- .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, session, reprocessingContext));
+ Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = Flux.merge(
+ Flux.fromIterable(previousReIndexingFailures.messageFailures())
+ .flatMap(this::createReindexingEntryFromFailure),
+ Flux.fromIterable(previousReIndexingFailures.mailboxFailures())
+ .flatMap(mailboxId -> mapper.findMailboxByIdReactive(mailboxId)
+ .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession))
+ .onErrorResume(e -> {
+ LOGGER.warn("Failed to re-index {}", mailboxId, e);
+ return Mono.just(Either.left(new MailboxFailure(mailboxId)));
+ })));
- return reIndexMessages(entries, runningOptions, reprocessingContext);
+ return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext);
}
private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) {
@@ -202,64 +242,78 @@ public class ReIndexerPerformer {
.next();
}
- private Mono<ReIndexingEntry> createReindexingEntryFromFailure(ReIndexingFailure failure, ReprocessingContext reprocessingContext) {
+ /**
+ * Rebuilds a re-indexing entry from a message failure recorded by a previous run.
+ *
+ * The mailbox is looked up by the failure's mailbox id, the message with the
+ * failure's uid is fully read, and the resulting entry is wrapped in a right.
+ * An error raised by that pipeline is logged and replaced by a left
+ * {@link MessageFailure} carrying the same mailbox id and uid; the context is
+ * no longer touched here.
+ *
+ * @param previousFailure the failure to retry
+ * @return the entry to index (right) or the failure to record (left)
+ */ private Mono<Either<Failure, ReIndexingEntry>> createReindexingEntryFromFailure(ReIndexingFailure previousFailure) {
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
- .findMailboxByIdReactive(failure.getMailboxId())
- .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, failure.getUid())
- .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message)))
+ .findMailboxByIdReactive(previousFailure.getMailboxId())
+ .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, previousFailure.getUid())
+ .map(message -> Either.<Failure, ReIndexingEntry>right(new ReIndexingEntry(mailbox, mailboxSession, message))))
.onErrorResume(e -> {
- LOGGER.warn("ReIndexing failed for {}", failure, e);
- reprocessingContext.recordFailureDetailsForMessage(failure.getMailboxId(), failure.getUid());
- return Mono.empty();
+ LOGGER.warn("ReIndexing failed for {}", previousFailure, e);
+ return Mono.just(Either.left(new MessageFailure(previousFailure.getMailboxId(), previousFailure.getUid())));
});
}
- private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession, ReprocessingContext reprocessingContext) {
+ /**
+ * Produces the entries to index for every message uid of a mailbox.
+ *
+ * {@code messageSearchIndex.deleteAll} is called for the mailbox first, then the
+ * uids listed by the message mapper are each turned into entries (or message
+ * failures) by {@code reIndexingEntryForUid}. An error reaching the outer
+ * {@code onErrorResume} is logged and replaced by a single left
+ * {@link MailboxFailure} for this mailbox.
+ *
+ * @param mailbox the mailbox whose messages are to be re-indexed
+ * @param mailboxSession session used for the index and the message mapper
+ * @return a stream of entries (right) and failures (left)
+ */ private Flux<Either<Failure, ReIndexingEntry>> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession) {
MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(mailboxSession);
return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId())
.thenMany(messageMapper.listAllMessageUids(mailbox))
- .flatMap(uid -> reIndexingEntryForUid(mailbox, mailboxSession, reprocessingContext, messageMapper, uid))
+ .flatMap(uid -> reIndexingEntryForUid(mailbox, mailboxSession, messageMapper, uid))
.onErrorResume(e -> {
LOGGER.warn("ReIndexing failed for {}", mailbox.generateAssociatedPath(), e);
- reprocessingContext.recordMailboxFailure(mailbox.getMailboxId());
- return Mono.empty();
+ return Mono.just(Either.left(new MailboxFailure(mailbox.getMailboxId())));
});
}
- private Flux<ReIndexingEntry> reIndexingEntryForUid(Mailbox mailbox, MailboxSession mailboxSession, ReprocessingContext reprocessingContext, MessageMapper messageMapper, MessageUid uid) {
+ /**
+ * Loads the message with the given uid and wraps it as an entry to index.
+ *
+ * The message is fetched with {@code FetchType.Full} over a one-uid range; each
+ * message found becomes a right {@code ReIndexingEntry}. An error from the fetch
+ * is logged and replaced by a left {@link MessageFailure} for this mailbox id and uid.
+ *
+ * @param mailbox the mailbox holding the message
+ * @param mailboxSession session stored in the produced entry
+ * @param messageMapper mapper used to read the message
+ * @param uid uid of the message to load
+ * @return the entries (right) or the failure (left) for this uid
+ */ private Flux<Either<Failure, ReIndexingEntry>> reIndexingEntryForUid(Mailbox mailbox, MailboxSession mailboxSession, MessageMapper messageMapper, MessageUid uid) {
return messageMapper.findInMailboxReactive(mailbox, MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED)
- .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message))
+ .map(message -> Either.<Failure, ReIndexingEntry>right(new ReIndexingEntry(mailbox, mailboxSession, message)))
.onErrorResume(e -> {
- LOGGER.warn("ReIndexing failed for {} - {}", mailbox.getMailboxId(), uid, e);
- reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid);
- return Mono.empty();
+ LOGGER.warn("ReIndexing failed for {} {}", mailbox.getMailboxId(), uid, e);
+ return Mono.just(Either.left(new MessageFailure(mailbox.getMailboxId(), uid)));
});
}
- private Mono<Task.Result> reIndexMessages(Flux<ReIndexingEntry> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) {
- return ReactorUtils.Throttler.<ReIndexingEntry, Task.Result>forOperation(entry -> reIndexMessage(entry.getMailboxSession(), entry.getMailbox(), reprocessingContext, entry.getMessage()))
+ /**
+ * Runs {@code reIndexMessage} on every element of {@code entriesToIndex} and
+ * combines the individual results into one.
+ *
+ * Elements go through {@code ReactorUtils.Throttler}, windowed with
+ * {@code runningOptions.getMessagesPerSecond()} and a one-second duration.
+ * Results are folded with {@code Task::combine}; when the fold yields nothing
+ * the result is {@code Result.COMPLETED}. Failures now travel as left values
+ * in the stream, so the empty case no longer inspects the context.
+ *
+ * @param entriesToIndex entries to index (right) and failures to record (left)
+ * @param runningOptions supplies the messages-per-second window size
+ * @param reprocessingContext context receiving success and failure records
+ * @return the combined result of all elements
+ */ private Mono<Task.Result> reIndexMessages(Flux<Either<Failure, ReIndexingEntry>> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) {
+ return ReactorUtils.Throttler.<Either<Failure, ReIndexingEntry>, Task.Result>forOperation(
+ entry -> reIndexMessage(entry, reprocessingContext))
.window(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1))
.throttle(entriesToIndex)
.reduce(Task::combine)
- .switchIfEmpty(Mono.fromSupplier(() -> {
- if (reprocessingContext.failures().mailboxFailures().isEmpty()) {
- return Result.COMPLETED;
- }
+ .switchIfEmpty(Mono.just(Result.COMPLETED));
+ }
+
+ /**
+ * Indexes one entry, or passes an earlier failure through, and records the outcome.
+ *
+ * A right entry is handed to {@code index}; a left failure skips indexing. The
+ * nested Either is then flattened and the final Either is given to
+ * {@code recordIndexingResult} together with {@code reprocessingContext}.
+ *
+ * @param entryOrFailure the entry to index, or the failure met while building it
+ * @param reprocessingContext context passed on to {@code recordIndexingResult}
+ * @return the result computed by {@code recordIndexingResult}
+ */ private Mono<Task.Result> reIndexMessage(Either<Failure, ReIndexingEntry> entryOrFailure, ReprocessingContext reprocessingContext) {
+ return toMono(entryOrFailure.right().map(this::index))
+ .map(this::flapMapRight)
+ .map(either -> recordIndexingResult(reprocessingContext, either));
+ }
+
+ /**
+ * Turns the outcome of one indexing attempt into a {@code Result} and reports it.
+ *
+ * A left failure is recorded in the context through {@code recordFailure} and
+ * yields {@code Result.PARTIAL}. A right result is returned via
+ * {@code result.onComplete(reprocessingContext::recordSuccess)}.
+ * NOTE(review): presumably {@code onComplete} runs the callback only for a
+ * completed result and returns the result itself; confirm against Task.Result.
+ *
+ * @param reprocessingContext context receiving the success or failure record
+ * @param either the failure (left) or indexing result (right) to report
+ * @return {@code Result.PARTIAL} for a failure, otherwise the value of {@code onComplete}
+ */ private Result recordIndexingResult(ReprocessingContext reprocessingContext, Either<Failure, Result> either) {
+ return either.either(
+ failure -> {
+ failure.recordFailure(reprocessingContext);
return Result.PARTIAL;
- }));
+ },
+ result -> result.onComplete(reprocessingContext::recordSuccess));
}
- private Mono<Task.Result> reIndexMessage(MailboxSession mailboxSession, Mailbox mailbox, ReprocessingContext reprocessingContext, MailboxMessage message) {
- return Mono.fromCallable(() -> messageSearchIndex.add(mailboxSession, mailbox, message))
- .doOnNext(any -> reprocessingContext.recordSuccess())
- .thenReturn(Result.COMPLETED)
+ /**
+ * Adds the entry's message to the search index.
+ *
+ * The call to {@code messageSearchIndex.add} is wrapped in {@code Mono.defer} so
+ * that an exception thrown synchronously by {@code add} is delivered as an error
+ * signal and handled by {@code onErrorResume} below, like an error emitted by the
+ * returned Mono. Without the wrapper such an exception would be thrown before the
+ * chain is built and would escape this method; the removed implementation guarded
+ * against this with {@code Mono.fromCallable}.
+ *
+ * @param entry the mailbox, session and message to index
+ * @return a right {@code Result.COMPLETED} on success, or a left
+ *         {@link MessageFailure} for the entry's mailbox id and message uid on error
+ */
+ private Mono<Either<Failure, Result>> index(ReIndexingEntry entry) {
+ return Mono.defer(() -> messageSearchIndex.add(entry.getMailboxSession(), entry.getMailbox(), entry.getMessage()))
+ .thenReturn(Either.<Failure, Result>right(Result.COMPLETED))
.onErrorResume(e -> {
- LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), message.getUid(), e);
- reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), message.getUid());
- return Mono.just(Result.PARTIAL);
+ LOGGER.warn("ReIndexing failed for {} {}", entry.getMailbox().generateAssociatedPath(), entry.getMessage().getUid(), e);
+ return Mono.just(Either.left(new MessageFailure(entry.getMailbox().getMailboxId(), entry.getMessage().getUid())));
});
}
+
+ /**
+ * Flattens an Either whose right side is itself an Either with the same left type.
+ *
+ * An outer left is kept as a left; an outer right yields the inner Either as is.
+ * Delegates to the library's {@code Either.joinRight} rather than binding with a
+ * hand-written identity function.
+ * NOTE(review): the name looks like a typo of "flatMapRight"; it is kept because
+ * the caller refers to it by this name.
+ *
+ * @param nestedEither the nested value to flatten
+ * @return the flattened Either
+ */
+ private <X, Y> Either<X, Y> flapMapRight(Either<X, Either<X, Y>> nestedEither) {
+ return Either.joinRight(nestedEither);
+ }
+
+ /**
+ * Moves the Mono from inside the right side of an Either to the outside.
+ *
+ * A left value is emitted immediately as a left; a right Mono has each value it
+ * emits wrapped as a right.
+ *
+ * @param either a left value, or a right holding a Mono
+ * @return a Mono emitting the corresponding Either
+ */
+ private <X, Y> Mono<Either<X, Y>> toMono(Either<X, Mono<Y>> either) {
+     if (either.isLeft()) {
+         X leftValue = either.left().value();
+         return Mono.just(Either.<X, Y>left(leftValue));
+     }
+     Mono<Y> pending = either.right().value();
+     return pending.map(resolved -> Either.<X, Y>right(resolved));
+ }
}
\ No newline at end of file
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
index 05a333c..c8c29cc 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
@@ -27,7 +27,6 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
@@ -213,7 +212,7 @@ class MailboxesRoutesTest {
MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
systemSession).getId();
- doThrow(new RuntimeException())
+ doReturn(Mono.error(new RuntimeException()))
.when(searchIndex)
.add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
@@ -401,7 +400,7 @@ class MailboxesRoutesTest {
MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
systemSession).getId();
- doThrow(new RuntimeException())
+ doReturn(Mono.error(new RuntimeException()))
.when(searchIndex)
.add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
@@ -743,7 +742,7 @@ class MailboxesRoutesTest {
MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
systemSession);
- doThrow(new RuntimeException()).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
+ doReturn(Mono.error(new RuntimeException())).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
String taskId = with()
.post("/mailboxes?task=reIndex")
@@ -787,7 +786,7 @@ class MailboxesRoutesTest {
MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
systemSession).getId();
- doThrow(new RuntimeException())
+ doReturn(Mono.error(new RuntimeException()))
.when(searchIndex)
.add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
@@ -835,7 +834,7 @@ class MailboxesRoutesTest {
MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
systemSession).getId();
- doThrow(new RuntimeException()).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
+ doReturn(Mono.error(new RuntimeException())).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
String taskId = with()
.post("/mailboxes?task=reIndex")
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
index 6d0e299..5dd021b 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
@@ -1231,7 +1231,7 @@ class UserMailboxesRoutesTest {
MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
systemSession).getId();
- doThrow(new RuntimeException())
+ doReturn(Mono.error(new RuntimeException()))
.when(searchIndex)
.add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org