You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/06/10 04:10:40 UTC

[james-project] 04/08: JAMES-3201 Add details about mailbox failures for indexing tasks

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d82a3aa4324c5f56fd14effbaa0c25230614395e
Author: LanKhuat <kh...@gmail.com>
AuthorDate: Wed Jun 3 17:40:00 2020 +0700

    JAMES-3201 Add details about mailbox failures for indexing tasks
---
 .../indexer/ReIndexingExecutionFailures.java       |  30 ++-
 .../tools/indexer/ErrorRecoveryIndexationTask.java |  14 +-
 .../indexer/ErrorRecoveryIndexationTaskDTO.java    |   2 +-
 .../mailbox/tools/indexer/ReIndexerPerformer.java  |  68 ++++--
 .../mailbox/tools/indexer/ReprocessingContext.java |   8 +-
 .../indexer/ReprocessingContextInformationDTO.java |   2 +-
 .../SerializableReIndexingExecutionFailures.java   |   2 +-
 .../tools/indexer/CassandraReIndexerImplTest.java  | 243 ++++++++++++++++++++-
 8 files changed, 339 insertions(+), 30 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexingExecutionFailures.java b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexingExecutionFailures.java
index 429c9a4..97446cb 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexingExecutionFailures.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexingExecutionFailures.java
@@ -25,6 +25,7 @@ import java.util.Objects;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.model.MailboxId;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 
 public class ReIndexingExecutionFailures {
@@ -61,16 +62,29 @@ public class ReIndexingExecutionFailures {
             return Objects.hash(mailboxId, uid);
         }
 
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("mailboxId", mailboxId)
+                .add("uid", uid)
+                .toString();
+        }
     }
 
-    private final List<ReIndexingFailure> failures;
+    private final List<ReIndexingFailure> messageFailures;
+    private final List<MailboxId> mailboxFailures;
 
