You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/06/10 04:10:43 UTC

[james-project] 07/08: JAMES-3201 Error handling using functionaljava Either

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0b01bc71a9b88ee123e64088baaa33941bfe7a84
Author: LanKhuat <kh...@gmail.com>
AuthorDate: Mon Jun 8 12:07:44 2020 +0700

    JAMES-3201 Error handling using functionaljava Either
---
 mailbox/tools/indexer/pom.xml                      |   5 +
 .../mailbox/tools/indexer/ReIndexerPerformer.java  | 162 ++++++++++++++-------
 .../james/webadmin/routes/MailboxesRoutesTest.java |  11 +-
 .../webadmin/routes/UserMailboxesRoutesTest.java   |   2 +-
 4 files changed, 119 insertions(+), 61 deletions(-)

diff --git a/mailbox/tools/indexer/pom.xml b/mailbox/tools/indexer/pom.xml
index 528c4b1..e572be5 100644
--- a/mailbox/tools/indexer/pom.xml
+++ b/mailbox/tools/indexer/pom.xml
@@ -124,6 +124,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.functionaljava</groupId>
+            <artifactId>functionaljava-java8</artifactId>
+            <version>4.8.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index 9dd74cb..f9e2c5e 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
 
+import fj.data.Either;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -78,6 +79,50 @@ public class ReIndexerPerformer {
         }
     }
 
+    private interface Failure {
+        void recordFailure(ReprocessingContext context);
+    }
+
+    private static class MailboxFailure implements Failure {
+        private final MailboxId mailboxId;
+
+        private MailboxFailure(MailboxId mailboxId) {
+            this.mailboxId = mailboxId;
+        }
+
+        public MailboxId getMailboxId() {
+            return mailboxId;
+        }
+
+        @Override
+        public void recordFailure(ReprocessingContext context) {
+            context.recordMailboxFailure(mailboxId);
+        }
+    }
+
+    private static class MessageFailure implements Failure {
+        private final MailboxId mailboxId;
+        private final MessageUid uid;
+
+        private MessageFailure(MailboxId mailboxId, MessageUid uid) {
+            this.mailboxId = mailboxId;
+            this.uid = uid;
+        }
+
+        public MailboxId getMailboxId() {
+            return mailboxId;
+        }
+
+        public MessageUid getUid() {
+            return uid;
+        }
+
+        @Override
+        public void recordFailure(ReprocessingContext context) {
+            context.recordFailureDetailsForMessage(mailboxId, uid);
+        }
+    }
+
     private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class);
 
     private static final int SINGLE_MESSAGE = 1;
@@ -101,8 +146,8 @@ public class ReIndexerPerformer {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
         LOGGER.info("Starting a full reindex");
 
-        Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
-            .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext));
+        Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
+            .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
 
         return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext)
             .doFinally(any -> LOGGER.info("Full reindex finished"));
@@ -111,9 +156,9 @@ public class ReIndexerPerformer {
     Mono<Result> reIndexSingleMailbox(MailboxId mailboxId, ReprocessingContext reprocessingContext, RunningOptions runningOptions) {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
 
-        Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
+        Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
             .findMailboxByIdReactive(mailboxId)
-            .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext));
+            .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
 
         return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext);
     }
@@ -126,8 +171,8 @@ public class ReIndexerPerformer {
         MailboxQuery mailboxQuery = MailboxQuery.privateMailboxesBuilder(mailboxSession).build();
 
         try {
-            Flux<ReIndexingEntry> entriesToIndex = mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound())
-                .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext));
+            Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound())
+                .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
 
             return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext)
                 .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()));
@@ -143,7 +188,8 @@ public class ReIndexerPerformer {
         return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
             .findMailboxByIdReactive(mailboxId)
             .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, uid)
-                .flatMap(message -> reIndexMessage(mailboxSession, mailbox, reprocessingContext, message)))
+                .map(message -> Either.<Failure, ReIndexingEntry>right(new ReIndexingEntry(mailbox, mailboxSession, message)))
+                .flatMap(entryOrFailure -> reIndexMessage(entryOrFailure, reprocessingContext)))
             .switchIfEmpty(Mono.just(Result.COMPLETED));
     }
 
