You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/05/13 12:07:11 UTC

[james-project] 02/15: JAMES-3149 Reactify ReIndexing

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 94c5faf60a7ede4c7d26e77039a3c6b45ff8687d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 5 09:06:27 2020 +0700

    JAMES-3149 Reactify ReIndexing
    
    Concurrency model:
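     - Sequential processing of mailboxes by using `flatMap` with a concurrency and a prefetch of 1 (`concatMap`-like behaviour)
     - Use `flatMap` with a bounded concurrency to control message-level concurrency (constant `MESSAGE_CONCURRENCY`: 50)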
---
 .../cassandra/mail/CassandraMessageMapper.java     |  20 ++-
 .../james/mailbox/jpa/mail/JPAMessageMapper.java   |   8 +-
 .../jpa/mail/TransactionalMessageMapper.java       |   4 +-
 .../inmemory/mail/InMemoryMessageIdMapper.java     |  16 +-
 .../mailbox/store/mail/AbstractMessageMapper.java  |   9 +-
 .../james/mailbox/store/mail/MailboxMapper.java    |   2 +-
 .../james/mailbox/store/mail/MessageMapper.java    |  12 +-
 .../store/search/SimpleMessageSearchIndex.java     |   8 +-
 .../StoreMailboxMessageResultIteratorTest.java     |   6 +-
 .../store/mail/model/MessageMapperTest.java        |   9 +-
 .../tools/indexer/ErrorRecoveryIndexationTask.java |   2 +-
 .../mailbox/tools/indexer/FullReindexingTask.java  |  11 +-
 .../tools/indexer/MessageIdReIndexingTask.java     |   2 +-
 .../mailbox/tools/indexer/ReIndexerPerformer.java  | 194 +++++++++------------
 .../tools/indexer/SingleMailboxReindexingTask.java |   3 +-
 .../tools/indexer/SingleMessageReindexingTask.java |  15 +-
 .../mailbox/tools/indexer/UserReindexingTask.java  |  11 +-
 17 files changed, 167 insertions(+), 165 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index cb31434..2e41530 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -109,12 +109,10 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     @Override
-    public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) {
+    public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
         CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
         return messageIdDAO.retrieveMessages(cassandraId, MessageRange.all())
-            .map(metaData -> metaData.getComposedMessageId().getUid())
-            .toIterable()
-            .iterator();
+            .map(metaData -> metaData.getComposedMessageId().getUid());
     }
 
     @Override
@@ -163,14 +161,20 @@ public class CassandraMessageMapper implements MessageMapper {
 
     @Override
     public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) {
+        return findInMailboxReactive(mailbox, messageRange, ftype, max)
+            .toIterable()
+            .iterator();
+    }
+
+    @Override
+    public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int limit) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        return Limit.from(max).applyOnFlux(
+
+        return Limit.from(limit).applyOnFlux(
             messageIdDAO.retrieveMessages(mailboxId, messageRange)
                 .flatMap(id -> retrieveMessage(id, ftype), cassandraConfiguration.getMessageReadChunkSize()))
             .map(MailboxMessage.class::cast)
-            .sort(Comparator.comparing(MailboxMessage::getUid))
-            .toIterable()
-            .iterator();
+            .sort(Comparator.comparing(MailboxMessage::getUid));
     }
 
     private Mono<MailboxMessage> retrieveMessage(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java
index 9ef0bbc..c853b03 100644
--- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java
+++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java
@@ -58,7 +58,8 @@ import org.apache.openjpa.persistence.ArgumentException;
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
+
+import reactor.core.publisher.Flux;
 
 /**
  * JPA implementation of a {@link MessageMapper}. This class is not thread-safe!
@@ -88,8 +89,9 @@ public class JPAMessageMapper extends JPATransactionalMapper implements MessageM
     }
 
     @Override
-    public Iterator<MessageUid> listAllMessageUids(final Mailbox mailbox) throws MailboxException {
-        return Iterators.transform(findInMailbox(mailbox, MessageRange.all(), FetchType.Full, UNLIMITED), MailboxMessage::getUid);
+    public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
+        return findInMailboxReactive(mailbox, MessageRange.all(), FetchType.Metadata, UNLIMITED)
+            .map(MailboxMessage::getUid);
     }
 
     @Override
diff --git a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMessageMapper.java b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMessageMapper.java
index c6f8f6d..1e7d09b 100644
--- a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMessageMapper.java
+++ b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMessageMapper.java
@@ -40,6 +40,8 @@ import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.transaction.Mapper;
 
+import reactor.core.publisher.Flux;
+
 public class TransactionalMessageMapper implements MessageMapper {
     private final JPAMessageMapper messageMapper;
 
@@ -62,7 +64,7 @@ public class TransactionalMessageMapper implements MessageMapper {
     }
 
     @Override
-    public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) throws MailboxException {
+    public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
         return messageMapper.listAllMessageUids(mailbox);
     }
 
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
index 5e18dad..4c8e0af 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
@@ -40,13 +40,14 @@ import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.MessageIdMapper;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
-import org.apache.james.util.streams.Iterators;
 
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimap;
 
+import reactor.core.publisher.Flux;
+
 public class InMemoryMessageIdMapper implements MessageIdMapper {
     private final MailboxMapper mailboxMapper;
     private final InMemoryMessageMapper messageMapper;
@@ -58,16 +59,19 @@ public class InMemoryMessageIdMapper implements MessageIdMapper {
 
     @Override
     public List<MailboxMessage> find(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) {
-        return mailboxMapper.list()
-            .flatMap(Throwing.function(mailbox ->
-                Iterators.toFlux(
-                    messageMapper.findInMailbox(mailbox, MessageRange.all(), fetchType, UNLIMITED))))
-            .filter(message -> messageIds.contains(message.getMessageId()))
+        return findReactive(messageIds, fetchType)
             .collect(Guavate.toImmutableList())
             .block();
     }
 
     @Override
+    public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) {
+        return mailboxMapper.list()
+            .flatMap(mailbox -> messageMapper.findInMailboxReactive(mailbox, MessageRange.all(), fetchType, UNLIMITED))
+            .filter(message -> messageIds.contains(message.getMessageId()));
+    }
+
+    @Override
     public List<MailboxId> findMailboxes(MessageId messageId) {
         return find(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata)
             .stream()
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java
index 5a993d4..9fd5cf3 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java
@@ -39,7 +39,8 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.transaction.TransactionalMapper;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
+
+import reactor.core.publisher.Flux;
 
 /**
  * Abstract base class for {@link MessageMapper} implementation
@@ -149,8 +150,8 @@ public abstract class AbstractMessageMapper extends TransactionalMapper implemen
     protected abstract MessageMetaData copy(Mailbox mailbox, MessageUid uid, ModSeq modSeq, MailboxMessage original) throws MailboxException;
 
     @Override
-    public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) throws MailboxException {
-        return Iterators.transform(findInMailbox(mailbox, MessageRange.all(), FetchType.Metadata, UNLIMITED),
-            MailboxMessage::getUid);
+    public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
+        return findInMailboxReactive(mailbox, MessageRange.all(), FetchType.Metadata, UNLIMITED)
+            .map(MailboxMessage::getUid);
     }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
index c5aaf7e..059356e 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
@@ -138,5 +138,5 @@ public interface MailboxMapper extends Mapper {
     /**
      * Return a unmodifable {@link List} of all {@link Mailbox}
      */
-    Flux<Mailbox> list() throws MailboxException;
+    Flux<Mailbox> list();
 }
\ No newline at end of file
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
index d7c8428..2c79390 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
@@ -37,7 +37,9 @@ import org.apache.james.mailbox.store.FlagsUpdateCalculator;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.Property;
 import org.apache.james.mailbox.store.transaction.Mapper;
+import org.apache.james.util.streams.Iterators;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -58,6 +60,14 @@ public interface MessageMapper extends Mapper {
     Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange set, FetchType type, int limit)
             throws MailboxException;
 
+    default Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange set, FetchType type, int limit) {
+        try {
+            return Iterators.toFlux(findInMailbox(mailbox, set, type, limit));
+        } catch (MailboxException e) {
+            return Flux.error(e);
+        }
+    }
+
     /**
      * Returns a list of {@link MessageUid} which are marked as deleted
      */
@@ -147,7 +157,7 @@ public interface MessageMapper extends Mapper {
     /**
      * Return a list containing all MessageUid of Messages that belongs to given {@link Mailbox}
      */
-    Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) throws MailboxException;
+    Flux<MessageUid> listAllMessageUids(Mailbox mailbox);
 
     /**
      * Specify what data needs to get filled in a {@link MailboxMessage} before returning it
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
index b33f980..3560b96 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
@@ -58,6 +58,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * {@link MessageSearchIndex} which just fetch {@link MailboxMessage}'s from the {@link MessageMapper} and use {@link MessageSearcher}
@@ -155,8 +156,9 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
         return getAsMessageIds(searchResults(session, filteredMailboxes, searchQuery), limit);
     }
 
-    private Flux<SearchResult> searchResults(MailboxSession session, Flux<Mailbox> mailboxes, SearchQuery query) throws MailboxException {
-        return mailboxes.concatMap(mailbox -> Flux.fromStream(getSearchResultStream(session, query, mailbox)));
+    private Flux<? extends SearchResult> searchResults(MailboxSession session, Flux<Mailbox> mailboxes, SearchQuery query) throws MailboxException {
+        return mailboxes.concatMap(mailbox -> Flux.fromStream(getSearchResultStream(session, query, mailbox)))
+            .subscribeOn(Schedulers.elastic());
     }
 
     private Stream<? extends SearchResult> getSearchResultStream(MailboxSession session, SearchQuery query, Mailbox mailbox) {
@@ -167,7 +169,7 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
         }
     }
 
-    private Flux<MessageId> getAsMessageIds(Flux<SearchResult> temp, long limit) {
+    private Flux<MessageId> getAsMessageIds(Flux<? extends SearchResult> temp, long limit) {
         return temp.map(searchResult -> searchResult.getMessageId().get())
             .filter(SearchUtil.distinct())
             .take(Long.valueOf(limit).intValue());
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxMessageResultIteratorTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxMessageResultIteratorTest.java
index 9f56121..c20c807 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxMessageResultIteratorTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxMessageResultIteratorTest.java
@@ -51,6 +51,8 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.Iterables;
 
+import reactor.core.publisher.Flux;
+
 class StoreMailboxMessageResultIteratorTest {
 
     private final class TestMessageMapper implements MessageMapper {
@@ -61,8 +63,8 @@ class StoreMailboxMessageResultIteratorTest {
         }
 
         @Override
-        public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) throws MailboxException {
-            return messageRange.iterator();
+        public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
+            return Flux.fromIterable(messageRange);
         }
 
         @Override
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
index 66ee7bf..99c7186 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
@@ -1134,8 +1134,7 @@ public abstract class MessageMapperTest {
     void getUidsShouldReturnUidsOfMessagesInTheMailbox() throws Exception {
         saveMessages();
 
-        assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox))
-            .toIterable()
+        assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox).collectList().block())
             .containsOnly(message1.getUid(),
                 message2.getUid(),
                 message3.getUid(),
@@ -1149,8 +1148,7 @@ public abstract class MessageMapperTest {
 
         messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message2.getUid(), message3.getUid()));
 
-        assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox))
-            .toIterable()
+        assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox).collectList().block())
             .containsOnly(message1.getUid(),
                 message4.getUid(),
                 message5.getUid());
@@ -1166,8 +1164,7 @@ public abstract class MessageMapperTest {
         List<MessageUid> uids = messageMapper.retrieveMessagesMarkedForDeletion(benwaInboxMailbox, MessageRange.all());
         messageMapper.deleteMessages(benwaInboxMailbox, uids);
 
-        assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox))
-            .toIterable()
+        assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox).collectList().block())
             .containsOnly(message1.getUid(), message5.getUid());
     }
 
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java
index 73259cb..716f578 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java
@@ -73,7 +73,7 @@ public class ErrorRecoveryIndexationTask implements Task {
 
     @Override
     public Result run() {
-        return reIndexerPerformer.reIndex(reprocessingContext, previousFailures);
+        return reIndexerPerformer.reIndex(reprocessingContext, previousFailures).block();
     }
 
     @Override
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java
index 39aa268..0e99cdb 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java
@@ -24,7 +24,6 @@ import java.util.Optional;
 import javax.inject.Inject;
 
 import org.apache.james.json.DTOModule;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.server.task.json.dto.TaskDTO;
 import org.apache.james.server.task.json.dto.TaskDTOModule;
 import org.apache.james.task.Task;
@@ -33,6 +32,8 @@ import org.apache.james.task.TaskType;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import reactor.core.publisher.Mono;
+
 public class FullReindexingTask implements Task {
 
     public static final TaskType FULL_RE_INDEXING = TaskType.of("full-reindexing");
@@ -73,11 +74,9 @@ public class FullReindexingTask implements Task {
 
     @Override
     public Result run() {
-        try {
-            return reIndexerPerformer.reIndex(reprocessingContext);
-        } catch (MailboxException e) {
-            return Result.PARTIAL;
-        }
+        return reIndexerPerformer.reIndex(reprocessingContext)
+            .onErrorResume(e -> Mono.just(Result.PARTIAL))
+            .block();
     }
 
     @Override
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java
index 369b7c4..191d0ab 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java
@@ -78,7 +78,7 @@ public class MessageIdReIndexingTask implements Task {
 
     @Override
     public Result run() {
-        return reIndexerPerformer.handleMessageIdReindexing(messageId);
+        return reIndexerPerformer.handleMessageIdReindexing(messageId).block();
     }
 
     @Override
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index e5640b5..8efcb38 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -19,16 +19,12 @@
 
 package org.apache.mailbox.tools.indexer;
 
-import java.util.Optional;
-import java.util.stream.Stream;
-
 import javax.inject.Inject;
 
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageUid;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
@@ -37,24 +33,28 @@ import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
-import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 import org.apache.james.task.Task;
-import org.apache.james.util.streams.Iterators;
+import org.apache.james.task.Task.Result;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class ReIndexerPerformer {
     private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class);
 
     private static final int SINGLE_MESSAGE = 1;
+    private static final int MESSAGE_CONCURRENCY = 50;
     private static final String RE_INDEXING = "re-indexing";
     private static final Username RE_INDEXER_PERFORMER_USER = Username.of(RE_INDEXING);
+    public static final int NO_CONCURRENCY = 1;
+    public static final int NO_PREFETCH = 1;
 
     private final MailboxManager mailboxManager;
     private final ListeningMessageSearchIndex messageSearchIndex;
@@ -69,138 +69,116 @@ public class ReIndexerPerformer {
         this.mailboxSessionMapperFactory = mailboxSessionMapperFactory;
     }
 
-    Task.Result reIndex(MailboxId mailboxId, ReprocessingContext reprocessingContext) throws Exception {
-        LOGGER.info("Intend to reindex mailbox with mailboxId {}", mailboxId.serialize());
+    Mono<Result> reIndex(MailboxId mailboxId, ReprocessingContext reprocessingContext) {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
-        Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
-        messageSearchIndex.deleteAll(mailboxSession, mailboxId).block();
-        try {
-            return Iterators.toStream(
-                mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
-                    .listAllMessageUids(mailbox))
-                .map(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext))
+
+        return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
+            .findMailboxByIdReactive(mailboxId)
+            .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox));
+    }
+
+    private Mono<Result> reIndex(ReprocessingContext reprocessingContext, MailboxSession mailboxSession, Mailbox mailbox) {
+        LOGGER.info("Attempt to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize());
+        return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId())
+            .then(mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
+                .listAllMessageUids(mailbox)
+                .flatMap(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext), MESSAGE_CONCURRENCY)
                 .reduce(Task::combine)
-                .orElse(Task.Result.COMPLETED);
-        } finally {
-            LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailboxId.serialize());
-        }
+                .switchIfEmpty(Mono.just(Result.COMPLETED))
+                .doFinally(any -> LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize())));
     }
 
-    Task.Result reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures) {
-        return previousReIndexingFailures.failures()
-            .stream()
-            .map(previousFailure -> reIndex(reprocessingContext, previousFailure))
+    Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures) {
+        return Flux.fromIterable(previousReIndexingFailures.failures())
+            .flatMap(previousFailure -> reIndex(reprocessingContext, previousFailure), MESSAGE_CONCURRENCY)
             .reduce(Task::combine)
-            .orElse(Task.Result.COMPLETED);
+            .switchIfEmpty(Mono.just(Result.COMPLETED));
     }
 
-    private Task.Result reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) {
+    private Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) {
         MailboxId mailboxId = previousReIndexingFailure.getMailboxId();
         MessageUid uid = previousReIndexingFailure.getUid();
-        try {
-            return handleMessageReIndexing(mailboxId, uid, reprocessingContext);
-        } catch (MailboxException e) {
-            LOGGER.warn("ReIndexing failed for {} {}", mailboxId, uid, e);
-            reprocessingContext.recordFailureDetailsForMessage(mailboxId, uid);
-            return Task.Result.PARTIAL;
-        }
+
+        return handleMessageReIndexing(mailboxId, uid, reprocessingContext)
+            .onErrorResume(e -> {
+                LOGGER.warn("ReIndexing failed for {} {}", mailboxId, uid, e);
+                reprocessingContext.recordFailureDetailsForMessage(mailboxId, uid);
+                return Mono.just(Result.PARTIAL);
+            });
     }
 
-    Task.Result reIndex(ReprocessingContext reprocessingContext) throws MailboxException {
+    Mono<Result> reIndex(ReprocessingContext reprocessingContext) {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
         LOGGER.info("Starting a full reindex");
-        Stream<MailboxId> mailboxIds = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
-            .map(Mailbox::getMailboxId)
-            .toStream();
-
-        try {
-            return reIndex(mailboxIds, reprocessingContext);
-        } finally {
-            LOGGER.info("Full reindex finished");
-        }
+        return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
+            .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox), NO_CONCURRENCY, NO_PREFETCH)
+            .reduce(Task::combine)
+            .switchIfEmpty(Mono.just(Result.COMPLETED))
+            .doFinally(any -> LOGGER.info("Full reindex finished"));
     }
 
-    Task.Result reIndex(Username username, ReprocessingContext reprocessingContext) throws MailboxException {
+    Mono<Result> reIndex(Username username, ReprocessingContext reprocessingContext) {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
         LOGGER.info("Starting a reindex for user {}", username.asString());
 
-        Stream<MailboxId> mailboxIds = mailboxManager.search(MailboxQuery.privateMailboxesBuilder(mailboxSession).build(), mailboxSession)
-            .stream()
-            .map(MailboxMetaData::getId);
+        MailboxQuery mailboxQuery = MailboxQuery.privateMailboxesBuilder(mailboxSession).build();
 
-        try {
-            return reIndex(mailboxIds, reprocessingContext);
-        } finally {
-            LOGGER.info("User {} reindex finished", username.asString());
-        }
+        return mailboxManager.searchReactive(mailboxQuery, mailboxSession)
+            .map(MailboxMetaData::getId)
+            .flatMap(id -> reIndex(id, reprocessingContext), NO_CONCURRENCY, NO_PREFETCH)
+            .reduce(Task::combine)
+            .switchIfEmpty(Mono.just(Result.COMPLETED))
+            .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()));
     }
 
-    Task.Result handleMessageReIndexing(MailboxId mailboxId, MessageUid uid, ReprocessingContext reprocessingContext) throws MailboxException {
+    Mono<Result> handleMessageReIndexing(MailboxId mailboxId, MessageUid uid, ReprocessingContext reprocessingContext) {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
 
-        Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
-        return handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext);
+        return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
+            .findMailboxByIdReactive(mailboxId)
+            .flatMap(mailbox -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext));
     }
 
-    Task.Result handleMessageIdReindexing(MessageId messageId) {
-        try {
-            MailboxSession session = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
-
-            return mailboxSessionMapperFactory.getMessageIdMapper(session)
-                .find(ImmutableList.of(messageId), MessageMapper.FetchType.Full)
-                .stream()
-                .map(mailboxMessage -> reIndex(mailboxMessage, session))
-                .reduce(Task::combine)
-                .orElse(Task.Result.COMPLETED);
-        } catch (Exception e) {
-            LOGGER.warn("Failed to re-index {}", messageId, e);
-            return Task.Result.PARTIAL;
-        }
-    }
+    Mono<Result> handleMessageIdReindexing(MessageId messageId) {
+        MailboxSession session = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
 
-    private Task.Result reIndex(MailboxMessage mailboxMessage, MailboxSession session) {
-        try {
-            MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session);
-            Mailbox mailbox = mailboxMapper.findMailboxById(mailboxMessage.getMailboxId());
-            messageSearchIndex.add(session, mailbox, mailboxMessage).block();
-            return Task.Result.COMPLETED;
-        } catch (Exception e) {
-            LOGGER.warn("Failed to re-index {} in {}", mailboxMessage.getUid(), mailboxMessage.getMailboxId(), e);
-            return Task.Result.PARTIAL;
-        }
+        return mailboxSessionMapperFactory.getMessageIdMapper(session)
+            .findReactive(ImmutableList.of(messageId), MessageMapper.FetchType.Full)
+            .flatMap(mailboxMessage -> reIndex(mailboxMessage, session))
+            .reduce(Task::combine)
+            .switchIfEmpty(Mono.just(Result.COMPLETED))
+            .onErrorResume(e -> {
+                LOGGER.warn("Failed to re-index {}", messageId, e);
+                return Mono.just(Result.PARTIAL);
+            });
     }
 
-    private Task.Result reIndex(Stream<MailboxId> mailboxIds, ReprocessingContext reprocessingContext) {
-        return mailboxIds
-            .map(mailboxId -> {
-                try {
-                    return reIndex(mailboxId, reprocessingContext);
-                } catch (Throwable e) {
-                    LOGGER.error("Error while proceeding to full reindexing on mailbox with mailboxId {}", mailboxId.serialize(), e);
-                    return Task.Result.PARTIAL;
-                }
-            })
-            .reduce(Task::combine)
-            .orElse(Task.Result.COMPLETED);
+    private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) {
+        return mailboxSessionMapperFactory.getMailboxMapper(session)
+            .findMailboxByIdReactive(mailboxMessage.getMailboxId())
+            .flatMap(mailbox -> messageSearchIndex.add(session, mailbox, mailboxMessage))
+            .thenReturn(Result.COMPLETED)
+            .onErrorResume(e -> {
+                LOGGER.warn("Failed to re-index {} in {}", mailboxMessage.getUid(), mailboxMessage.getMailboxId(), e);
+                return Mono.just(Result.PARTIAL);
+            });
     }
 
-    private Task.Result handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid, ReprocessingContext reprocessingContext) {
-        try {
-            Optional.of(uid)
-                .flatMap(Throwing.function(mUid -> fullyReadMessage(mailboxSession, mailbox, mUid)))
-                .ifPresent(message -> messageSearchIndex.add(mailboxSession, mailbox, message).block());
-            reprocessingContext.recordSuccess();
-            return Task.Result.COMPLETED;
-        } catch (Exception e) {
-            LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e);
-            reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid);
-            return Task.Result.PARTIAL;
-        }
+    private Mono<Result> handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid, ReprocessingContext reprocessingContext) {
+        return fullyReadMessage(mailboxSession, mailbox, uid)
+            .flatMap(message -> messageSearchIndex.add(mailboxSession, mailbox, message))
+            .thenReturn(Result.COMPLETED)
+            .doOnNext(any -> reprocessingContext.recordSuccess())
+            .onErrorResume(e -> {
+                LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e);
+                reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid);
+                return Mono.just(Result.PARTIAL);
+            });
     }
 
-    private Optional<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) throws MailboxException {
-        return Iterators.toStream(mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
-            .findInMailbox(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE))
-            .findFirst();
+    private Mono<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) {
+        return mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
+            .findInMailboxReactive(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE)
+            .next();
     }
 }
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java
index 737be90..0e9580d 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java
@@ -79,7 +79,8 @@ public class SingleMailboxReindexingTask implements Task {
     @Override
     public Result run() {
         try {
-            return reIndexerPerformer.reIndex(mailboxId, reprocessingContext);
+            return reIndexerPerformer.reIndex(mailboxId, reprocessingContext)
+                .block();
         } catch (Exception e) {
             return Result.PARTIAL;
         }
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java
index 77ea683..08c014b 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java
@@ -26,7 +26,6 @@ import java.util.Optional;
 import javax.inject.Inject;
 
 import org.apache.james.mailbox.MessageUid;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
@@ -34,6 +33,8 @@ import org.apache.james.task.TaskType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Mono;
+
 public class SingleMessageReindexingTask implements Task {
     private static final Logger LOGGER = LoggerFactory.getLogger(SingleMessageReindexingTask.class);
 
@@ -95,12 +96,12 @@ public class SingleMessageReindexingTask implements Task {
 
     @Override
     public Result run() {
-        try {
-            return reIndexerPerformer.handleMessageReIndexing(mailboxId, uid, new ReprocessingContext());
-        } catch (MailboxException e) {
-            LOGGER.warn("Error encounteres while reindexing {} : {}", mailboxId, uid, e);
-            return Result.PARTIAL;
-        }
+        return reIndexerPerformer.handleMessageReIndexing(mailboxId, uid, new ReprocessingContext())
+            .onErrorResume(e -> {
+                LOGGER.warn("Error encountered while reindexing {} : {}", mailboxId, uid, e);
+                return Mono.just(Result.PARTIAL);
+            })
+            .block();
     }
 
     MailboxId getMailboxId() {
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java
index 9a7a863..d1ff42b 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java
@@ -26,12 +26,13 @@ import java.util.Optional;
 import javax.inject.Inject;
 
 import org.apache.james.core.Username;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskType;
 
+import reactor.core.publisher.Mono;
+
 public class UserReindexingTask implements Task {
 
     public static final TaskType USER_RE_INDEXING = TaskType.of("user-reindexing");
@@ -78,11 +79,9 @@ public class UserReindexingTask implements Task {
 
     @Override
     public Result run() {
-        try {
-            return reIndexerPerformer.reIndex(username, reprocessingContext);
-        } catch (MailboxException e) {
-            return Result.PARTIAL;
-        }
+        return reIndexerPerformer.reIndex(username, reprocessingContext)
+            .onErrorResume(e -> Mono.just(Result.PARTIAL))
+            .block();
     }
 
     public Username getUsername() {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org