You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/06/10 04:10:40 UTC
[james-project] 04/08: JAMES-3201 Add details about mailbox
failures for indexing tasks
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d82a3aa4324c5f56fd14effbaa0c25230614395e
Author: LanKhuat <kh...@gmail.com>
AuthorDate: Wed Jun 3 17:40:00 2020 +0700
JAMES-3201 Add details about mailbox failures for indexing tasks
---
.../indexer/ReIndexingExecutionFailures.java | 30 ++-
.../tools/indexer/ErrorRecoveryIndexationTask.java | 14 +-
.../indexer/ErrorRecoveryIndexationTaskDTO.java | 2 +-
.../mailbox/tools/indexer/ReIndexerPerformer.java | 68 ++++--
.../mailbox/tools/indexer/ReprocessingContext.java | 8 +-
.../indexer/ReprocessingContextInformationDTO.java | 2 +-
.../SerializableReIndexingExecutionFailures.java | 2 +-
.../tools/indexer/CassandraReIndexerImplTest.java | 243 ++++++++++++++++++++-
8 files changed, 339 insertions(+), 30 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexingExecutionFailures.java b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexingExecutionFailures.java
index 429c9a4..97446cb 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexingExecutionFailures.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexingExecutionFailures.java
@@ -25,6 +25,7 @@ import java.util.Objects;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.model.MailboxId;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
public class ReIndexingExecutionFailures {
@@ -61,16 +62,29 @@ public class ReIndexingExecutionFailures {
return Objects.hash(mailboxId, uid);
}
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("mailboxId", mailboxId)
+ .add("uid", uid)
+ .toString();
+ }
}
- private final List<ReIndexingFailure> failures;
+ private final List<ReIndexingFailure> messageFailures;
+ private final List<MailboxId> mailboxFailures;
- public ReIndexingExecutionFailures(List<ReIndexingFailure> failures) {
- this.failures = failures;
+ public ReIndexingExecutionFailures(List<ReIndexingFailure> messageFailures, List<MailboxId> mailboxFailures) {
+ this.messageFailures = messageFailures;
+ this.mailboxFailures = mailboxFailures;
}
- public List<ReIndexingFailure> failures() {
- return ImmutableList.copyOf(failures);
+ public List<ReIndexingFailure> messageFailures() {
+ return ImmutableList.copyOf(messageFailures);
+ }
+
+ public List<MailboxId> mailboxFailures() {
+ return ImmutableList.copyOf(mailboxFailures);
}
@Override
@@ -78,14 +92,14 @@ public class ReIndexingExecutionFailures {
if (o instanceof ReIndexingExecutionFailures) {
ReIndexingExecutionFailures that = (ReIndexingExecutionFailures) o;
- return Objects.equals(this.failures, that.failures);
+ return Objects.equals(this.messageFailures, that.messageFailures)
+ && Objects.equals(this.mailboxFailures, that.mailboxFailures);
}
return false;
}
@Override
public final int hashCode() {
- return Objects.hash(failures);
+ return Objects.hash(messageFailures, mailboxFailures);
}
-
}
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java
index df20874..91dfbe4 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java
@@ -49,8 +49,8 @@ public class ErrorRecoveryIndexationTask implements Task {
this.mailboxIdFactory = mailboxIdFactory;
}
- private List<ReIndexingExecutionFailures.ReIndexingFailure> failuresFromDTO(List<ErrorRecoveryIndexationTaskDTO.ReindexingFailureDTO> failureDTOs) {
- return failureDTOs
+ private List<ReIndexingExecutionFailures.ReIndexingFailure> messageFailuresFromDTO(List<ErrorRecoveryIndexationTaskDTO.ReindexingFailureDTO> messageFailures) {
+ return messageFailures
.stream()
.flatMap(dto -> dto.getUids()
.stream()
@@ -58,9 +58,17 @@ public class ErrorRecoveryIndexationTask implements Task {
.collect(Guavate.toImmutableList());
}
+ private List<MailboxId> mailboxFailuresFromDTO(Optional<List<String>> mailboxFailures) {
+ return mailboxFailures.map(mailboxIdList ->
+ mailboxIdList.stream()
+ .map(mailboxIdFactory::fromString)
+ .collect(Guavate.toImmutableList()))
+ .orElse(ImmutableList.of());
+ }
+
public ErrorRecoveryIndexationTask create(ErrorRecoveryIndexationTaskDTO dto) {
return new ErrorRecoveryIndexationTask(reIndexerPerformer,
- new ReIndexingExecutionFailures(failuresFromDTO(dto.getPreviousFailures())),
+ new ReIndexingExecutionFailures(messageFailuresFromDTO(dto.getPreviousFailures())),
dto.getRunningOptions()
.map(RunningOptionsDTO::toDomainObject)
.orElse(RunningOptions.DEFAULT));
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTaskDTO.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTaskDTO.java
index 1385686..2ce86ba 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTaskDTO.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTaskDTO.java
@@ -49,7 +49,7 @@ public class ErrorRecoveryIndexationTaskDTO implements TaskDTO {
public static ErrorRecoveryIndexationTaskDTO of(ErrorRecoveryIndexationTask task, String type) {
Multimap<MailboxId, ReIndexingExecutionFailures.ReIndexingFailure> failuresByMailboxId = task.getPreviousFailures()
- .failures()
+ .messageFailures()
.stream()
.collect(Guavate.toImmutableListMultimap(ReIndexingExecutionFailures.ReIndexingFailure::getMailboxId, Function.identity()));
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index e9b08d3..9dd74cb 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -102,7 +102,7 @@ public class ReIndexerPerformer {
LOGGER.info("Starting a full reindex");
Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
- .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
+ .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext));
return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext)
.doFinally(any -> LOGGER.info("Full reindex finished"));
@@ -113,7 +113,7 @@ public class ReIndexerPerformer {
Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
.findMailboxByIdReactive(mailboxId)
- .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
+ .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext));
return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext);
}
@@ -127,7 +127,7 @@ public class ReIndexerPerformer {
try {
Flux<ReIndexingEntry> entriesToIndex = mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound())
- .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession));
+ .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, reprocessingContext));
return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext)
.doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()));
@@ -162,11 +162,27 @@ public class ReIndexerPerformer {
}
Mono<Result> reIndexErrors(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions runningOptions) {
- return reIndexMessages(
- Flux.fromIterable(previousReIndexingFailures.failures())
- .flatMap(this::createReindexingEntryFromFailure),
- runningOptions,
- reprocessingContext);
+ MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
+ MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
+
+ return Flux.merge(
+ reIndexMessages(Flux.fromIterable(previousReIndexingFailures.messageFailures())
+ .flatMap(failure -> createReindexingEntryFromFailure(failure, reprocessingContext)),
+ runningOptions,
+ reprocessingContext),
+ reIndexMailboxes(Flux.fromIterable(previousReIndexingFailures.mailboxFailures())
+ .flatMap(mapper::findMailboxByIdReactive),
+ mailboxSession,
+ reprocessingContext,
+ runningOptions))
+ .reduce(Task::combine);
+ }
+
+ private Mono<Result> reIndexMailboxes(Flux<Mailbox> mailboxes, MailboxSession session, ReprocessingContext reprocessingContext, RunningOptions runningOptions) {
+ Flux<ReIndexingEntry> entries = mailboxes
+ .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, session, reprocessingContext));
+
+ return reIndexMessages(entries, runningOptions, reprocessingContext);
}
private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) {
@@ -186,22 +202,41 @@ public class ReIndexerPerformer {
.next();
}
- private Mono<ReIndexingEntry> createReindexingEntryFromFailure(ReIndexingFailure failure) {
+ private Mono<ReIndexingEntry> createReindexingEntryFromFailure(ReIndexingFailure failure, ReprocessingContext reprocessingContext) {
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
.findMailboxByIdReactive(failure.getMailboxId())
.flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, failure.getUid())
- .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message)));
+ .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message)))
+ .onErrorResume(e -> {
+ LOGGER.warn("ReIndexing failed for {}", failure, e);
+ reprocessingContext.recordFailureDetailsForMessage(failure.getMailboxId(), failure.getUid());
+ return Mono.empty();
+ });
}
- private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession) {
+ private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession, ReprocessingContext reprocessingContext) {
MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(mailboxSession);
return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId())
.thenMany(messageMapper.listAllMessageUids(mailbox))
- .flatMap(uid -> messageMapper.findInMailboxReactive(mailbox, MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED))
- .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message));
+ .flatMap(uid -> reIndexingEntryForUid(mailbox, mailboxSession, reprocessingContext, messageMapper, uid))
+ .onErrorResume(e -> {
+ LOGGER.warn("ReIndexing failed for {}", mailbox.generateAssociatedPath(), e);
+ reprocessingContext.recordMailboxFailure(mailbox.getMailboxId());
+ return Mono.empty();
+ });
+ }
+
+ private Flux<ReIndexingEntry> reIndexingEntryForUid(Mailbox mailbox, MailboxSession mailboxSession, ReprocessingContext reprocessingContext, MessageMapper messageMapper, MessageUid uid) {
+ return messageMapper.findInMailboxReactive(mailbox, MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED)
+ .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message))
+ .onErrorResume(e -> {
+ LOGGER.warn("ReIndexing failed for {} - {}", mailbox.getMailboxId(), uid, e);
+ reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid);
+ return Mono.empty();
+ });
}
private Mono<Task.Result> reIndexMessages(Flux<ReIndexingEntry> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) {
@@ -209,7 +244,12 @@ public class ReIndexerPerformer {
.window(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1))
.throttle(entriesToIndex)
.reduce(Task::combine)
- .switchIfEmpty(Mono.just(Result.COMPLETED));
+ .switchIfEmpty(Mono.fromSupplier(() -> {
+ if (reprocessingContext.failures().mailboxFailures().isEmpty()) {
+ return Result.COMPLETED;
+ }
+ return Result.PARTIAL;
+ }));
}
private Mono<Task.Result> reIndexMessage(MailboxSession mailboxSession, Mailbox mailbox, ReprocessingContext reprocessingContext, MailboxMessage message) {
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
index c0c3eb7..bc4facb 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
@@ -32,11 +32,13 @@ class ReprocessingContext {
private final AtomicInteger successfullyReprocessedMails;
private final AtomicInteger failedReprocessingMails;
private final ConcurrentLinkedDeque<ReIndexingExecutionFailures.ReIndexingFailure> failures;
+ private final ConcurrentLinkedDeque<MailboxId> mailboxFailures;
ReprocessingContext() {
failedReprocessingMails = new AtomicInteger(0);
successfullyReprocessedMails = new AtomicInteger(0);
failures = new ConcurrentLinkedDeque<>();
+ mailboxFailures = new ConcurrentLinkedDeque<>();
}
void recordFailureDetailsForMessage(MailboxId mailboxId, MessageUid uid) {
@@ -48,6 +50,10 @@ class ReprocessingContext {
successfullyReprocessedMails.incrementAndGet();
}
+ void recordMailboxFailure(MailboxId mailboxId) {
+ mailboxFailures.add(mailboxId);
+ }
+
int successfullyReprocessedMailCount() {
return successfullyReprocessedMails.get();
}
@@ -57,6 +63,6 @@ class ReprocessingContext {
}
ReIndexingExecutionFailures failures() {
- return new ReIndexingExecutionFailures(ImmutableList.copyOf(failures));
+ return new ReIndexingExecutionFailures(ImmutableList.copyOf(failures), ImmutableList.copyOf(mailboxFailures));
}
}
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContextInformationDTO.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContextInformationDTO.java
index 446b387..5bf65dc 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContextInformationDTO.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContextInformationDTO.java
@@ -169,7 +169,7 @@ public class ReprocessingContextInformationDTO implements AdditionalInformationD
}
static List<ReindexingFailureDTO> serializeFailures(ReIndexingExecutionFailures failures) {
- ImmutableListMultimap<MailboxId, ReIndexingExecutionFailures.ReIndexingFailure> failuresByMailbox = failures.failures()
+ ImmutableListMultimap<MailboxId, ReIndexingExecutionFailures.ReIndexingFailure> failuresByMailbox = failures.messageFailures()
.stream()
.collect(Guavate.toImmutableListMultimap(ReIndexingExecutionFailures.ReIndexingFailure::getMailboxId));
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SerializableReIndexingExecutionFailures.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SerializableReIndexingExecutionFailures.java
index 3b3515a..5c7c489 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SerializableReIndexingExecutionFailures.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SerializableReIndexingExecutionFailures.java
@@ -52,7 +52,7 @@ public class SerializableReIndexingExecutionFailures {
public static SerializableReIndexingExecutionFailures from(ReIndexingExecutionFailures reIndexingExecutionFailures) {
return new SerializableReIndexingExecutionFailures(
- reIndexingExecutionFailures.failures()
+ reIndexingExecutionFailures.messageFailures()
.stream()
.map(failure -> new SerializableReIndexingExecutionFailures.SerializableReIndexingFailure(failure.getMailboxId(), failure.getUid()))
.collect(Guavate.toImmutableList()));
diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
index 55ad6ed..bdd950e 100644
--- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
+++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
@@ -19,6 +19,8 @@
package org.apache.mailbox.tools.indexer;
+import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -34,10 +36,14 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageManager.AppendCommand;
+import org.apache.james.mailbox.MessageManager.AppendResult;
import org.apache.james.mailbox.cassandra.CassandraMailboxManager;
import org.apache.james.mailbox.cassandra.CassandraMailboxManagerProvider;
import org.apache.james.mailbox.cassandra.mail.MailboxAggregateModule;
import org.apache.james.mailbox.indexer.ReIndexer;
+import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures;
+import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures.ReIndexingFailure;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
@@ -45,10 +51,15 @@ import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.PreDeletionHooks;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
+import org.apache.james.task.Task;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.apache.mailbox.tools.indexer.ReprocessingContextInformationDTO.ReprocessingContextInformationForErrorRecoveryIndexationTask;
+import org.apache.mailbox.tools.indexer.ReprocessingContextInformationDTO.ReprocessingContextInformationForFullReindexingTask;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import com.google.common.base.Strings;
@@ -90,7 +101,7 @@ public class CassandraReIndexerImplTest {
ConcurrentTestRunner.builder()
.operation((a, b) -> mailbox
.appendMessage(
- MessageManager.AppendCommand.builder().build(bigBody),
+ AppendCommand.builder().build(bigBody),
systemSession))
.threadCount(threadCount)
.operationCount(operationCount)
@@ -105,4 +116,234 @@ public class CassandraReIndexerImplTest {
.add(any(MailboxSession.class), any(Mailbox.class),any(MailboxMessage.class));
verifyNoMoreInteractions(messageSearchIndex);
}
+
+ @Nested
+ class FailureTesting {
+ @Test
+ void fullReindexingShouldReturnPartialUponFailure(CassandraCluster cassandra) throws Exception {
+ MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+ mailboxManager.createMailbox(INBOX, session);
+
+ MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+ mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+ cassandra.getConf()
+ .registerScenario(fail()
+ .forever()
+ .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+ Task task = reIndexer.reIndex(ReIndexer.RunningOptions.DEFAULT);
+ Task.Result result = task.run();
+
+ assertThat(result).isEqualTo(Task.Result.PARTIAL);
+ }
+
+ @Test
+ void fullReindexingShouldUpdateDetailsUponFailure(CassandraCluster cassandra) throws Exception {
+ MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+ mailboxManager.createMailbox(INBOX, session);
+
+ MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+ mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+ cassandra.getConf()
+ .registerScenario(fail()
+ .forever()
+ .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+ Task task = reIndexer.reIndex(ReIndexer.RunningOptions.DEFAULT);
+ task.run();
+
+ ReprocessingContextInformationForFullReindexingTask information = (ReprocessingContextInformationForFullReindexingTask) task.details().get();
+ assertThat(information.failures().mailboxFailures()).containsExactly(mailbox.getId());
+ }
+
+ @Test
+ void singleMailboxReindexingShouldReturnPartialUponFailure(CassandraCluster cassandra) throws Exception {
+ MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+ mailboxManager.createMailbox(INBOX, session);
+
+ MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+ mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+ cassandra.getConf()
+ .registerScenario(fail()
+ .forever()
+ .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+ Task task = reIndexer.reIndex(mailbox.getId(), ReIndexer.RunningOptions.DEFAULT);
+ Task.Result result = task.run();
+
+ assertThat(result).isEqualTo(Task.Result.PARTIAL);
+ }
+
+ @Test
+ void singleMailboxReindexingShouldUpdateDetailsUponFailure(CassandraCluster cassandra) throws Exception {
+ MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+ mailboxManager.createMailbox(INBOX, session);
+
+ MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+ mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+ cassandra.getConf()
+ .registerScenario(fail()
+ .forever()
+ .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+ Task task = reIndexer.reIndex(mailbox.getId(), ReIndexer.RunningOptions.DEFAULT);
+ task.run();
+
+ SingleMailboxReindexingTask.AdditionalInformation information = (SingleMailboxReindexingTask.AdditionalInformation) task.details().get();
+ assertThat(information.failures().mailboxFailures()).containsExactly(mailbox.getId());
+ }
+
+ @Test
+ void userMailboxReindexingShouldReturnPartialUponFailure(CassandraCluster cassandra) throws Exception {
+ MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+ mailboxManager.createMailbox(INBOX, session);
+
+ MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+ mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+ cassandra.getConf()
+ .registerScenario(fail()
+ .forever()
+ .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+ Task task = reIndexer.reIndex(USERNAME, ReIndexer.RunningOptions.DEFAULT);
+ Task.Result result = task.run();
+
+ assertThat(result).isEqualTo(Task.Result.PARTIAL);
+ }
+
+ @Test
+ void userMailboxReindexingShouldUpdateDetailsUponFailure(CassandraCluster cassandra) throws Exception {
+ MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+ mailboxManager.createMailbox(INBOX, session);
+
+ MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+ mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+ cassandra.getConf()
+ .registerScenario(fail()
+ .forever()
+ .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+ Task task = reIndexer.reIndex(USERNAME, ReIndexer.RunningOptions.DEFAULT);
+ task.run();
+
+ UserReindexingTask.AdditionalInformation information = (UserReindexingTask.AdditionalInformation) task.details().get();
+ assertThat(information.failures().mailboxFailures()).containsExactly(mailbox.getId());
+ }
+
+ @Test
+ void errorReindexingShouldReturnPartialUponFailure(CassandraCluster cassandra) throws Exception {
+ MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+ mailboxManager.createMailbox(INBOX, session);
+
+ MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+ AppendResult appendResult = mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+ cassandra.getConf()
+ .registerScenario(fail()
+ .forever()
+ .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+ Task task = reIndexer.reIndex(new ReIndexingExecutionFailures(
+ ImmutableList.of(new ReIndexingFailure(mailbox.getId(),
+ appendResult.getId().getUid())),
+ ImmutableList.of(mailbox.getId())),
+ ReIndexer.RunningOptions.DEFAULT);
+ Task.Result result = task.run();
+
+ assertThat(result).isEqualTo(Task.Result.PARTIAL);
+ }
+
+ @Test
+ void errorReindexingShouldUpdateDetailsUponFailure(CassandraCluster cassandra) throws Exception {
+ MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+ mailboxManager.createMailbox(INBOX, session);
+
+ MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+ AppendResult appendResult = mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+ cassandra.getConf()
+ .registerScenario(fail()
+ .forever()
+ .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId;"));
+
+ Task task = reIndexer.reIndex(new ReIndexingExecutionFailures(
+ ImmutableList.of(new ReIndexingFailure(mailbox.getId(),
+ appendResult.getId().getUid())),
+ ImmutableList.of(mailbox.getId())),
+ ReIndexer.RunningOptions.DEFAULT);
+ task.run();
+
+ ReprocessingContextInformationForErrorRecoveryIndexationTask information = (ReprocessingContextInformationForErrorRecoveryIndexationTask) task.details().get();
+ assertThat(information.failures().mailboxFailures()).containsExactly(mailbox.getId());
+ }
+
+ @Test
+ void errorReindexingShouldUpdateDetailsUponReadingMailboxError(CassandraCluster cassandra) throws Exception {
+ MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+ mailboxManager.createMailbox(INBOX, session);
+
+ MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+ AppendResult appendResult = mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+ cassandra.getConf()
+ .registerScenario(fail()
+ .forever()
+ .whenQueryStartsWith("SELECT id,mailboxbase,uidvalidity,name FROM mailbox WHERE id=:id;"));
+
+ Task task = reIndexer.reIndex(new ReIndexingExecutionFailures(
+ ImmutableList.of(new ReIndexingFailure(mailbox.getId(),
+ appendResult.getId().getUid())),
+ ImmutableList.of()),
+ ReIndexer.RunningOptions.DEFAULT);
+ task.run();
+
+ ReprocessingContextInformationForErrorRecoveryIndexationTask information = (ReprocessingContextInformationForErrorRecoveryIndexationTask) task.details().get();
+ assertThat(information.failures().messageFailures()).containsExactly(new ReIndexingFailure(mailbox.getId(), appendResult.getId().getUid()));
+ }
+
+ @Test
+ void fullReindexingShouldUpdateDetailsUponSingleMessageFullReadError(CassandraCluster cassandra) throws Exception {
+ MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+ mailboxManager.createMailbox(INBOX, session);
+
+ MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+ AppendResult appendResult = mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+ cassandra.getConf()
+ .registerScenario(fail()
+ .forever()
+ .whenQueryStartsWith("SELECT messageId,internalDate,bodyStartOctet,fullContentOctets,bodyOctets,bodyContent,headerContent,textualLineCount,properties,attachments FROM messageV2 WHERE messageId=:messageId;"));
+
+ Task task = reIndexer.reIndex(ReIndexer.RunningOptions.DEFAULT);
+ task.run();
+
+ ReprocessingContextInformationForFullReindexingTask information = (ReprocessingContextInformationForFullReindexingTask) task.details().get();
+ assertThat(information.failures().messageFailures()).containsExactly(new ReIndexingFailure(mailbox.getId(), appendResult.getId().getUid()));
+ }
+ }
+
+ @Test
+ void errorReindexingShouldReindexPreviouslyFailedMailbox() throws Exception {
+ MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+ mailboxManager.createMailbox(INBOX, session);
+
+ MessageManager mailbox = mailboxManager.getMailbox(INBOX, session);
+ mailbox.appendMessage(AppendCommand.builder().build("header: value\r\n\r\nbody"), session);
+
+ Task task = reIndexer.reIndex(new ReIndexingExecutionFailures(
+ ImmutableList.of(),
+ ImmutableList.of(mailbox.getId())),
+ ReIndexer.RunningOptions.DEFAULT);
+ task.run();
+
+ verify(messageSearchIndex).deleteAll(any(MailboxSession.class), any(MailboxId.class));
+ verify(messageSearchIndex, times(1))
+ .add(any(MailboxSession.class), any(Mailbox.class),any(MailboxMessage.class));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org