@@ -165,24 +211,18 @@ public class ReIndexerPerformer {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
         MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
 
-        return Flux.merge(
-            reIndexMessages(Flux.fromIterable(previousReIndexingFailures.messageFailures())
-                    .flatMap(failure -> createReindexingEntryFromFailure(failure, reprocessingContext)),
-                runningOptions,
-                reprocessingContext),
-            reIndexMailboxes(Flux.fromIterable(previousReIndexingFailures.mailboxFailures())
-                    .flatMap(mapper::findMailboxByIdReactive),
-                mailboxSession,
-                reprocessingContext,
-                runningOptions))
-            .reduce(Task::combine);
-    }
-
-    private Mono<Result> reIndexMailboxes(Flux<Mailbox> mailboxes, MailboxSession session, ReprocessingContext reprocessingContext, RunningOptions runningOptions) {
-        Flux<ReIndexingEntry> entries = mailboxes
-            .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, session, reprocessingContext));
+        Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = Flux.merge(
+            Flux.fromIterable(previousReIndexingFailures.messageFailures())
+                .flatMap(this::createReindexingEntryFromFailure),
+            Flux.fromIterable(previousReIndexingFailures.mailboxFailures())
+                .flatMap(mailboxId -> mapper.findMailboxByIdReactive(mailboxId)
+                    .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession))
+                    .onErrorResume(e -> {
+                        LOGGER.warn("Failed to re-index {}", mailboxId, e);
+                        return Mono.just(Either.left(new MailboxFailure(mailboxId)));
+                    })));
 
-        return reIndexMessages(entries, runningOptions, reprocessingContext);
+        return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext);
     }
 
     private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) {
@@ -202,64 +242,78 @@ public class ReIndexerPerformer {
             .next();
     }
 
-    private Mono<ReIndexingEntry> createReindexingEntryFromFailure(ReIndexingFailure failure, ReprocessingContext reprocessingContext) {
+    private Mono<Either<Failure, ReIndexingEntry>> createReindexingEntryFromFailure(ReIndexingFailure previousFailure) {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
 
         return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
-            .findMailboxByIdReactive(failure.getMailboxId())
-            .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, failure.getUid())
-                .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message)))
+            .findMailboxByIdReactive(previousFailure.getMailboxId())
+            .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, previousFailure.getUid())
+                .map(message -> Either.<Failure, ReIndexingEntry>right(new ReIndexingEntry(mailbox, mailboxSession, message))))
             .onErrorResume(e -> {
-                LOGGER.warn("ReIndexing failed for {}", failure, e);
-                reprocessingContext.recordFailureDetailsForMessage(failure.getMailboxId(), failure.getUid());
-                return Mono.empty();
+                LOGGER.warn("ReIndexing failed for {}", previousFailure, e);
+                return Mono.just(Either.left(new MessageFailure(previousFailure.getMailboxId(), previousFailure.getUid())));
             });
     }
 
-    private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession, ReprocessingContext reprocessingContext) {
+    private Flux<Either<Failure, ReIndexingEntry>> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession) {
         MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(mailboxSession);
 
         return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId())
             .thenMany(messageMapper.listAllMessageUids(mailbox))
-            .flatMap(uid -> reIndexingEntryForUid(mailbox, mailboxSession, reprocessingContext, messageMapper, uid))
+            .flatMap(uid -> reIndexingEntryForUid(mailbox, mailboxSession, messageMapper, uid))
             .onErrorResume(e -> {
                 LOGGER.warn("ReIndexing failed for {}", mailbox.generateAssociatedPath(), e);
-                reprocessingContext.recordMailboxFailure(mailbox.getMailboxId());
-                return Mono.empty();
+                return Mono.just(Either.left(new MailboxFailure(mailbox.getMailboxId())));
             });
     }
 
-    private Flux<ReIndexingEntry> reIndexingEntryForUid(Mailbox mailbox, MailboxSession mailboxSession, ReprocessingContext reprocessingContext, MessageMapper messageMapper, MessageUid uid) {
+    private Flux<Either<Failure, ReIndexingEntry>> reIndexingEntryForUid(Mailbox mailbox, MailboxSession mailboxSession, MessageMapper messageMapper, MessageUid uid) {
         return messageMapper.findInMailboxReactive(mailbox, MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED)
-            .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message))
+            .map(message -> Either.<Failure, ReIndexingEntry>right(new ReIndexingEntry(mailbox, mailboxSession, message)))
             .onErrorResume(e -> {
-                LOGGER.warn("ReIndexing failed for {} - {}", mailbox.getMailboxId(), uid, e);
-                reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid);
-                return Mono.empty();
+                LOGGER.warn("ReIndexing failed for {} {}", mailbox.getMailboxId(), uid, e);
+                return Mono.just(Either.left(new MessageFailure(mailbox.getMailboxId(), uid)));
             });
     }
 
