You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/25 02:56:11 UTC

[james-project] 05/10: JAMES-3202 Integration tests for Reindex with Mode set to Correct

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bdfdeb6c6b1eaa6dce5cfdacafb82be0786b86d9
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Fri Jun 12 17:00:04 2020 +0700

    JAMES-3202 Integration tests for Reindex with Mode set to Correct
---
 server/protocols/webadmin/webadmin-mailbox/pom.xml |  18 +-
 .../james/webadmin/routes/MailboxesRoutesTest.java | 423 ++++++++++++++++++++-
 .../webadmin/routes/UserMailboxesRoutesTest.java   | 259 ++++++++++++-
 3 files changed, 666 insertions(+), 34 deletions(-)

diff --git a/server/protocols/webadmin/webadmin-mailbox/pom.xml b/server/protocols/webadmin/webadmin-mailbox/pom.xml
index c4868ce..89777b6 100644
--- a/server/protocols/webadmin/webadmin-mailbox/pom.xml
+++ b/server/protocols/webadmin/webadmin-mailbox/pom.xml
@@ -35,7 +35,8 @@
     <dependencies>
         <dependency>
             <groupId>${james.groupId}</groupId>
-            <artifactId>james-server-dnsservice-test</artifactId>
+            <artifactId>apache-james-backends-es</artifactId>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -50,6 +51,10 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-mailbox-elasticsearch</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>apache-james-mailbox-event-json</artifactId>
         </dependency>
         <dependency>
@@ -93,12 +98,6 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
-            <artifactId>apache-james-backends-es</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>${james.groupId}</groupId>
             <artifactId>apache-james-mailbox-quota-search-elasticsearch</artifactId>
             <scope>test</scope>
         </dependency>
@@ -153,6 +152,11 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-dnsservice-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-task-json</artifactId>
         </dependency>
         <dependency>
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
index 65ce22a..244331a 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
@@ -23,28 +23,59 @@ import static io.restassured.RestAssured.given;
 import static io.restassured.RestAssured.when;
 import static io.restassured.RestAssured.with;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.List;
+
+import javax.mail.Flags;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.james.backends.es.DockerElasticSearchExtension;
+import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.core.Username;
 import org.apache.james.json.DTOConverter;
 import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageIdManager;
 import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.elasticsearch.IndexAttachments;
+import org.apache.james.mailbox.elasticsearch.MailboxElasticSearchConstants;
+import org.apache.james.mailbox.elasticsearch.MailboxIdRoutingKeyFactory;
+import org.apache.james.mailbox.elasticsearch.MailboxIndexCreationUtil;
+import org.apache.james.mailbox.elasticsearch.events.ElasticSearchListeningMessageSearchIndex;
+import org.apache.james.mailbox.elasticsearch.json.MessageToElasticSearchJson;
+import org.apache.james.mailbox.elasticsearch.query.CriterionConverter;
+import org.apache.james.mailbox.elasticsearch.query.QueryConverter;
+import org.apache.james.mailbox.elasticsearch.search.ElasticSearchSearcher;
 import org.apache.james.mailbox.indexer.ReIndexer;
 import org.apache.james.mailbox.inmemory.InMemoryId;
 import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
+import org.apache.james.mailbox.inmemory.InMemoryMessageId;
 import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
 import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.mailbox.model.UpdatedFlags;
+import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 import org.apache.james.task.Hostname;
 import org.apache.james.task.MemoryTaskManager;
@@ -63,35 +94,74 @@ import org.apache.mailbox.tools.indexer.SingleMailboxReindexingTask;
 import org.apache.mailbox.tools.indexer.SingleMessageReindexingTask;
 import org.apache.mailbox.tools.indexer.SingleMessageReindexingTaskAdditionalInformationDTO;
 import org.eclipse.jetty.http.HttpStatus;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableSet;
 