-    public ReIndexingExecutionFailures(List<ReIndexingFailure> failures) {
-        this.failures = failures;
+    public ReIndexingExecutionFailures(List<ReIndexingFailure> messageFailures, List<MailboxId> mailboxFailures) {
+        this.messageFailures = messageFailures;
+        this.mailboxFailures = mailboxFailures;
     }
 
-    public List<ReIndexingFailure> failures() {
-        return ImmutableList.copyOf(failures);
+    public List<ReIndexingFailure> messageFailures() {
+        return ImmutableList.copyOf(messageFailures);
+    }
+
+    public List<MailboxId> mailboxFailures() {
+        return ImmutableList.copyOf(mailboxFailures);
     }
 
     @Override
@@ -78,14 +92,14 @@ public class ReIndexingExecutionFailures {
         if (o instanceof ReIndexingExecutionFailures) {
             ReIndexingExecutionFailures that = (ReIndexingExecutionFailures) o;
 
-            return Objects.equals(this.failures, that.failures);
+            return Objects.equals(this.messageFailures, that.messageFailures)
+                && Objects.equals(this.mailboxFailures, that.mailboxFailures);
         }
         return false;
     }
 
     @Override
     public final int hashCode() {
-        return Objects.hash(failures);
+        return Objects.hash(messageFailures, mailboxFailures);
     }
-
 }
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java
index df20874..91dfbe4 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java
@@ -49,8 +49,8 @@ public class ErrorRecoveryIndexationTask implements Task {
             this.mailboxIdFactory = mailboxIdFactory;
         }
 
-        private List<ReIndexingExecutionFailures.ReIndexingFailure> failuresFromDTO(List<ErrorRecoveryIndexationTaskDTO.ReindexingFailureDTO> failureDTOs) {
-            return failureDTOs
+        private List<ReIndexingExecutionFailures.ReIndexingFailure> messageFailuresFromDTO(List<ErrorRecoveryIndexationTaskDTO.ReindexingFailureDTO> messageFailures) {
+            return messageFailures
                 .stream()
                 .flatMap(dto -> dto.getUids()
                     .stream()
@@ -58,9 +58,17 @@ public class ErrorRecoveryIndexationTask implements Task {
                 .collect(Guavate.toImmutableList());
         }
 
+        private List<MailboxId> mailboxFailuresFromDTO(Optional<List<String>> mailboxFailures) {
+            return mailboxFailures.map(mailboxIdList ->
+                    mailboxIdList.stream()
+                        .map(mailboxIdFactory::fromString)
+                        .collect(Guavate.toImmutableList()))
+                .orElse(ImmutableList.of());
+        }
+
         public ErrorRecoveryIndexationTask create(ErrorRecoveryIndexationTaskDTO dto) {
             return new ErrorRecoveryIndexationTask(reIndexerPerformer,
-                new ReIndexingExecutionFailures(failuresFromDTO(dto.getPreviousFailures())),
+                new ReIndexingExecutionFailures(messageFailuresFromDTO(dto.getPreviousFailures())),
                 dto.getRunningOptions()
                     .map(RunningOptionsDTO::toDomainObject)
                     .orElse(RunningOptions.DEFAULT));
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTaskDTO.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTaskDTO.java
index 1385686..2ce86ba 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTaskDTO.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTaskDTO.java
@@ -49,7 +49,7 @@ public class ErrorRecoveryIndexationTaskDTO implements TaskDTO {
 
     public static ErrorRecoveryIndexationTaskDTO of(ErrorRecoveryIndexationTask task, String type) {
         Multimap<MailboxId, ReIndexingExecutionFailures.ReIndexingFailure> failuresByMailboxId = task.getPreviousFailures()
-            .failures()
+            .messageFailures()
             .stream()
             .collect(Guavate.toImmutableListMultimap(ReIndexingExecutionFailures.ReIndexingFailure::getMailboxId, Function.identity()));
 
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index e9b08d3..9dd74cb 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -102,7 +102,7 @@ public class ReIndexerPerformer {
         LOGGER.info("Starting a full reindex");
 
         Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
-            .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
+            .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext));
 
         return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext)
             .doFinally(any -> LOGGER.info("Full reindex finished"));
@@ -113,7 +113,7 @@ public class ReIndexerPerformer {
 
         Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
             .findMailboxByIdReactive(mailboxId)
-            .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
+            .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext));
 
         return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext);
     }
@@ -127,7 +127,7 @@ public class ReIndexerPerformer {
 
         try {
             Flux<ReIndexingEntry> entriesToIndex = mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound())
-                .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
+                .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext));
 
             return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext)
                 .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()));
@@ -162,11 +162,27 @@ public class ReIndexerPerformer {
     }
 
     Mono<Result> reIndexErrors(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions runningOptions) {
-        return reIndexMessages(
-            Flux.fromIterable(previousReIndexingFailures.failures())
-                .flatMap(this::createReindexingEntryFromFailure),
-            runningOptions,
-            reprocessingContext);
+        MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
+        MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
+
+        return Flux.merge(
+            reIndexMessages(Flux.fromIterable(previousReIndexingFailures.messageFailures())
+                    .flatMap(failure -> createReindexingEntryFromFailure(failure, reprocessingContext)),
+                runningOptions,
+                reprocessingContext),
+            reIndexMailboxes(Flux.fromIterable(previousReIndexingFailures.mailboxFailures())
+                    .flatMap(mapper::findMailboxByIdReactive),
+                mailboxSession,
+                reprocessingContext,
+                runningOptions))
+            .reduce(Task::combine);
+    }
+
+    private Mono<Result> reIndexMailboxes(Flux<Mailbox> mailboxes, MailboxSession session, ReprocessingContext reprocessingContext, RunningOptions runningOptions) {
+        Flux<ReIndexingEntry> entries = mailboxes
+            .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, session, reprocessingContext));
+
+        return reIndexMessages(entries, runningOptions, reprocessingContext);
     }
 
     private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) {
@@ -186,22 +202,41 @@ public class ReIndexerPerformer {
             .next();
     }
 
-    private Mono<ReIndexingEntry> createReindexingEntryFromFailure(ReIndexingFailure failure) {
+    private Mono<ReIndexingEntry> createReindexingEntryFromFailure(ReIndexingFailure failure, ReprocessingContext reprocessingContext) {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
 
         return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
             .findMailboxByIdReactive(failure.getMailboxId())
             .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, failure.getUid())
-                .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message)));
+                .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message)))
+            .onErrorResume(e -> {
+                LOGGER.warn("ReIndexing failed for {}", failure, e);
+                reprocessingContext.recordFailureDetailsForMessage(failure.getMailboxId(), failure.getUid());
+                return Mono.empty();
+            });
     }
 
