You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/25 02:56:11 UTC
[james-project] 05/10: JAMES-3202 Integration tests for Reindex
with Mode set to Correct
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit bdfdeb6c6b1eaa6dce5cfdacafb82be0786b86d9
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Fri Jun 12 17:00:04 2020 +0700
JAMES-3202 Integration tests for Reindex with Mode set to Correct
---
server/protocols/webadmin/webadmin-mailbox/pom.xml | 18 +-
.../james/webadmin/routes/MailboxesRoutesTest.java | 423 ++++++++++++++++++++-
.../webadmin/routes/UserMailboxesRoutesTest.java | 259 ++++++++++++-
3 files changed, 666 insertions(+), 34 deletions(-)
diff --git a/server/protocols/webadmin/webadmin-mailbox/pom.xml b/server/protocols/webadmin/webadmin-mailbox/pom.xml
index c4868ce..89777b6 100644
--- a/server/protocols/webadmin/webadmin-mailbox/pom.xml
+++ b/server/protocols/webadmin/webadmin-mailbox/pom.xml
@@ -35,7 +35,8 @@
<dependencies>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>james-server-dnsservice-test</artifactId>
+ <artifactId>apache-james-backends-es</artifactId>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
@@ -50,6 +51,10 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>apache-james-mailbox-elasticsearch</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>apache-james-mailbox-event-json</artifactId>
</dependency>
<dependency>
@@ -93,12 +98,6 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>apache-james-backends-es</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>${james.groupId}</groupId>
<artifactId>apache-james-mailbox-quota-search-elasticsearch</artifactId>
<scope>test</scope>
</dependency>
@@ -153,6 +152,11 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>james-server-dnsservice-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>james-server-task-json</artifactId>
</dependency>
<dependency>
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
index 65ce22a..244331a 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
@@ -23,28 +23,59 @@ import static io.restassured.RestAssured.given;
import static io.restassured.RestAssured.when;
import static io.restassured.RestAssured.with;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.List;
+
+import javax.mail.Flags;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.james.backends.es.DockerElasticSearchExtension;
+import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.core.Username;
import org.apache.james.json.DTOConverter;
import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageIdManager;
import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.elasticsearch.IndexAttachments;
+import org.apache.james.mailbox.elasticsearch.MailboxElasticSearchConstants;
+import org.apache.james.mailbox.elasticsearch.MailboxIdRoutingKeyFactory;
+import org.apache.james.mailbox.elasticsearch.MailboxIndexCreationUtil;
+import org.apache.james.mailbox.elasticsearch.events.ElasticSearchListeningMessageSearchIndex;
+import org.apache.james.mailbox.elasticsearch.json.MessageToElasticSearchJson;
+import org.apache.james.mailbox.elasticsearch.query.CriterionConverter;
+import org.apache.james.mailbox.elasticsearch.query.QueryConverter;
+import org.apache.james.mailbox.elasticsearch.search.ElasticSearchSearcher;
import org.apache.james.mailbox.indexer.ReIndexer;
import org.apache.james.mailbox.inmemory.InMemoryId;
import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
+import org.apache.james.mailbox.inmemory.InMemoryMessageId;
import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.FetchGroup;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.mailbox.model.UpdatedFlags;
+import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
import org.apache.james.task.Hostname;
import org.apache.james.task.MemoryTaskManager;
@@ -63,35 +94,74 @@ import org.apache.mailbox.tools.indexer.SingleMailboxReindexingTask;
import org.apache.mailbox.tools.indexer.SingleMessageReindexingTask;
import org.apache.mailbox.tools.indexer.SingleMessageReindexingTaskAdditionalInformationDTO;
import org.eclipse.jetty.http.HttpStatus;
+import org.elasticsearch.index.IndexNotFoundException;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
import org.testcontainers.shaded.com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableList;
+
import io.restassured.RestAssured;
import reactor.core.publisher.Mono;
class MailboxesRoutesTest {
- private static final Username USERNAME = Username.of("benwa@apache.org");
- private static final MailboxPath INBOX = MailboxPath.inbox(USERNAME);
+ static final Username USERNAME = Username.of("benwa@apache.org");
+ static final MailboxPath INBOX = MailboxPath.inbox(USERNAME);
+ static final int BATCH_SIZE = 1;
+ static final int SEARCH_SIZE = 1;
+
+ @RegisterExtension
+ DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
- private WebAdminServer webAdminServer;
- private ListeningMessageSearchIndex searchIndex;
- private InMemoryMailboxManager mailboxManager;
- private MemoryTaskManager taskManager;
+ WebAdminServer webAdminServer;
+ ListeningMessageSearchIndex searchIndex;
+ InMemoryMailboxManager mailboxManager;
+ MessageIdManager messageIdManager;
+ MemoryTaskManager taskManager;
+ ReactorElasticSearchClient client;
@BeforeEach
- void beforeEach() {
- mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
+ void beforeEach() throws Exception {
+ client = MailboxIndexCreationUtil.prepareDefaultClient(
+ elasticSearch.getDockerElasticSearch().clientProvider().get(),
+ elasticSearch.getDockerElasticSearch().configuration());
+
+ InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
+ MailboxIdRoutingKeyFactory routingKeyFactory = new MailboxIdRoutingKeyFactory();
+
+ InMemoryIntegrationResources resources = InMemoryIntegrationResources.builder()
+ .preProvisionnedFakeAuthenticator()
+ .fakeAuthorizator()
+ .inVmEventBus()
+ .defaultAnnotationLimits()
+ .defaultMessageParser()
+ .listeningSearchIndex(preInstanciationStage -> new ElasticSearchListeningMessageSearchIndex(
+ preInstanciationStage.getMapperFactory(),
+ new ElasticSearchIndexer(client,
+ MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS,
+ BATCH_SIZE),
+ new ElasticSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE,
+ new InMemoryId.Factory(), messageIdFactory,
+ MailboxElasticSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, routingKeyFactory),
+ new MessageToElasticSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.YES),
+ preInstanciationStage.getSessionProvider(), routingKeyFactory))
+ .noPreDeletionHooks()
+ .storeQuotaManager()
+ .build();
+
+ mailboxManager = resources.getMailboxManager();
+ messageIdManager = resources.getMessageIdManager();
taskManager = new MemoryTaskManager(new Hostname("foo"));
InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();
- searchIndex = mock(ListeningMessageSearchIndex.class);
- Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
- Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
+
+ searchIndex = spy((ListeningMessageSearchIndex) resources.getSearchIndex());
+
ReIndexerPerformer reIndexerPerformer = new ReIndexerPerformer(
mailboxManager,
searchIndex,
@@ -271,6 +341,163 @@ class MailboxesRoutesTest {
.body("taskId", Matchers.is(notNullValue()))
.body("additionalInformation.mailboxFailures", Matchers.containsInAnyOrder(mailboxId.serialize()));
}
+
+ @Test
+ void fullReprocessingWithCorrectModeShouldReturnTaskDetailsWhenMails() throws Exception {
+ MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+ MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+ Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+ ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+ .appendMessage(
+ MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+ systemSession)
+ .getId();
+ mailboxManager.getMailbox(INBOX, systemSession)
+ .appendMessage(
+ MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+ systemSession);
+
+ List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
+
+ Flags newFlags = new Flags(Flags.Flag.DRAFT);
+ UpdatedFlags updatedFlags = UpdatedFlags.builder()
+ .uid(result.getUid())
+ .modSeq(messages.get(0).getModSeq())
+ .oldFlags(new Flags())
+ .newFlags(newFlags)
+ .build();
+
+ // We update on the searchIndex level to try to create inconsistencies
+ searchIndex.update(systemSession, mailbox, ImmutableList.of(updatedFlags)).block();
+
+ String taskId = with()
+ .post("/mailboxes?task=reIndex&mode=fixOutdated")
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("taskId", is(notNullValue()))
+ .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
+ .body("additionalInformation.successfullyReprocessedMailCount", is(2))
+ .body("additionalInformation.failedReprocessedMailCount", is(0))
+ .body("startedDate", is(notNullValue()))
+ .body("submitDate", is(notNullValue()))
+ .body("completedDate", is(notNullValue()));
+ }
+
+ @Test
+ void fullReprocessingWithCorrectModeShouldFixInconsistenciesInES() throws Exception {
+ MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+ MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+ Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+ ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+ .appendMessage(
+ MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+ systemSession)
+ .getId();
+
+ Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();
+
+ List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
+
+ Flags newFlags = new Flags(Flags.Flag.DRAFT);
+ UpdatedFlags updatedFlags = UpdatedFlags.builder()
+ .uid(result.getUid())
+ .modSeq(messages.get(0).getModSeq())
+ .oldFlags(new Flags())
+ .newFlags(newFlags)
+ .build();
+
+ // We update on the searchIndex level to try to create inconsistencies
+ searchIndex.update(systemSession, mailbox, ImmutableList.of(updatedFlags)).block();
+
+ String taskId = with()
+ .post("/mailboxes?task=reIndex&mode=fixOutdated")
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await");
+
+ assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
+ .isEqualTo(initialFlags);
+ }
+
+ @Test
+ void fullReprocessingWithCorrectModeShouldNotChangeDocumentsInESWhenNoInconsistencies() throws Exception {
+ MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+ MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+ Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+ ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+ .appendMessage(
+ MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+ systemSession)
+ .getId();
+
+ Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();
+
+ String taskId = with()
+ .post("/mailboxes?task=reIndex&mode=fixOutdated")
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await");
+
+ assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
+ .isEqualTo(initialFlags);
+ }
+
+ @Disabled("JAMES-3202 Limitation of the current correct mode reindexation. We only check metadata and fix "
+ + "inconsistencies with ES, but we don't check for inconsistencies from ES to metadata")
+ @Test
+ void fullReprocessingWithCorrectModeShouldRemoveOrphanMessagesInES() throws Exception {
+ MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+ MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+ Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+ byte[] content = "Simple message content".getBytes(StandardCharsets.UTF_8);
+ MessageUid uid = MessageUid.of(22L);
+
+ SimpleMailboxMessage message = SimpleMailboxMessage.builder()
+ .messageId(InMemoryMessageId.of(42L))
+ .uid(uid)
+ .content(new SharedByteArrayInputStream(content))
+ .size(content.length)
+ .internalDate(new Date(ZonedDateTime.parse("2018-02-15T15:54:02Z").toEpochSecond()))
+ .bodyStartOctet(0)
+ .flags(new Flags("myFlags"))
+ .propertyBuilder(new PropertyBuilder())
+ .mailboxId(mailboxId)
+ .build();
+
+ searchIndex.add(systemSession, mailbox, message).block();
+
+ String taskId = with()
+ .post("/mailboxes?task=reIndex&mode=fixOutdated")
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await");
+
+ assertThatThrownBy(() -> searchIndex.retrieveIndexedFlags(mailbox, uid).block())
+ .isInstanceOf(IndexNotFoundException.class);
+ }
}
@Nested
@@ -485,6 +712,172 @@ class MailboxesRoutesTest {
.body("taskId", Matchers.is(notNullValue()))
.body("additionalInformation.mailboxFailures", Matchers.containsInAnyOrder(mailboxId.serialize()));
}
+
+
+ @Test
+ void mailboxReprocessingWithCorrectModeShouldReturnTaskDetailsWhenMails() throws Exception {
+ MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+ MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+ Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+ ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+ .appendMessage(
+ MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+ systemSession)
+ .getId();
+ mailboxManager.getMailbox(INBOX, systemSession)
+ .appendMessage(
+ MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+ systemSession);
+
+ List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
+
+ Flags newFlags = new Flags(Flags.Flag.DRAFT);
+ UpdatedFlags updatedFlags = UpdatedFlags.builder()
+ .uid(result.getUid())
+ .modSeq(messages.get(0).getModSeq())
+ .oldFlags(new Flags())
+ .newFlags(newFlags)
+ .build();
+
+ // We update on the searchIndex level to try to create inconsistencies
+ searchIndex.update(systemSession, mailbox, ImmutableList.of(updatedFlags)).block();
+
+ String taskId = with()
+ .queryParam("task", "reIndex")
+ .queryParam("mode", "fixOutdated")
+ .post("/mailboxes/" + mailboxId.serialize())
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("taskId", is(notNullValue()))
+ .body("type", is(SingleMailboxReindexingTask.TYPE.asString()))
+ .body("additionalInformation.successfullyReprocessedMailCount", is(2))
+ .body("additionalInformation.failedReprocessedMailCount", is(0))
+ .body("startedDate", is(notNullValue()))
+ .body("submitDate", is(notNullValue()))
+ .body("completedDate", is(notNullValue()));
+ }
+
+ @Test
+ void mailboxReprocessingWithCorrectModeShouldFixInconsistenciesInES() throws Exception {
+ MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+ MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+ Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+ ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+ .appendMessage(
+ MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+ systemSession)
+ .getId();
+
+ Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();
+
+ List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
+
+ Flags newFlags = new Flags(Flags.Flag.DRAFT);
+ UpdatedFlags updatedFlags = UpdatedFlags.builder()
+ .uid(result.getUid())
+ .modSeq(messages.get(0).getModSeq())
+ .oldFlags(new Flags())
+ .newFlags(newFlags)
+ .build();
+
+ // We update on the searchIndex level to try to create inconsistencies
+ searchIndex.update(systemSession, mailbox, ImmutableList.of(updatedFlags)).block();
+
+ String taskId = with()
+ .queryParam("task", "reIndex")
+ .queryParam("mode", "fixOutdated")
+ .post("/mailboxes/" + mailboxId.serialize())
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await");
+
+ assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
+ .isEqualTo(initialFlags);
+ }
+
+ @Test
+ void mailboxReprocessingWithCorrectModeShouldNotChangeDocumentsInESWhenNoInconsistencies() throws Exception {
+ MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+ MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+ Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+ ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+ .appendMessage(
+ MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+ systemSession)
+ .getId();
+
+ Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();
+
+ String taskId = with()
+ .queryParam("task", "reIndex")
+ .queryParam("mode", "fixOutdated")
+ .post("/mailboxes/" + mailboxId.serialize())
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await");
+
+ assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
+ .isEqualTo(initialFlags);
+ }
+
+ @Disabled("JAMES-3202 Limitation of the current correct mode reindexation. We only check metadata and fix "
+ + "inconsistencies with ES, but we don't check for inconsistencies from ES to metadata")
+ @Test
+ void mailboxReprocessingWithCorrectModeShouldRemoveOrphanMessagesInES() throws Exception {
+ MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+ MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+ Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+ byte[] content = "Simple message content".getBytes(StandardCharsets.UTF_8);
+ MessageUid uid = MessageUid.of(22L);
+
+ SimpleMailboxMessage message = SimpleMailboxMessage.builder()
+ .messageId(InMemoryMessageId.of(42L))
+ .uid(uid)
+ .content(new SharedByteArrayInputStream(content))
+ .size(content.length)
+ .internalDate(new Date(ZonedDateTime.parse("2018-02-15T15:54:02Z").toEpochSecond()))
+ .bodyStartOctet(0)
+ .flags(new Flags("myFlags"))
+ .propertyBuilder(new PropertyBuilder())
+ .mailboxId(mailboxId)
+ .build();
+
+ searchIndex.add(systemSession, mailbox, message).block();
+
+ String taskId = with()
+ .queryParam("task", "reIndex")
+ .queryParam("mode", "fixOutdated")
+ .post("/mailboxes/" + mailboxId.serialize())
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await");
+
+ assertThatThrownBy(() -> searchIndex.retrieveIndexedFlags(mailbox, uid).block())
+ .isInstanceOf(IndexNotFoundException.class);
+ }
}
@Nested
@@ -838,7 +1231,7 @@ class MailboxesRoutesTest {
}
@Test
- void mailboxReprocessingShouldReturnTaskDetailsWhenFailing() throws Exception {
+ void fixingReIndexingShouldReturnTaskDetailsWhenFailing() throws Exception {
MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
ComposedMessageId composedMessageId = mailboxManager.getMailbox(INBOX, systemSession)
@@ -937,8 +1330,6 @@ class MailboxesRoutesTest {
.get(taskId + "/await");
reset(searchIndex);
- Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
- Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
String fixingTaskId = with()
.queryParam("reIndexFailedMessagesOf", taskId)
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
index ff3dbfb..3e18b00 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
@@ -26,6 +26,7 @@ import static io.restassured.http.ContentType.JSON;
import static org.apache.james.webadmin.Constants.SEPARATOR;
import static org.apache.james.webadmin.routes.UserMailboxesRoutes.USERS_BASE;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.eclipse.jetty.http.HttpStatus.BAD_REQUEST_400;
import static org.eclipse.jetty.http.HttpStatus.INTERNAL_SERVER_ERROR_500;
import static org.eclipse.jetty.http.HttpStatus.NOT_FOUND_404;
@@ -38,35 +39,63 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Date;
import java.util.List;
import java.util.Map;
-import java.util.stream.Stream;
+import javax.mail.Flags;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.james.backends.es.DockerElasticSearchExtension;
+import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.core.Username;
import org.apache.james.json.DTOConverter;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MailboxSessionUtil;
+import org.apache.james.mailbox.MessageIdManager;
import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.elasticsearch.IndexAttachments;
+import org.apache.james.mailbox.elasticsearch.MailboxElasticSearchConstants;
+import org.apache.james.mailbox.elasticsearch.MailboxIdRoutingKeyFactory;
+import org.apache.james.mailbox.elasticsearch.MailboxIndexCreationUtil;
+import org.apache.james.mailbox.elasticsearch.events.ElasticSearchListeningMessageSearchIndex;
+import org.apache.james.mailbox.elasticsearch.json.MessageToElasticSearchJson;
+import org.apache.james.mailbox.elasticsearch.query.CriterionConverter;
+import org.apache.james.mailbox.elasticsearch.query.QueryConverter;
+import org.apache.james.mailbox.elasticsearch.search.ElasticSearchSearcher;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.exception.MailboxExistsException;
import org.apache.james.mailbox.exception.MailboxNotFoundException;
import org.apache.james.mailbox.indexer.ReIndexer;
import org.apache.james.mailbox.inmemory.InMemoryId;
import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
+import org.apache.james.mailbox.inmemory.InMemoryMessageId;
import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.FetchGroup;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxMetaData;
import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
import org.apache.james.task.Hostname;
import org.apache.james.task.MemoryTaskManager;
@@ -79,14 +108,18 @@ import org.apache.james.webadmin.utils.JsonTransformer;
import org.apache.mailbox.tools.indexer.ReIndexerImpl;
import org.apache.mailbox.tools.indexer.ReIndexerPerformer;
import org.apache.mailbox.tools.indexer.UserReindexingTask;
+import org.elasticsearch.index.IndexNotFoundException;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.restassured.RestAssured;
@@ -103,18 +136,13 @@ class UserMailboxesRoutesTest {
private WebAdminServer webAdminServer;
private UsersRepository usersRepository;
- private ListeningMessageSearchIndex searchIndex;
private MemoryTaskManager taskManager;
- private void createServer(MailboxManager mailboxManager, MailboxSessionMapperFactory mapperFactory, MailboxId.Factory mailboxIdFactory) throws Exception {
+ private void createServer(MailboxManager mailboxManager, MailboxSessionMapperFactory mapperFactory, MailboxId.Factory mailboxIdFactory, ListeningMessageSearchIndex searchIndex) throws Exception {
usersRepository = mock(UsersRepository.class);
when(usersRepository.contains(USERNAME)).thenReturn(true);
-
taskManager = new MemoryTaskManager(new Hostname("foo"));
- searchIndex = mock(ListeningMessageSearchIndex.class);
- Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
- Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
ReIndexerPerformer reIndexerPerformer = new ReIndexerPerformer(
mailboxManager,
searchIndex,
@@ -149,7 +177,11 @@ class UserMailboxesRoutesTest {
@BeforeEach
void setUp() throws Exception {
InMemoryMailboxManager mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
- createServer(mailboxManager, mailboxManager.getMapperFactory(), new InMemoryId.Factory());
+ ListeningMessageSearchIndex searchIndex = mock(ListeningMessageSearchIndex.class);
+ Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+ Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
+
+ createServer(mailboxManager, mailboxManager.getMapperFactory(), new InMemoryId.Factory(), searchIndex);
}
@Test
@@ -857,8 +889,11 @@ class UserMailboxesRoutesTest {
void setUp() throws Exception {
mailboxManager = mock(MailboxManager.class);
when(mailboxManager.createSystemSession(any())).thenReturn(MailboxSessionUtil.create(USERNAME));
+ ListeningMessageSearchIndex searchIndex = mock(ListeningMessageSearchIndex.class);
+ Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+ Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
- createServer(mailboxManager, mock(MailboxSessionMapperFactory.class), new InMemoryId.Factory());
+ createServer(mailboxManager, mock(MailboxSessionMapperFactory.class), new InMemoryId.Factory(), searchIndex);
}
@Test
@@ -1114,13 +1149,50 @@ class UserMailboxesRoutesTest {
@Nested
class UserReprocessing {
+ static final int BATCH_SIZE = 1;
+ static final int SEARCH_SIZE = 1;
+
+ @RegisterExtension
+ DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
private InMemoryMailboxManager mailboxManager;
+ private ListeningMessageSearchIndex searchIndex;
+ MessageIdManager messageIdManager;
@BeforeEach
void setUp() throws Exception {
- mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
- createServer(mailboxManager, mailboxManager.getMapperFactory(), new InMemoryId.Factory());
+ ReactorElasticSearchClient client = MailboxIndexCreationUtil.prepareDefaultClient(
+ elasticSearch.getDockerElasticSearch().clientProvider().get(),
+ elasticSearch.getDockerElasticSearch().configuration());
+
+ InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
+ MailboxIdRoutingKeyFactory routingKeyFactory = new MailboxIdRoutingKeyFactory();
+
+ InMemoryIntegrationResources resources = InMemoryIntegrationResources.builder()
+ .preProvisionnedFakeAuthenticator()
+ .fakeAuthorizator()
+ .inVmEventBus()
+ .defaultAnnotationLimits()
+ .defaultMessageParser()
+ .listeningSearchIndex(preInstanciationStage -> new ElasticSearchListeningMessageSearchIndex(
+ preInstanciationStage.getMapperFactory(),
+ new ElasticSearchIndexer(client,
+ MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS,
+ BATCH_SIZE),
+ new ElasticSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE,
+ new InMemoryId.Factory(), messageIdFactory,
+ MailboxElasticSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, routingKeyFactory),
+ new MessageToElasticSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.YES),
+ preInstanciationStage.getSessionProvider(), routingKeyFactory))
+ .noPreDeletionHooks()
+ .storeQuotaManager()
+ .build();
+
+ mailboxManager = resources.getMailboxManager();
+ messageIdManager = resources.getMessageIdManager();
+ searchIndex = spy((ListeningMessageSearchIndex) resources.getSearchIndex());
+
+ createServer(mailboxManager, mailboxManager.getMapperFactory(), new InMemoryId.Factory(), searchIndex);
}
@Nested
@@ -1289,6 +1361,171 @@ class UserMailboxesRoutesTest {
.body("taskId", Matchers.is(notNullValue()))
.body("additionalInformation.mailboxFailures", Matchers.containsInAnyOrder(mailboxId.serialize()));
}
+
+ @Test
+ void userReprocessingWithCorrectModeShouldReturnTaskDetailsWhenMails() throws Exception {
+ MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+ MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+ Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+ ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+ .appendMessage(
+ MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+ systemSession)
+ .getId();
+ mailboxManager.getMailbox(INBOX, systemSession)
+ .appendMessage(
+ MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+ systemSession);
+
+ List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
+
+ Flags newFlags = new Flags(Flags.Flag.DRAFT);
+ UpdatedFlags updatedFlags = UpdatedFlags.builder()
+ .uid(result.getUid())
+ .modSeq(messages.get(0).getModSeq())
+ .oldFlags(new Flags())
+ .newFlags(newFlags)
+ .build();
+
+ // We update on the searchIndex level to try to create inconsistencies
+ searchIndex.update(systemSession, mailbox, ImmutableList.of(updatedFlags)).block();
+
+ String taskId = with()
+ .queryParam("task", "reIndex")
+ .queryParam("mode", "fixOutdated")
+ .post()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", Matchers.is("completed"))
+ .body("taskId", Matchers.is(notNullValue()))
+ .body("type", Matchers.is(UserReindexingTask.USER_RE_INDEXING.asString()))
+ .body("additionalInformation.successfullyReprocessedMailCount", Matchers.is(2))
+ .body("additionalInformation.failedReprocessedMailCount", Matchers.is(0))
+ .body("startedDate", Matchers.is(notNullValue()))
+ .body("submitDate", Matchers.is(notNullValue()))
+ .body("completedDate", Matchers.is(notNullValue()));
+ }
+
+ @Test
+ void userReprocessingWithCorrectModeShouldFixInconsistenciesInES() throws Exception {
+ MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+ MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+ Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+ ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+ .appendMessage(
+ MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+ systemSession)
+ .getId();
+
+ Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();
+
+ List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
+
+ Flags newFlags = new Flags(Flags.Flag.DRAFT);
+ UpdatedFlags updatedFlags = UpdatedFlags.builder()
+ .uid(result.getUid())
+ .modSeq(messages.get(0).getModSeq())
+ .oldFlags(new Flags())
+ .newFlags(newFlags)
+ .build();
+
+ // We update on the searchIndex level to try to create inconsistencies
+ searchIndex.update(systemSession, mailbox, ImmutableList.of(updatedFlags)).block();
+
+ String taskId = with()
+ .queryParam("task", "reIndex")
+ .queryParam("mode", "fixOutdated")
+ .post()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await");
+
+ assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
+ .isEqualTo(initialFlags);
+ }
+
+ @Test
+ void userReprocessingWithCorrectModeShouldNotChangeDocumentsInESWhenNoInconsistencies() throws Exception {
+ MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+ MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+ Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+ ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+ .appendMessage(
+ MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+ systemSession)
+ .getId();
+
+ Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();
+
+ String taskId = with()
+ .queryParam("task", "reIndex")
+ .queryParam("mode", "fixOutdated")
+ .post()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await");
+
+ assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
+ .isEqualTo(initialFlags);
+ }
+
+ @Disabled("JAMES-3202 Limitation of the current correct mode reindexation. We only check metadata and fix "
+ + "inconsistencies with ES, but we don't check for inconsistencies from ES to metadata")
+ @Test
+ void userReprocessingWithCorrectModeShouldRemoveOrphanMessagesInES() throws Exception {
+ MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+ MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+ Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+ byte[] content = "Simple message content".getBytes(StandardCharsets.UTF_8);
+ MessageUid uid = MessageUid.of(22L);
+
+ SimpleMailboxMessage message = SimpleMailboxMessage.builder()
+ .messageId(InMemoryMessageId.of(42L))
+ .uid(uid)
+ .content(new SharedByteArrayInputStream(content))
+ .size(content.length)
+ .internalDate(new Date(ZonedDateTime.parse("2018-02-15T15:54:02Z").toEpochSecond()))
+ .bodyStartOctet(0)
+ .flags(new Flags("myFlags"))
+ .propertyBuilder(new PropertyBuilder())
+ .mailboxId(mailboxId)
+ .build();
+
+ searchIndex.add(systemSession, mailbox, message).block();
+
+ String taskId = with()
+ .queryParam("task", "reIndex")
+ .queryParam("mode", "fixOutdated")
+ .post()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await");
+
+ assertThatThrownBy(() -> searchIndex.retrieveIndexedFlags(mailbox, uid).block())
+ .isInstanceOf(IndexNotFoundException.class);
+ }
}
@Nested
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org