+import com.google.common.collect.ImmutableList;
+
 import io.restassured.RestAssured;
 import reactor.core.publisher.Mono;
 
 class MailboxesRoutesTest {
-    private static final Username USERNAME = Username.of("benwa@apache.org");
-    private static final MailboxPath INBOX = MailboxPath.inbox(USERNAME);
+    static final Username USERNAME = Username.of("benwa@apache.org");
+    static final MailboxPath INBOX = MailboxPath.inbox(USERNAME);
+    static final int BATCH_SIZE = 1;
+    static final int SEARCH_SIZE = 1;
+
+    @RegisterExtension
+    DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
 
-    private WebAdminServer webAdminServer;
-    private ListeningMessageSearchIndex searchIndex;
-    private InMemoryMailboxManager mailboxManager;
-    private MemoryTaskManager taskManager;
+    WebAdminServer webAdminServer;
+    ListeningMessageSearchIndex searchIndex;
+    InMemoryMailboxManager mailboxManager;
+    MessageIdManager messageIdManager;
+    MemoryTaskManager taskManager;
+    ReactorElasticSearchClient client;
 
     @BeforeEach
-    void beforeEach() {
-        mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
+    void beforeEach() throws Exception {
+        client = MailboxIndexCreationUtil.prepareDefaultClient(
+            elasticSearch.getDockerElasticSearch().clientProvider().get(),
+            elasticSearch.getDockerElasticSearch().configuration());
+
+        InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
+        MailboxIdRoutingKeyFactory routingKeyFactory = new MailboxIdRoutingKeyFactory();
+
+        InMemoryIntegrationResources resources = InMemoryIntegrationResources.builder()
+            .preProvisionnedFakeAuthenticator()
+            .fakeAuthorizator()
+            .inVmEventBus()
+            .defaultAnnotationLimits()
+            .defaultMessageParser()
+            .listeningSearchIndex(preInstanciationStage -> new ElasticSearchListeningMessageSearchIndex(
+                preInstanciationStage.getMapperFactory(),
+                new ElasticSearchIndexer(client,
+                    MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS,
+                    BATCH_SIZE),
+                new ElasticSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE,
+                    new InMemoryId.Factory(), messageIdFactory,
+                    MailboxElasticSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, routingKeyFactory),
+                new MessageToElasticSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.YES),
+                preInstanciationStage.getSessionProvider(), routingKeyFactory))
+            .noPreDeletionHooks()
+            .storeQuotaManager()
+            .build();
+
+        mailboxManager = resources.getMailboxManager();
+        messageIdManager = resources.getMessageIdManager();
         taskManager = new MemoryTaskManager(new Hostname("foo"));
         InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();
-        searchIndex = mock(ListeningMessageSearchIndex.class);
-        Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
-        Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
+
+        searchIndex = spy((ListeningMessageSearchIndex) resources.getSearchIndex());
+
         ReIndexerPerformer reIndexerPerformer = new ReIndexerPerformer(
             mailboxManager,
             searchIndex,
@@ -271,6 +341,163 @@ class MailboxesRoutesTest {
                     .body("taskId", Matchers.is(notNullValue()))
                     .body("additionalInformation.mailboxFailures", Matchers.containsInAnyOrder(mailboxId.serialize()));
             }
+
+            @Test
+            void fullReprocessingWithCorrectModeShouldReturnTaskDetailsWhenMails() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+                ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession)
+                    .getId();
+                mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession);
+
+                List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
+
+                Flags newFlags = new Flags(Flags.Flag.DRAFT);
+                UpdatedFlags updatedFlags = UpdatedFlags.builder()
+                    .uid(result.getUid())
+                    .modSeq(messages.get(0).getModSeq())
+                    .oldFlags(new Flags())
+                    .newFlags(newFlags)
+                    .build();
+
+                // We update on the searchIndex level to try to create inconsistencies
+                searchIndex.update(systemSession, mailbox, ImmutableList.of(updatedFlags)).block();
+
+                String taskId = with()
+                    .post("/mailboxes?task=reIndex&mode=fixOutdated")
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await")
+                .then()
+                    .body("status", is("completed"))
+                    .body("taskId", is(notNullValue()))
+                    .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
+                    .body("additionalInformation.successfullyReprocessedMailCount", is(2))
+                    .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("startedDate", is(notNullValue()))
+                    .body("submitDate", is(notNullValue()))
+                    .body("completedDate", is(notNullValue()));
+            }
+
+            @Test
+            void fullReprocessingWithCorrectModeShouldFixInconsistenciesInES() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+                ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession)
+                    .getId();
+
+                Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();
+
+                List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
+
+                Flags newFlags = new Flags(Flags.Flag.DRAFT);
+                UpdatedFlags updatedFlags = UpdatedFlags.builder()
+                    .uid(result.getUid())
+                    .modSeq(messages.get(0).getModSeq())
+                    .oldFlags(new Flags())
+                    .newFlags(newFlags)
+                    .build();
+
+                // We update on the searchIndex level to try to create inconsistencies
+                searchIndex.update(systemSession, mailbox, ImmutableList.of(updatedFlags)).block();
+
+                String taskId = with()
+                    .post("/mailboxes?task=reIndex&mode=fixOutdated")
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await");
+
+                assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
+                    .isEqualTo(initialFlags);
+            }
+
+            @Test
+            void fullReprocessingWithCorrectModeShouldNotChangeDocumentsInESWhenNoInconsistencies() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+                ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession)
+                    .getId();
+
+                Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();
+
+                String taskId = with()
+                    .post("/mailboxes?task=reIndex&mode=fixOutdated")
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await");
+
+                assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
+                    .isEqualTo(initialFlags);
+            }
+
+            @Disabled("JAMES-3202 Limitation of the current correct mode reindexation. We only check metadata and fix "
+                + "inconsistencies with ES, but we don't check for inconsistencies from ES to metadata")
+            @Test
+            void fullReprocessingWithCorrectModeShouldRemoveOrphanMessagesInES() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+                byte[] content = "Simple message content".getBytes(StandardCharsets.UTF_8);
+                MessageUid uid = MessageUid.of(22L);
+
+                SimpleMailboxMessage message = SimpleMailboxMessage.builder()
+                    .messageId(InMemoryMessageId.of(42L))
+                    .uid(uid)
+                    .content(new SharedByteArrayInputStream(content))
+                    .size(content.length)
+                    .internalDate(new Date(ZonedDateTime.parse("2018-02-15T15:54:02Z").toEpochSecond()))
+                    .bodyStartOctet(0)
+                    .flags(new Flags("myFlags"))
+                    .propertyBuilder(new PropertyBuilder())
+                    .mailboxId(mailboxId)
+                    .build();
+
+                searchIndex.add(systemSession, mailbox, message).block();
+
+                String taskId = with()
+                    .post("/mailboxes?task=reIndex&mode=fixOutdated")
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await");
+
+                assertThatThrownBy(() -> searchIndex.retrieveIndexedFlags(mailbox, uid).block())
+                    .isInstanceOf(IndexNotFoundException.class);
+            }
         }
 
         @Nested