-    private Mono<Task.Result> reIndexMessages(Flux<ReIndexingEntry> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) {
-        return ReactorUtils.Throttler.<ReIndexingEntry, Task.Result>forOperation(entry -> reIndexMessage(entry.getMailboxSession(), entry.getMailbox(), reprocessingContext, entry.getMessage()))
+    private Mono<Task.Result> reIndexMessages(Flux<Either<Failure, ReIndexingEntry>> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) {
+        return ReactorUtils.Throttler.<Either<Failure, ReIndexingEntry>, Task.Result>forOperation(
+                entry -> reIndexMessage(entry, reprocessingContext))
             .window(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1))
             .throttle(entriesToIndex)
             .reduce(Task::combine)
-            .switchIfEmpty(Mono.fromSupplier(() -> {
-                if (reprocessingContext.failures().mailboxFailures().isEmpty()) {
-                    return Result.COMPLETED;
-                }
+            .switchIfEmpty(Mono.just(Result.COMPLETED));
+    }
+
+    private Mono<Task.Result> reIndexMessage(Either<Failure, ReIndexingEntry> entryOrFailure, ReprocessingContext reprocessingContext) {
+        return toMono(entryOrFailure.right().map(this::index))
+            .map(this::flapMapRight)
+            .map(either -> recordIndexingResult(reprocessingContext, either));
+    }
+
+    private Result recordIndexingResult(ReprocessingContext reprocessingContext, Either<Failure, Result> either) {
+        return either.either(
+            failure -> {
+                failure.recordFailure(reprocessingContext);
                 return Result.PARTIAL;
-            }));
+            },
+            result -> result.onComplete(reprocessingContext::recordSuccess));
     }
 
-    private Mono<Task.Result> reIndexMessage(MailboxSession mailboxSession, Mailbox mailbox, ReprocessingContext reprocessingContext, MailboxMessage message) {
-        return Mono.fromCallable(() -> messageSearchIndex.add(mailboxSession, mailbox, message))
-            .doOnNext(any -> reprocessingContext.recordSuccess())
-            .thenReturn(Result.COMPLETED)
+    private Mono<Either<Failure, Result>> index(ReIndexingEntry entry) {
+        return messageSearchIndex.add(entry.getMailboxSession(), entry.getMailbox(), entry.getMessage())
+            .thenReturn(Either.<Failure, Result>right(Result.COMPLETED))
             .onErrorResume(e -> {
-                LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), message.getUid(), e);
-                reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), message.getUid());
-                return Mono.just(Result.PARTIAL);
+                LOGGER.warn("ReIndexing failed for {} {}", entry.getMailbox().generateAssociatedPath(), entry.getMessage().getUid(), e);
+                return Mono.just(Either.left(new MessageFailure(entry.getMailbox().getMailboxId(), entry.getMessage().getUid())));
             });
     }
+
+    private <X, Y> Either<X, Y> flapMapRight(Either<X, Either<X, Y>> nestedEither) {
+        return nestedEither.right().bind(either -> either);
+    }
+
+    private <X, Y> Mono<Either<X, Y>> toMono(Either<X, Mono<Y>> either) {
+        return either.either(x -> Mono.just(Either.left(x)), yMono -> yMono.map(Either::right));
+    }
 }
\ No newline at end of file
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
index 05a333c..c8c29cc 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
@@ -27,7 +27,6 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
@@ -213,7 +212,7 @@ class MailboxesRoutesTest {
                         MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
                         systemSession).getId();
 
-                doThrow(new RuntimeException())
+                doReturn(Mono.error(new RuntimeException()))
                     .when(searchIndex)
                     .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
 
@@ -401,7 +400,7 @@ class MailboxesRoutesTest {
                         MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
                         systemSession).getId();
 
-                doThrow(new RuntimeException())
+                doReturn(Mono.error(new RuntimeException()))
                     .when(searchIndex)
                     .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
 
@@ -743,7 +742,7 @@ class MailboxesRoutesTest {
                         MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
                         systemSession);
 
-                doThrow(new RuntimeException()).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
+                doReturn(Mono.error(new RuntimeException())).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
 
                 String taskId = with()
                     .post("/mailboxes?task=reIndex")
@@ -787,7 +786,7 @@ class MailboxesRoutesTest {
                         MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
                         systemSession).getId();
 
-                doThrow(new RuntimeException())
+                doReturn(Mono.error(new RuntimeException()))
                     .when(searchIndex)
                     .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
 
@@ -835,7 +834,7 @@ class MailboxesRoutesTest {
                         MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
                         systemSession).getId();
 
-                doThrow(new RuntimeException()).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
+                doReturn(Mono.error(new RuntimeException())).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
 
                 String taskId = with()
                     .post("/mailboxes?task=reIndex")
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
index 6d0e299..5dd021b 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
@@ -1231,7 +1231,7 @@ class UserMailboxesRoutesTest {
                         MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
                         systemSession).getId();
 
-                doThrow(new RuntimeException())
+                doReturn(Mono.error(new RuntimeException()))
                     .when(searchIndex)
                     .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org