You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/05/17 09:51:38 UTC
[james-project] 02/03: MAILBOX-351 Add detailed failure report to ElasticSearch reIndexing
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7b850db65c6abad4f7d71e92d8178718ff424c92
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu May 16 12:06:50 2019 +0700
MAILBOX-351 Add detailed failure report to ElasticSearch reIndexing
---
mailbox/tools/indexer/pom.xml | 10 +++
.../mailbox/tools/indexer/FullReindexingTask.java | 4 ++
.../mailbox/tools/indexer/ReIndexerPerformer.java | 11 ++--
...ntext.java => ReIndexingExecutionFailures.java} | 52 ++++++++-------
.../mailbox/tools/indexer/ReprocessingContext.java | 36 ++++++-----
.../tools/indexer/SingleMailboxReindexingTask.java | 4 ++
.../tools/indexer/SingleMessageReindexingTask.java | 6 +-
.../indexer/ReIndexingExecutionFailuresTest.java | 73 ++++++++++++++++++++++
8 files changed, 153 insertions(+), 43 deletions(-)
diff --git a/mailbox/tools/indexer/pom.xml b/mailbox/tools/indexer/pom.xml
index 764c019..27c8b2e 100644
--- a/mailbox/tools/indexer/pom.xml
+++ b/mailbox/tools/indexer/pom.xml
@@ -91,6 +91,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-guava</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<scope>test</scope>
@@ -105,6 +110,11 @@
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
+ <groupId>net.javacrumbs.json-unit</groupId>
+ <artifactId>json-unit-assertj</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java
index 196c1d4..a9ebec8 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java
@@ -45,6 +45,10 @@ public class FullReindexingTask implements Task {
public int getFailedReprocessedMailCount() {
return reprocessingContext.failedReprocessingMailCount();
}
+
+ public ReIndexingExecutionFailures failures() {
+ return reprocessingContext.failures();
+ }
}
private final ReIndexerPerformer reIndexerPerformer;
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index c1f6cc9..1411d70 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -75,8 +75,7 @@ public class ReIndexerPerformer {
mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
.findInMailbox(mailbox, MessageRange.all(), MessageMapper.FetchType.Metadata, NO_LIMIT))
.map(MailboxMessage::getUid)
- .map(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid))
- .peek(reprocessingContext::updateAccordingToReprocessingResult)
+ .map(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext))
.reduce(Task::combine)
.orElse(Task.Result.COMPLETED);
} finally {
@@ -113,11 +112,11 @@ public class ReIndexerPerformer {
}
}
- Task.Result handleMessageReIndexing(MailboxId mailboxId, MessageUid uid) throws MailboxException {
+ Task.Result handleMessageReIndexing(MailboxId mailboxId, MessageUid uid, ReprocessingContext reprocessingContext) throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXING);
Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
- return handleMessageReIndexing(mailboxSession, mailbox, uid);
+ return handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext);
}
private Task.Result reIndex(Stream<MailboxId> mailboxIds, ReprocessingContext reprocessingContext) {
@@ -134,14 +133,16 @@ public class ReIndexerPerformer {
.orElse(Task.Result.COMPLETED);
}
- private Task.Result handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid) {
+ private Task.Result handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid, ReprocessingContext reprocessingContext) {
try {
Optional.of(uid)
.flatMap(Throwing.function(mUid -> fullyReadMessage(mailboxSession, mailbox, mUid)))
.ifPresent(Throwing.consumer(message -> messageSearchIndex.add(mailboxSession, mailbox, message)));
+ reprocessingContext.recordSuccess();
return Task.Result.COMPLETED;
} catch (Exception e) {
LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e);
+ reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid);
return Task.Result.PARTIAL;
}
}
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexingExecutionFailures.java
similarity index 52%
copy from mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
copy to mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexingExecutionFailures.java
index 08417cf..fd56d09 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexingExecutionFailures.java
@@ -19,35 +19,45 @@
package org.apache.mailbox.tools.indexer;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
-import org.apache.james.task.Task;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.model.MailboxId;
-public class ReprocessingContext {
- private final AtomicInteger successfullyReprocessedMails;
- private final AtomicInteger failedReprocessingMails;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonValue;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.Multimap;
- public ReprocessingContext() {
- failedReprocessingMails = new AtomicInteger(0);
- successfullyReprocessedMails = new AtomicInteger(0);
- }
+public class ReIndexingExecutionFailures {
+ public static class ReIndexingFailure {
+ private final MailboxId mailboxId;
+ private final MessageUid uid;
+
+ public ReIndexingFailure(MailboxId mailboxId, MessageUid uid) {
+ this.mailboxId = mailboxId;
+ this.uid = uid;
+ }
- public void updateAccordingToReprocessingResult(Task.Result result) {
- switch (result) {
- case COMPLETED:
- successfullyReprocessedMails.incrementAndGet();
- break;
- case PARTIAL:
- failedReprocessingMails.incrementAndGet();
- break;
+ @JsonIgnore
+ public String getMailboxId() {
+ return mailboxId.serialize();
+ }
+
+ public long getUid() {
+ return uid.asLong();
}
}
- public int successfullyReprocessedMailCount() {
- return successfullyReprocessedMails.get();
+ private final List<ReIndexingFailure> failures;
+
+ public ReIndexingExecutionFailures(List<ReIndexingFailure> failures) {
+ this.failures = failures;
}
- public int failedReprocessingMailCount() {
- return failedReprocessingMails.get();
+ @JsonValue
+ public Multimap<String, ReIndexingFailure> failures() {
+ return failures.stream()
+ .collect(Guavate.toImmutableListMultimap(ReIndexingFailure::getMailboxId));
}
}
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
index 08417cf..4834bcb 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReprocessingContext.java
@@ -19,35 +19,43 @@
package org.apache.mailbox.tools.indexer;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.james.task.Task;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.model.MailboxId;
-public class ReprocessingContext {
+import com.google.common.collect.ImmutableList;
+
+class ReprocessingContext {
private final AtomicInteger successfullyReprocessedMails;
private final AtomicInteger failedReprocessingMails;
+ private final ConcurrentLinkedDeque<ReIndexingExecutionFailures.ReIndexingFailure> failures;
- public ReprocessingContext() {
+ ReprocessingContext() {
failedReprocessingMails = new AtomicInteger(0);
successfullyReprocessedMails = new AtomicInteger(0);
+ failures = new ConcurrentLinkedDeque<>();
+ }
+
+ void recordFailureDetailsForMessage(MailboxId mailboxId, MessageUid uid) {
+ failures.add(new ReIndexingExecutionFailures.ReIndexingFailure(mailboxId, uid));
+ failedReprocessingMails.incrementAndGet();
}
- public void updateAccordingToReprocessingResult(Task.Result result) {
- switch (result) {
- case COMPLETED:
- successfullyReprocessedMails.incrementAndGet();
- break;
- case PARTIAL:
- failedReprocessingMails.incrementAndGet();
- break;
- }
+ void recordSuccess() {
+ successfullyReprocessedMails.incrementAndGet();
}
- public int successfullyReprocessedMailCount() {
+ int successfullyReprocessedMailCount() {
return successfullyReprocessedMails.get();
}
- public int failedReprocessingMailCount() {
+ int failedReprocessingMailCount() {
return failedReprocessingMails.get();
}
+
+ ReIndexingExecutionFailures failures() {
+ return new ReIndexingExecutionFailures(ImmutableList.copyOf(failures));
+ }
}
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java
index 5aade16..138f8a4 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java
@@ -52,6 +52,10 @@ public class SingleMailboxReindexingTask implements Task {
public int getFailedReprocessedMailCount() {
return reprocessingContext.failedReprocessingMailCount();
}
+
+ public ReIndexingExecutionFailures failures() {
+ return reprocessingContext.failures();
+ }
}
private final ReIndexerPerformer reIndexerPerformer;
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java
index 749bf36..0a72284 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java
@@ -60,7 +60,7 @@ public class SingleMessageReindexingTask implements Task {
private final AdditionalInformation additionalInformation;
@Inject
- public SingleMessageReindexingTask(ReIndexerPerformer reIndexerPerformer, MailboxId mailboxId, MessageUid uid) {
+ SingleMessageReindexingTask(ReIndexerPerformer reIndexerPerformer, MailboxId mailboxId, MessageUid uid) {
this.reIndexerPerformer = reIndexerPerformer;
this.mailboxId = mailboxId;
this.uid = uid;
@@ -70,9 +70,9 @@ public class SingleMessageReindexingTask implements Task {
@Override
public Result run() {
try {
- return reIndexerPerformer.handleMessageReIndexing(mailboxId, uid);
+ return reIndexerPerformer.handleMessageReIndexing(mailboxId, uid, new ReprocessingContext());
} catch (MailboxException e) {
- LOGGER.warn("Error encounteres while reindexing {} : {}",mailboxId, uid, e);
+ LOGGER.warn("Error encounteres while reindexing {} : {}", mailboxId, uid, e);
return Result.PARTIAL;
}
}
diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexingExecutionFailuresTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexingExecutionFailuresTest.java
new file mode 100644
index 0000000..f224113
--- /dev/null
+++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexingExecutionFailuresTest.java
@@ -0,0 +1,73 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.mailbox.tools.indexer;
+
+import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson;
+
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.inmemory.InMemoryId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.guava.GuavaModule;
+
+import net.javacrumbs.jsonunit.core.Option;
+
+class ReIndexingExecutionFailuresTest {
+ private ObjectMapper objectMapper;
+
+ @BeforeEach
+ void setUp() {
+ objectMapper = new ObjectMapper();
+ objectMapper.registerModules(new GuavaModule());
+ }
+
+ @Test
+ void failuresShouldBeSerializedAsEmptyArrayWhenNone() throws Exception {
+ ReIndexingExecutionFailures failures = new ReIndexingExecutionFailures(ImmutableList.of());
+
+ assertThatJson(objectMapper.writeValueAsString(failures))
+ .when(Option.IGNORING_ARRAY_ORDER)
+ .isEqualTo("{}");
+ }
+
+ @Test
+ void failuresShouldBeSerializedGroupedByMailboxId() throws Exception {
+ ReIndexingExecutionFailures failures = new ReIndexingExecutionFailures(ImmutableList.of(
+ new ReIndexingExecutionFailures.ReIndexingFailure(InMemoryId.of(45), MessageUid.of(34)),
+ new ReIndexingExecutionFailures.ReIndexingFailure(InMemoryId.of(45), MessageUid.of(33)),
+ new ReIndexingExecutionFailures.ReIndexingFailure(InMemoryId.of(44), MessageUid.of(31)),
+ new ReIndexingExecutionFailures.ReIndexingFailure(InMemoryId.of(44), MessageUid.of(34)),
+ new ReIndexingExecutionFailures.ReIndexingFailure(InMemoryId.of(41), MessageUid.of(18)),
+ new ReIndexingExecutionFailures.ReIndexingFailure(InMemoryId.of(16), MessageUid.of(24))));
+
+ assertThatJson(objectMapper.writeValueAsString(failures))
+ .when(Option.IGNORING_ARRAY_ORDER)
+ .isEqualTo("{" +
+ " \"45\":[{\"uid\":34}, {\"uid\":33}]," +
+ " \"44\":[{\"uid\":31}, {\"uid\":34}]," +
+ " \"41\":[{\"uid\":18}]," +
+ " \"16\":[{\"uid\":24}]" +
+ "}");
+ }
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org