@@ -485,6 +712,172 @@ class MailboxesRoutesTest {
                     .body("taskId", Matchers.is(notNullValue()))
                     .body("additionalInformation.mailboxFailures", Matchers.containsInAnyOrder(mailboxId.serialize()));
             }
+
+
+            @Test
+            void mailboxReprocessingWithCorrectModeShouldReturnTaskDetailsWhenMails() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+                ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession)
+                    .getId();
+                mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession);
+
+                List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
+
+                Flags newFlags = new Flags(Flags.Flag.DRAFT);
+                UpdatedFlags updatedFlags = UpdatedFlags.builder()
+                    .uid(result.getUid())
+                    .modSeq(messages.get(0).getModSeq())
+                    .oldFlags(new Flags())
+                    .newFlags(newFlags)
+                    .build();
+
+                // We update on the searchIndex level to try to create inconsistencies
+                searchIndex.update(systemSession, mailbox, ImmutableList.of(updatedFlags)).block();
+
+                String taskId = with()
+                    .queryParam("task", "reIndex")
+                    .queryParam("mode", "fixOutdated")
+                    .post("/mailboxes/" + mailboxId.serialize())
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await")
+                .then()
+                    .body("status", is("completed"))
+                    .body("taskId", is(notNullValue()))
+                    .body("type", is(SingleMailboxReindexingTask.TYPE.asString()))
+                    .body("additionalInformation.successfullyReprocessedMailCount", is(2))
+                    .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("startedDate", is(notNullValue()))
+                    .body("submitDate", is(notNullValue()))
+                    .body("completedDate", is(notNullValue()));
+            }
+
+            @Test
+            void mailboxReprocessingWithCorrectModeShouldFixInconsistenciesInES() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+                ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession)
+                    .getId();
+
+                Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();
+
+                List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
+
+                Flags newFlags = new Flags(Flags.Flag.DRAFT);
+                UpdatedFlags updatedFlags = UpdatedFlags.builder()
+                    .uid(result.getUid())
+                    .modSeq(messages.get(0).getModSeq())
+                    .oldFlags(new Flags())
+                    .newFlags(newFlags)
+                    .build();
+
+                // We update on the searchIndex level to try to create inconsistencies
+                searchIndex.update(systemSession, mailbox, ImmutableList.of(updatedFlags)).block();
+
+                String taskId = with()
+                    .queryParam("task", "reIndex")
+                    .queryParam("mode", "fixOutdated")
+                    .post("/mailboxes/" + mailboxId.serialize())
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await");
+
+                assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
+                    .isEqualTo(initialFlags);
+            }
+
+            @Test
+            void mailboxReprocessingWithCorrectModeShouldNotChangeDocumentsInESWhenNoInconsistencies() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+                ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession)
+                    .getId();
+
+                Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();
+
+                String taskId = with()
+                    .queryParam("task", "reIndex")
+                    .queryParam("mode", "fixOutdated")
+                    .post("/mailboxes/" + mailboxId.serialize())
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await");
+
+                assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
+                    .isEqualTo(initialFlags);
+            }
+
+            @Disabled("JAMES-3202 Limitation of the current correct mode reindexation. We only check metadata and fix "
+                + "inconsistencies with ES, but we don't check for inconsistencies from ES to metadata")
+            @Test
+            void mailboxReprocessingWithCorrectModeShouldRemoveOrphanMessagesInES() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+                byte[] content = "Simple message content".getBytes(StandardCharsets.UTF_8);
+                MessageUid uid = MessageUid.of(22L);
+
+                SimpleMailboxMessage message = SimpleMailboxMessage.builder()
+                    .messageId(InMemoryMessageId.of(42L))
+                    .uid(uid)
+                    .content(new SharedByteArrayInputStream(content))
+                    .size(content.length)
+                    .internalDate(new Date(ZonedDateTime.parse("2018-02-15T15:54:02Z").toEpochSecond()))
+                    .bodyStartOctet(0)
+                    .flags(new Flags("myFlags"))
+                    .propertyBuilder(new PropertyBuilder())
+                    .mailboxId(mailboxId)
+                    .build();
+
+                searchIndex.add(systemSession, mailbox, message).block();
+
+                String taskId = with()
+                    .queryParam("task", "reIndex")
+                    .queryParam("mode", "fixOutdated")
+                    .post("/mailboxes/" + mailboxId.serialize())
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await");
+
+                assertThatThrownBy(() -> searchIndex.retrieveIndexedFlags(mailbox, uid).block())
+                    .isInstanceOf(IndexNotFoundException.class);
+            }
         }
 
         @Nested