-    private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession) {
+    private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession, ReprocessingContext reprocessingContext) {
         MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(mailboxSession);
 
         return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId())
             .thenMany(messageMapper.listAllMessageUids(mailbox))
-            .flatMap(uid -> messageMapper.findInMailboxReactive(mailbox, MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED))
-            .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message));
+            .flatMap(uid -> reIndexingEntryForUid(mailbox, mailboxSession, reprocessingContext, messageMapper, uid))
+            .onErrorResume(e -> {
+                LOGGER.warn("ReIndexing failed for {}", mailbox.generateAssociatedPath(), e);
+                reprocessingContext.recordMailboxFailure(mailbox.getMailboxId());
+                return Mono.empty();
+            });
+    }
+
+    private Flux<ReIndexingEntry> reIndexingEntryForUid(Mailbox mailbox, MailboxSession mailboxSession, ReprocessingContext reprocessingContext, MessageMapper messageMapper, MessageUid uid) {
+        return messageMapper.findInMailboxReactive(mailbox, MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED)
+            .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message))
+            .onErrorResume(e -> {
+                LOGGER.warn("ReIndexing failed for {} - {}", mailbox.getMailboxId(), uid, e);
+                reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid);
+                return Mono.empty();
+            });
     }
 
     private Mono<Task.Result> reIndexMessages(Flux<ReIndexingEntry> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) {
@@ -209,7 +244,12 @@ public class ReIndexerPerformer {
             .window(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1))
             .throttle(entriesToIndex)
             .reduce(Task::combine)
-            .switchIfEmpty(Mono.just(Result.COMPLETED));
+            .switchIfEmpty(Mono.fromSupplier(() -> {
+                if (reprocessingContext.failures().mailboxFailures().isEmpty()) {
+                    return Result.COMPLETED;
+                }
+                return Result.PARTIAL;
+            }));
     }
 
     private Mono<Task.Result> reIndexMessage(MailboxSession mailboxSession, Mailbox mailbox, ReprocessingContext reprocessingContext, MailboxMessage message) {
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
index c0c3eb7..bc4facb 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
@@ -32,11 +32,13 @@ class ReprocessingContext {
     private final AtomicInteger successfullyReprocessedMails;
     private final AtomicInteger failedReprocessingMails;
     private final ConcurrentLinkedDeque<ReIndexingExecutionFailures.ReIndexingFailure> failures;
+    private final ConcurrentLinkedDeque<MailboxId> mailboxFailures;
 
     ReprocessingContext() {
         failedReprocessingMails = new AtomicInteger(0);
         successfullyReprocessedMails = new AtomicInteger(0);
         failures = new ConcurrentLinkedDeque<>();
+        mailboxFailures = new ConcurrentLinkedDeque<>();
     }
 
     void recordFailureDetailsForMessage(MailboxId mailboxId, MessageUid uid) {
@@ -48,6 +50,10 @@ class ReprocessingContext {
         successfullyReprocessedMails.incrementAndGet();
     }
 
+    void recordMailboxFailure(MailboxId mailboxId) {
+        mailboxFailures.add(mailboxId);
+    }
+
     int successfullyReprocessedMailCount() {
         return successfullyReprocessedMails.get();
     }
@@ -57,6 +63,6 @@ class ReprocessingContext {
     }
 
     ReIndexingExecutionFailures failures() {
-        return new ReIndexingExecutionFailures(ImmutableList.copyOf(failures));
+        return new ReIndexingExecutionFailures(ImmutableList.copyOf(failures), ImmutableList.copyOf(mailboxFailures));
     }
 }
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContextInformationDTO.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContextInformationDTO.java
index 446b387..5bf65dc 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContextInformationDTO.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContextInformationDTO.java
@@ -169,7 +169,7 @@ public class ReprocessingContextInformationDTO implements AdditionalInformationD
     }
 
     static List<ReindexingFailureDTO> serializeFailures(ReIndexingExecutionFailures failures) {
-        ImmutableListMultimap<MailboxId, ReIndexingExecutionFailures.ReIndexingFailure> failuresByMailbox = failures.failures()
+        ImmutableListMultimap<MailboxId, ReIndexingExecutionFailures.ReIndexingFailure> failuresByMailbox = failures.messageFailures()
             .stream()
             .collect(Guavate.toImmutableListMultimap(ReIndexingExecutionFailures.ReIndexingFailure::getMailboxId));
 
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SerializableReIndexingExecutionFailures.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SerializableReIndexingExecutionFailures.java
index 3b3515a..5c7c489 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SerializableReIndexingExecutionFailures.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SerializableReIndexingExecutionFailures.java
@@ -52,7 +52,7 @@ public class SerializableReIndexingExecutionFailures {
 
     public static SerializableReIndexingExecutionFailures from(ReIndexingExecutionFailures reIndexingExecutionFailures) {
         return new SerializableReIndexingExecutionFailures(
-            reIndexingExecutionFailures.failures()
+            reIndexingExecutionFailures.messageFailures()
                 .stream()
                 .map(failure -> new SerializableReIndexingExecutionFailures.SerializableReIndexingFailure(failure.getMailboxId(), failure.getUid()))
                 .collect(Guavate.toImmutableList()));
diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
index 55ad6ed..bdd950e 100644
--- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
+++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.mailbox.tools.indexer;
 
+import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -34,10 +36,14 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageManager.AppendCommand;
+import org.apache.james.mailbox.MessageManager.AppendResult;
 import org.apache.james.mailbox.cassandra.CassandraMailboxManager;
 import org.apache.james.mailbox.cassandra.CassandraMailboxManagerProvider;
 import org.apache.james.mailbox.cassandra.mail.MailboxAggregateModule;
 import org.apache.james.mailbox.indexer.ReIndexer;
+import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures;
+import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures.ReIndexingFailure;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
@@ -45,10 +51,15 @@ import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
 import org.apache.james.mailbox.store.PreDeletionHooks;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
+import org.apache.james.task.Task;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.apache.mailbox.tools.indexer.ReprocessingContextInformationDTO.ReprocessingContextInformationForErrorRecoveryIndexationTask;
+import org.apache.mailbox.tools.indexer.ReprocessingContextInformationDTO.ReprocessingContextInformationForFullReindexingTask;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
 
 import com.google.common.base.Strings;
 
@@ -90,7 +101,7 @@ public class CassandraReIndexerImplTest {
         ConcurrentTestRunner.builder()
             .operation((a, b) -> mailbox
                 .appendMessage(
-                    MessageManager.AppendCommand.builder().build(bigBody),
+                    AppendCommand.builder().build(bigBody),
                     systemSession))
             .threadCount(threadCount)
             .operationCount(operationCount)
@@ -105,4 +116,234 @@ public class CassandraReIndexerImplTest {
             .add(any(MailboxSession.class), any(Mailbox.class),any(MailboxMessage.class));
         verifyNoMoreInteractions(messageSearchIndex);
     }
+
+    @Nested
+    class FailureTesting {
+        @Test
+        void fullReindexingShouldReturnPartialUponFailure(CassandraCluster cassandra) throws Exception {
+            MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+            mailboxManager.createMailbox(INBOX, session);
+
+            MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+            mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+            cassandra.getConf()
+                .registerScenario(fail()
+                    .forever()
+                    .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+            Task task = reIndexer.reIndex(ReIndexer.RunningOptions.DEFAULT);
+            Task.Result result = task.run();
+
+            assertThat(result).isEqualTo(Task.Result.PARTIAL);
+        }
+
+        @Test
+        void fullReindexingShouldUpdateDetailsUponFailure(CassandraCluster cassandra) throws Exception {
+            MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+            mailboxManager.createMailbox(INBOX, session);
+
+            MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+            mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+            cassandra.getConf()
+                .registerScenario(fail()
+                    .forever()
+                    .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+            Task task = reIndexer.reIndex(ReIndexer.RunningOptions.DEFAULT);
+            task.run();
+
+            ReprocessingContextInformationForFullReindexingTask information = (ReprocessingContextInformationForFullReindexingTask) task.details().get();
+            assertThat(information.failures().mailboxFailures()).containsExactly(mailbox.getId());
+        }
+
+        @Test
+        void singleMailboxReindexingShouldReturnPartialUponFailure(CassandraCluster cassandra) throws Exception {
+            MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+            mailboxManager.createMailbox(INBOX, session);
+
+            MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+            mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+            cassandra.getConf()
+                .registerScenario(fail()
+                    .forever()
+                    .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+            Task task = reIndexer.reIndex(mailbox.getId(), ReIndexer.RunningOptions.DEFAULT);
+            Task.Result result = task.run();
+
+            assertThat(result).isEqualTo(Task.Result.PARTIAL);
+        }
+
+        @Test
+        void singleMailboxReindexingShouldUpdateDetailsUponFailure(CassandraCluster cassandra) throws Exception {
+            MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+            mailboxManager.createMailbox(INBOX, session);
+
+            MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+            mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+            cassandra.getConf()
+                .registerScenario(fail()
+                    .forever()
+                    .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+            Task task = reIndexer.reIndex(mailbox.getId(), ReIndexer.RunningOptions.DEFAULT);
+            task.run();
+
+            SingleMailboxReindexingTask.AdditionalInformation information = (SingleMailboxReindexingTask.AdditionalInformation) task.details().get();
+            assertThat(information.failures().mailboxFailures()).containsExactly(mailbox.getId());
+        }
+
+        @Test
+        void userMailboxReindexingShouldReturnPartialUponFailure(CassandraCluster cassandra) throws Exception {
+            MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+            mailboxManager.createMailbox(INBOX, session);
+
+            MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+            mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+            cassandra.getConf()
+                .registerScenario(fail()
+                    .forever()
+                    .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+            Task task = reIndexer.reIndex(USERNAME, ReIndexer.RunningOptions.DEFAULT);
+            Task.Result result = task.run();
+
+            assertThat(result).isEqualTo(Task.Result.PARTIAL);
+        }
+
+        @Test
+        void userMailboxReindexingShouldUpdateDetailsUponFailure(CassandraCluster cassandra) throws Exception {
+            MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+            mailboxManager.createMailbox(INBOX, session);
+
+            MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+            mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+            cassandra.getConf()
+                .registerScenario(fail()
+                    .forever()
+                    .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+            Task task = reIndexer.reIndex(USERNAME, ReIndexer.RunningOptions.DEFAULT);
+            task.run();
+
+            UserReindexingTask.AdditionalInformation information = (UserReindexingTask.AdditionalInformation) task.details().get();
+            assertThat(information.failures().mailboxFailures()).containsExactly(mailbox.getId());
+        }
+
+        @Test
+        void errorReindexingShouldReturnPartialUponFailure(CassandraCluster cassandra) throws Exception {
+            MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+            mailboxManager.createMailbox(INBOX, session);
+
+            MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+            AppendResult appendResult = mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+            cassandra.getConf()
+                .registerScenario(fail()
+                    .forever()
+                    .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+            Task task = reIndexer.reIndex(new ReIndexingExecutionFailures(
+                ImmutableList.of(new ReIndexingFailure(mailbox.getId(),
+                    appendResult.getId().getUid())),
+                ImmutableList.of(mailbox.getId())),
+                ReIndexer.RunningOptions.DEFAULT);
+            Task.Result result = task.run();
+
+            assertThat(result).isEqualTo(Task.Result.PARTIAL);
+        }
+
+        @Test
+        void errorReindexingShouldUpdateDetailsUponFailure(CassandraCluster cassandra) throws Exception {
+            MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+            mailboxManager.createMailbox(INBOX, session);
+
+            MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+            AppendResult appendResult = mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+            cassandra.getConf()
+                .registerScenario(fail()
+                    .forever()
+                    .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+            Task task = reIndexer.reIndex(new ReIndexingExecutionFailures(
+                ImmutableList.of(new ReIndexingFailure(mailbox.getId(),
+                    appendResult.getId().getUid())),
+                    ImmutableList.of(mailbox.getId())),
+                ReIndexer.RunningOptions.DEFAULT);
+            task.run();
+
+            ReprocessingContextInformationForErrorRecoveryIndexationTask information = (ReprocessingContextInformationForErrorRecoveryIndexationTask) task.details().get();
+            assertThat(information.failures().mailboxFailures()).containsExactly(mailbox.getId());
+        }
+
+        @Test
+        void errorReindexingShouldUpdateDetailsUponReadingMailboxError(CassandraCluster cassandra) throws Exception {
+            MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+            mailboxManager.createMailbox(INBOX, session);
+
+            MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+            AppendResult appendResult = mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+            cassandra.getConf()
+                .registerScenario(fail()
+                    .forever()
+                    .whenQueryStartsWith("SELECT id,mailboxbase,uidvalidity,name FROM mailbox WHERE id=:id;"));
+
+            Task task = reIndexer.reIndex(new ReIndexingExecutionFailures(
+                    ImmutableList.of(new ReIndexingFailure(mailbox.getId(),
+                        appendResult.getId().getUid())),
+                    ImmutableList.of()),
+                ReIndexer.RunningOptions.DEFAULT);
+            task.run();
+
+            ReprocessingContextInformationForErrorRecoveryIndexationTask information = (ReprocessingContextInformationForErrorRecoveryIndexationTask) task.details().get();
+            assertThat(information.failures().messageFailures()).containsExactly(new ReIndexingFailure(mailbox.getId(), appendResult.getId().getUid()));
+        }
+
+        @Test
+        void fullReindexingShouldUpdateDetailsUponSingleMessageFullReadError(CassandraCluster cassandra) throws Exception {
+            MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+            mailboxManager.createMailbox(INBOX, session);
+
+            MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+            AppendResult appendResult = mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+            cassandra.getConf()
+                .registerScenario(fail()
+                    .forever()
+                    .whenQueryStartsWith("SELECT messageId,internalDate,bodyStartOctet,fullContentOctets,bodyOctets,bodyContent,headerContent,textualLineCount,properties,attachments FROM messageV2 WHERE messageId=:messageId;"));
+
+            Task task = reIndexer.reIndex(ReIndexer.RunningOptions.DEFAULT);
+            task.run();
+
+            ReprocessingContextInformationForFullReindexingTask information = (ReprocessingContextInformationForFullReindexingTask) task.details().get();
+            assertThat(information.failures().messageFailures()).containsExactly(new ReIndexingFailure(mailbox.getId(), appendResult.getId().getUid()));
+        }
+    }
+
+    @Test
+    void errorReindexingShouldReindexPreviouslyFailedMailbox() throws Exception {
+        MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+        mailboxManager.createMailbox(INBOX, session);
+
+        MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+        mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+        Task task = reIndexer.reIndex(new ReIndexingExecutionFailures(
+                ImmutableList.of(),
+                ImmutableList.of(mailbox.getId())),
+            ReIndexer.RunningOptions.DEFAULT);
+        task.run();
+
+        verify(messageSearchIndex).deleteAll(any(MailboxSession.class), any(MailboxId.class));
+        verify(messageSearchIndex, times(1))
+            .add(any(MailboxSession.class), any(Mailbox.class),any(MailboxMessage.class));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org