@@ -838,7 +1231,7 @@ class MailboxesRoutesTest {
             }
 
             @Test
-            void mailboxReprocessingShouldReturnTaskDetailsWhenFailing() throws Exception {
+            void fixingReIndexingShouldReturnTaskDetailsWhenFailing() throws Exception {
                 MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
                 MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
                 ComposedMessageId composedMessageId = mailboxManager.getMailbox(INBOX, systemSession)
@@ -937,8 +1330,6 @@ class MailboxesRoutesTest {
                     .get(taskId + "/await");
 
                 reset(searchIndex);
-                Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
-                Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
 
                 String fixingTaskId = with()
                     .queryParam("reIndexFailedMessagesOf", taskId)
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
index ff3dbfb..3e18b00 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
@@ -26,6 +26,7 @@ import static io.restassured.http.ContentType.JSON;
 import static org.apache.james.webadmin.Constants.SEPARATOR;
 import static org.apache.james.webadmin.routes.UserMailboxesRoutes.USERS_BASE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.eclipse.jetty.http.HttpStatus.BAD_REQUEST_400;
 import static org.eclipse.jetty.http.HttpStatus.INTERNAL_SERVER_ERROR_500;
 import static org.eclipse.jetty.http.HttpStatus.NOT_FOUND_404;
@@ -38,35 +39,63 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Stream;
 
+import javax.mail.Flags;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.james.backends.es.DockerElasticSearchExtension;
+import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.core.Username;
 import org.apache.james.json.DTOConverter;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MailboxSessionUtil;
+import org.apache.james.mailbox.MessageIdManager;
 import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.elasticsearch.IndexAttachments;
+import org.apache.james.mailbox.elasticsearch.MailboxElasticSearchConstants;
+import org.apache.james.mailbox.elasticsearch.MailboxIdRoutingKeyFactory;
+import org.apache.james.mailbox.elasticsearch.MailboxIndexCreationUtil;
+import org.apache.james.mailbox.elasticsearch.events.ElasticSearchListeningMessageSearchIndex;
+import org.apache.james.mailbox.elasticsearch.json.MessageToElasticSearchJson;
+import org.apache.james.mailbox.elasticsearch.query.CriterionConverter;
+import org.apache.james.mailbox.elasticsearch.query.QueryConverter;
+import org.apache.james.mailbox.elasticsearch.search.ElasticSearchSearcher;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.exception.MailboxExistsException;
 import org.apache.james.mailbox.exception.MailboxNotFoundException;
 import org.apache.james.mailbox.indexer.ReIndexer;
 import org.apache.james.mailbox.inmemory.InMemoryId;
 import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
+import org.apache.james.mailbox.inmemory.InMemoryMessageId;
 import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
 import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxMetaData;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 import org.apache.james.task.Hostname;
 import org.apache.james.task.MemoryTaskManager;
@@ -79,14 +108,18 @@ import org.apache.james.webadmin.utils.JsonTransformer;
 import org.apache.mailbox.tools.indexer.ReIndexerImpl;
 import org.apache.mailbox.tools.indexer.ReIndexerPerformer;
 import org.apache.mailbox.tools.indexer.UserReindexingTask;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 import io.restassured.RestAssured;
@@ -103,18 +136,13 @@ class UserMailboxesRoutesTest {
     
     private WebAdminServer webAdminServer;
     private UsersRepository usersRepository;
-    private ListeningMessageSearchIndex searchIndex;
     private MemoryTaskManager taskManager;
 
-    private void createServer(MailboxManager mailboxManager, MailboxSessionMapperFactory mapperFactory, MailboxId.Factory mailboxIdFactory) throws Exception {
+    private void createServer(MailboxManager mailboxManager, MailboxSessionMapperFactory mapperFactory, MailboxId.Factory mailboxIdFactory, ListeningMessageSearchIndex searchIndex) throws Exception {
         usersRepository = mock(UsersRepository.class);
         when(usersRepository.contains(USERNAME)).thenReturn(true);
 
-
         taskManager = new MemoryTaskManager(new Hostname("foo"));
-        searchIndex = mock(ListeningMessageSearchIndex.class);
-        Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
-        Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
         ReIndexerPerformer reIndexerPerformer = new ReIndexerPerformer(
             mailboxManager,
             searchIndex,
@@ -149,7 +177,11 @@ class UserMailboxesRoutesTest {
         @BeforeEach
         void setUp() throws Exception {
             InMemoryMailboxManager mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
-            createServer(mailboxManager, mailboxManager.getMapperFactory(), new InMemoryId.Factory());
+            ListeningMessageSearchIndex searchIndex = mock(ListeningMessageSearchIndex.class);
+            Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+            Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
+
+            createServer(mailboxManager, mailboxManager.getMapperFactory(), new InMemoryId.Factory(), searchIndex);
         }
 
         @Test
@@ -857,8 +889,11 @@ class UserMailboxesRoutesTest {
         void setUp() throws Exception {
             mailboxManager = mock(MailboxManager.class);
             when(mailboxManager.createSystemSession(any())).thenReturn(MailboxSessionUtil.create(USERNAME));
+            ListeningMessageSearchIndex searchIndex = mock(ListeningMessageSearchIndex.class);
+            Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+            Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
 
-            createServer(mailboxManager, mock(MailboxSessionMapperFactory.class), new InMemoryId.Factory());
+            createServer(mailboxManager, mock(MailboxSessionMapperFactory.class), new InMemoryId.Factory(), searchIndex);
         }
 
         @Test
@@ -1114,13 +1149,50 @@ class UserMailboxesRoutesTest {
 
     @Nested
     class UserReprocessing {
+        static final int BATCH_SIZE = 1;
+        static final int SEARCH_SIZE = 1;
+
+        @RegisterExtension
+        DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
 
         private InMemoryMailboxManager mailboxManager;
+        private ListeningMessageSearchIndex searchIndex;
+        MessageIdManager messageIdManager;
 
         @BeforeEach
         void setUp() throws Exception {
-            mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
-            createServer(mailboxManager, mailboxManager.getMapperFactory(), new InMemoryId.Factory());
+            ReactorElasticSearchClient client = MailboxIndexCreationUtil.prepareDefaultClient(
+                elasticSearch.getDockerElasticSearch().clientProvider().get(),
+                elasticSearch.getDockerElasticSearch().configuration());
+
+            InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
+            MailboxIdRoutingKeyFactory routingKeyFactory = new MailboxIdRoutingKeyFactory();
+
+            InMemoryIntegrationResources resources = InMemoryIntegrationResources.builder()
+                .preProvisionnedFakeAuthenticator()
+                .fakeAuthorizator()
+                .inVmEventBus()
+                .defaultAnnotationLimits()
+                .defaultMessageParser()
+                .listeningSearchIndex(preInstanciationStage -> new ElasticSearchListeningMessageSearchIndex(
+                    preInstanciationStage.getMapperFactory(),
+                    new ElasticSearchIndexer(client,
+                        MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS,
+                        BATCH_SIZE),
+                    new ElasticSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE,
+                        new InMemoryId.Factory(), messageIdFactory,
+                        MailboxElasticSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, routingKeyFactory),
+                    new MessageToElasticSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.YES),
+                    preInstanciationStage.getSessionProvider(), routingKeyFactory))
+                .noPreDeletionHooks()
+                .storeQuotaManager()
+                .build();
+
+            mailboxManager = resources.getMailboxManager();
+            messageIdManager = resources.getMessageIdManager();
+            searchIndex = spy((ListeningMessageSearchIndex) resources.getSearchIndex());
+
+            createServer(mailboxManager, mailboxManager.getMapperFactory(), new InMemoryId.Factory(), searchIndex);
         }
 
         @Nested
@@ -1289,6 +1361,171 @@ class UserMailboxesRoutesTest {
                     .body("taskId", Matchers.is(notNullValue()))
                     .body("additionalInformation.mailboxFailures", Matchers.containsInAnyOrder(mailboxId.serialize()));
             }
+
+            @Test
+            void userReprocessingWithCorrectModeShouldReturnTaskDetailsWhenMails() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+                ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession)
+                    .getId();
+                mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession);
+
+                List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
+
+                Flags newFlags = new Flags(Flags.Flag.DRAFT);
+                UpdatedFlags updatedFlags = UpdatedFlags.builder()
+                    .uid(result.getUid())
+                    .modSeq(messages.get(0).getModSeq())
+                    .oldFlags(new Flags())
+                    .newFlags(newFlags)
+                    .build();
+
+                // We update on the searchIndex level to try to create inconsistencies
+                searchIndex.update(systemSession, mailbox, ImmutableList.of(updatedFlags)).block();
+
+                String taskId = with()
+                    .queryParam("task", "reIndex")
+                    .queryParam("mode", "fixOutdated")
+                    .post()
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await")
+                .then()
+                    .body("status", Matchers.is("completed"))
+                    .body("taskId", Matchers.is(notNullValue()))
+                    .body("type", Matchers.is(UserReindexingTask.USER_RE_INDEXING.asString()))
+                    .body("additionalInformation.successfullyReprocessedMailCount", Matchers.is(2))
+                    .body("additionalInformation.failedReprocessedMailCount", Matchers.is(0))
+                    .body("startedDate", Matchers.is(notNullValue()))
+                    .body("submitDate", Matchers.is(notNullValue()))
+                    .body("completedDate", Matchers.is(notNullValue()));
+            }
+
+            @Test
+            void userReprocessingWithCorrectModeShouldFixInconsistenciesInES() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+                ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession)
+                    .getId();
+
+                Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();
+
+                List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
+
+                Flags newFlags = new Flags(Flags.Flag.DRAFT);
+                UpdatedFlags updatedFlags = UpdatedFlags.builder()
+                    .uid(result.getUid())
+                    .modSeq(messages.get(0).getModSeq())
+                    .oldFlags(new Flags())
+                    .newFlags(newFlags)
+                    .build();
+
+                // We update on the searchIndex level to try to create inconsistencies
+                searchIndex.update(systemSession, mailbox, ImmutableList.of(updatedFlags)).block();
+
+                String taskId = with()
+                    .queryParam("task", "reIndex")
+                    .queryParam("mode", "fixOutdated")
+                    .post()
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await");
+
+                assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
+                    .isEqualTo(initialFlags);
+            }
+
+            @Test
+            void userReprocessingWithCorrectModeShouldNotChangeDocumentsInESWhenNoInconsistencies() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+                ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession)
+                    .getId();
+
+                Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();
+
+                String taskId = with()
+                    .queryParam("task", "reIndex")
+                    .queryParam("mode", "fixOutdated")
+                    .post()
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await");
+
+                assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
+                    .isEqualTo(initialFlags);
+            }
+
+            @Disabled("JAMES-3202 Limitation of the current correct mode reindexation. We only check metadata and fix "
+                + "inconsistencies with ES, but we don't check for inconsistencies from ES to metadata")
+            @Test
+            void userReprocessingWithCorrectModeShouldRemoveOrphanMessagesInES() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
+
+                byte[] content = "Simple message content".getBytes(StandardCharsets.UTF_8);
+                MessageUid uid = MessageUid.of(22L);
+
+                SimpleMailboxMessage message = SimpleMailboxMessage.builder()
+                    .messageId(InMemoryMessageId.of(42L))
+                    .uid(uid)
+                    .content(new SharedByteArrayInputStream(content))
+                    .size(content.length)
+                    .internalDate(new Date(ZonedDateTime.parse("2018-02-15T15:54:02Z").toEpochSecond()))
+                    .bodyStartOctet(0)
+                    .flags(new Flags("myFlags"))
+                    .propertyBuilder(new PropertyBuilder())
+                    .mailboxId(mailboxId)
+                    .build();
+
+                searchIndex.add(systemSession, mailbox, message).block();
+
+                String taskId = with()
+                    .queryParam("task", "reIndex")
+                    .queryParam("mode", "fixOutdated")
+                    .post()
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await");
+
+                assertThatThrownBy(() -> searchIndex.retrieveIndexedFlags(mailbox, uid).block())
+                    .isInstanceOf(IndexNotFoundException.class);
+            }
         }
 
         @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org