You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/05/13 12:07:11 UTC
[james-project] 02/15: JAMES-3149 Reactify ReIndexing
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 94c5faf60a7ede4c7d26e77039a3c6b45ff8687d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 5 09:06:27 2020 +0700
JAMES-3149 Reactify ReIndexing
Concurrency model:
- Sequential processing of mailboxes (`concatMap`-like behaviour, implemented as `flatMap` with a concurrency of 1 and a prefetch of 1)
- Use `flatMap` to control message concurrency (constant `MESSAGE_CONCURRENCY`: 50)
---
.../cassandra/mail/CassandraMessageMapper.java | 20 ++-
.../james/mailbox/jpa/mail/JPAMessageMapper.java | 8 +-
.../jpa/mail/TransactionalMessageMapper.java | 4 +-
.../inmemory/mail/InMemoryMessageIdMapper.java | 16 +-
.../mailbox/store/mail/AbstractMessageMapper.java | 9 +-
.../james/mailbox/store/mail/MailboxMapper.java | 2 +-
.../james/mailbox/store/mail/MessageMapper.java | 12 +-
.../store/search/SimpleMessageSearchIndex.java | 8 +-
.../StoreMailboxMessageResultIteratorTest.java | 6 +-
.../store/mail/model/MessageMapperTest.java | 9 +-
.../tools/indexer/ErrorRecoveryIndexationTask.java | 2 +-
.../mailbox/tools/indexer/FullReindexingTask.java | 11 +-
.../tools/indexer/MessageIdReIndexingTask.java | 2 +-
.../mailbox/tools/indexer/ReIndexerPerformer.java | 194 +++++++++------------
.../tools/indexer/SingleMailboxReindexingTask.java | 3 +-
.../tools/indexer/SingleMessageReindexingTask.java | 15 +-
.../mailbox/tools/indexer/UserReindexingTask.java | 11 +-
17 files changed, 167 insertions(+), 165 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index cb31434..2e41530 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -109,12 +109,10 @@ public class CassandraMessageMapper implements MessageMapper {
}
@Override
- public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) {
+ public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
return messageIdDAO.retrieveMessages(cassandraId, MessageRange.all())
- .map(metaData -> metaData.getComposedMessageId().getUid())
- .toIterable()
- .iterator();
+ .map(metaData -> metaData.getComposedMessageId().getUid());
}
@Override
@@ -163,14 +161,20 @@ public class CassandraMessageMapper implements MessageMapper {
@Override
public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) {
+ return findInMailboxReactive(mailbox, messageRange, ftype, max)
+ .toIterable()
+ .iterator();
+ }
+
+ @Override
+ public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int limit) {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- return Limit.from(max).applyOnFlux(
+
+ return Limit.from(limit).applyOnFlux(
messageIdDAO.retrieveMessages(mailboxId, messageRange)
.flatMap(id -> retrieveMessage(id, ftype), cassandraConfiguration.getMessageReadChunkSize()))
.map(MailboxMessage.class::cast)
- .sort(Comparator.comparing(MailboxMessage::getUid))
- .toIterable()
- .iterator();
+ .sort(Comparator.comparing(MailboxMessage::getUid));
}
private Mono<MailboxMessage> retrieveMessage(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java
index 9ef0bbc..c853b03 100644
--- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java
+++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java
@@ -58,7 +58,8 @@ import org.apache.openjpa.persistence.ArgumentException;
import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
+
+import reactor.core.publisher.Flux;
/**
* JPA implementation of a {@link MessageMapper}. This class is not thread-safe!
@@ -88,8 +89,9 @@ public class JPAMessageMapper extends JPATransactionalMapper implements MessageM
}
@Override
- public Iterator<MessageUid> listAllMessageUids(final Mailbox mailbox) throws MailboxException {
- return Iterators.transform(findInMailbox(mailbox, MessageRange.all(), FetchType.Full, UNLIMITED), MailboxMessage::getUid);
+ public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
+ return findInMailboxReactive(mailbox, MessageRange.all(), FetchType.Metadata, UNLIMITED)
+ .map(MailboxMessage::getUid);
}
@Override
diff --git a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMessageMapper.java b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMessageMapper.java
index c6f8f6d..1e7d09b 100644
--- a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMessageMapper.java
+++ b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMessageMapper.java
@@ -40,6 +40,8 @@ import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.transaction.Mapper;
+import reactor.core.publisher.Flux;
+
public class TransactionalMessageMapper implements MessageMapper {
private final JPAMessageMapper messageMapper;
@@ -62,7 +64,7 @@ public class TransactionalMessageMapper implements MessageMapper {
}
@Override
- public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) throws MailboxException {
+ public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
return messageMapper.listAllMessageUids(mailbox);
}
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
index 5e18dad..4c8e0af 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
@@ -40,13 +40,14 @@ import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.mail.MessageIdMapper;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
-import org.apache.james.util.streams.Iterators;
import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
+import reactor.core.publisher.Flux;
+
public class InMemoryMessageIdMapper implements MessageIdMapper {
private final MailboxMapper mailboxMapper;
private final InMemoryMessageMapper messageMapper;
@@ -58,16 +59,19 @@ public class InMemoryMessageIdMapper implements MessageIdMapper {
@Override
public List<MailboxMessage> find(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) {
- return mailboxMapper.list()
- .flatMap(Throwing.function(mailbox ->
- Iterators.toFlux(
- messageMapper.findInMailbox(mailbox, MessageRange.all(), fetchType, UNLIMITED))))
- .filter(message -> messageIds.contains(message.getMessageId()))
+ return findReactive(messageIds, fetchType)
.collect(Guavate.toImmutableList())
.block();
}
@Override
+ public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) {
+ return mailboxMapper.list()
+ .flatMap(mailbox -> messageMapper.findInMailboxReactive(mailbox, MessageRange.all(), fetchType, UNLIMITED))
+ .filter(message -> messageIds.contains(message.getMessageId()));
+ }
+
+ @Override
public List<MailboxId> findMailboxes(MessageId messageId) {
return find(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata)
.stream()
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java
index 5a993d4..9fd5cf3 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java
@@ -39,7 +39,8 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.transaction.TransactionalMapper;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
+
+import reactor.core.publisher.Flux;
/**
* Abstract base class for {@link MessageMapper} implementation
@@ -149,8 +150,8 @@ public abstract class AbstractMessageMapper extends TransactionalMapper implemen
protected abstract MessageMetaData copy(Mailbox mailbox, MessageUid uid, ModSeq modSeq, MailboxMessage original) throws MailboxException;
@Override
- public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) throws MailboxException {
- return Iterators.transform(findInMailbox(mailbox, MessageRange.all(), FetchType.Metadata, UNLIMITED),
- MailboxMessage::getUid);
+ public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
+ return findInMailboxReactive(mailbox, MessageRange.all(), FetchType.Metadata, UNLIMITED)
+ .map(MailboxMessage::getUid);
}
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
index c5aaf7e..059356e 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
@@ -138,5 +138,5 @@ public interface MailboxMapper extends Mapper {
/**
* Return a unmodifable {@link List} of all {@link Mailbox}
*/
- Flux<Mailbox> list() throws MailboxException;
+ Flux<Mailbox> list();
}
\ No newline at end of file
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
index d7c8428..2c79390 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
@@ -37,7 +37,9 @@ import org.apache.james.mailbox.store.FlagsUpdateCalculator;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.Property;
import org.apache.james.mailbox.store.transaction.Mapper;
+import org.apache.james.util.streams.Iterators;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -58,6 +60,14 @@ public interface MessageMapper extends Mapper {
Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange set, FetchType type, int limit)
throws MailboxException;
+ default Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange set, FetchType type, int limit) {
+ try {
+ return Iterators.toFlux(findInMailbox(mailbox, set, type, limit));
+ } catch (MailboxException e) {
+ return Flux.error(e);
+ }
+ }
+
/**
* Returns a list of {@link MessageUid} which are marked as deleted
*/
@@ -147,7 +157,7 @@ public interface MessageMapper extends Mapper {
/**
* Return a list containing all MessageUid of Messages that belongs to given {@link Mailbox}
*/
- Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) throws MailboxException;
+ Flux<MessageUid> listAllMessageUids(Mailbox mailbox);
/**
* Specify what data needs to get filled in a {@link MailboxMessage} before returning it
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
index b33f980..3560b96 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
@@ -58,6 +58,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
/**
* {@link MessageSearchIndex} which just fetch {@link MailboxMessage}'s from the {@link MessageMapper} and use {@link MessageSearcher}
@@ -155,8 +156,9 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
return getAsMessageIds(searchResults(session, filteredMailboxes, searchQuery), limit);
}
- private Flux<SearchResult> searchResults(MailboxSession session, Flux<Mailbox> mailboxes, SearchQuery query) throws MailboxException {
- return mailboxes.concatMap(mailbox -> Flux.fromStream(getSearchResultStream(session, query, mailbox)));
+ private Flux<? extends SearchResult> searchResults(MailboxSession session, Flux<Mailbox> mailboxes, SearchQuery query) throws MailboxException {
+ return mailboxes.concatMap(mailbox -> Flux.fromStream(getSearchResultStream(session, query, mailbox)))
+ .subscribeOn(Schedulers.elastic());
}
private Stream<? extends SearchResult> getSearchResultStream(MailboxSession session, SearchQuery query, Mailbox mailbox) {
@@ -167,7 +169,7 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
}
}
- private Flux<MessageId> getAsMessageIds(Flux<SearchResult> temp, long limit) {
+ private Flux<MessageId> getAsMessageIds(Flux<? extends SearchResult> temp, long limit) {
return temp.map(searchResult -> searchResult.getMessageId().get())
.filter(SearchUtil.distinct())
.take(Long.valueOf(limit).intValue());
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxMessageResultIteratorTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxMessageResultIteratorTest.java
index 9f56121..c20c807 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxMessageResultIteratorTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxMessageResultIteratorTest.java
@@ -51,6 +51,8 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.Iterables;
+import reactor.core.publisher.Flux;
+
class StoreMailboxMessageResultIteratorTest {
private final class TestMessageMapper implements MessageMapper {
@@ -61,8 +63,8 @@ class StoreMailboxMessageResultIteratorTest {
}
@Override
- public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) throws MailboxException {
- return messageRange.iterator();
+ public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
+ return Flux.fromIterable(messageRange);
}
@Override
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
index 66ee7bf..99c7186 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
@@ -1134,8 +1134,7 @@ public abstract class MessageMapperTest {
void getUidsShouldReturnUidsOfMessagesInTheMailbox() throws Exception {
saveMessages();
- assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox))
- .toIterable()
+ assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox).collectList().block())
.containsOnly(message1.getUid(),
message2.getUid(),
message3.getUid(),
@@ -1149,8 +1148,7 @@ public abstract class MessageMapperTest {
messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message2.getUid(), message3.getUid()));
- assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox))
- .toIterable()
+ assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox).collectList().block())
.containsOnly(message1.getUid(),
message4.getUid(),
message5.getUid());
@@ -1166,8 +1164,7 @@ public abstract class MessageMapperTest {
List<MessageUid> uids = messageMapper.retrieveMessagesMarkedForDeletion(benwaInboxMailbox, MessageRange.all());
messageMapper.deleteMessages(benwaInboxMailbox, uids);
- assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox))
- .toIterable()
+ assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox).collectList().block())
.containsOnly(message1.getUid(), message5.getUid());
}
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java
index 73259cb..716f578 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java
@@ -73,7 +73,7 @@ public class ErrorRecoveryIndexationTask implements Task {
@Override
public Result run() {
- return reIndexerPerformer.reIndex(reprocessingContext, previousFailures);
+ return reIndexerPerformer.reIndex(reprocessingContext, previousFailures).block();
}
@Override
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java
index 39aa268..0e99cdb 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java
@@ -24,7 +24,6 @@ import java.util.Optional;
import javax.inject.Inject;
import org.apache.james.json.DTOModule;
-import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.server.task.json.dto.TaskDTO;
import org.apache.james.server.task.json.dto.TaskDTOModule;
import org.apache.james.task.Task;
@@ -33,6 +32,8 @@ import org.apache.james.task.TaskType;
import com.fasterxml.jackson.annotation.JsonProperty;
+import reactor.core.publisher.Mono;
+
public class FullReindexingTask implements Task {
public static final TaskType FULL_RE_INDEXING = TaskType.of("full-reindexing");
@@ -73,11 +74,9 @@ public class FullReindexingTask implements Task {
@Override
public Result run() {
- try {
- return reIndexerPerformer.reIndex(reprocessingContext);
- } catch (MailboxException e) {
- return Result.PARTIAL;
- }
+ return reIndexerPerformer.reIndex(reprocessingContext)
+ .onErrorResume(e -> Mono.just(Result.PARTIAL))
+ .block();
}
@Override
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java
index 369b7c4..191d0ab 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java
@@ -78,7 +78,7 @@ public class MessageIdReIndexingTask implements Task {
@Override
public Result run() {
- return reIndexerPerformer.handleMessageIdReindexing(messageId);
+ return reIndexerPerformer.handleMessageIdReindexing(messageId).block();
}
@Override
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index e5640b5..8efcb38 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -19,16 +19,12 @@
package org.apache.mailbox.tools.indexer;
-import java.util.Optional;
-import java.util.stream.Stream;
-
import javax.inject.Inject;
import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageUid;
-import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxId;
@@ -37,24 +33,28 @@ import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
-import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
import org.apache.james.task.Task;
-import org.apache.james.util.streams.Iterators;
+import org.apache.james.task.Task.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
public class ReIndexerPerformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class);
private static final int SINGLE_MESSAGE = 1;
+ private static final int MESSAGE_CONCURRENCY = 50;
private static final String RE_INDEXING = "re-indexing";
private static final Username RE_INDEXER_PERFORMER_USER = Username.of(RE_INDEXING);
+ public static final int NO_CONCURRENCY = 1;
+ public static final int NO_PREFETCH = 1;
private final MailboxManager mailboxManager;
private final ListeningMessageSearchIndex messageSearchIndex;
@@ -69,138 +69,116 @@ public class ReIndexerPerformer {
this.mailboxSessionMapperFactory = mailboxSessionMapperFactory;
}
- Task.Result reIndex(MailboxId mailboxId, ReprocessingContext reprocessingContext) throws Exception {
- LOGGER.info("Intend to reindex mailbox with mailboxId {}", mailboxId.serialize());
+ Mono<Result> reIndex(MailboxId mailboxId, ReprocessingContext reprocessingContext) {
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
- Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
- messageSearchIndex.deleteAll(mailboxSession, mailboxId).block();
- try {
- return Iterators.toStream(
- mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
- .listAllMessageUids(mailbox))
- .map(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext))
+
+ return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
+ .findMailboxByIdReactive(mailboxId)
+ .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox));
+ }
+
+ private Mono<Result> reIndex(ReprocessingContext reprocessingContext, MailboxSession mailboxSession, Mailbox mailbox) {
+ LOGGER.info("Attempt to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize());
+ return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId())
+ .then(mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
+ .listAllMessageUids(mailbox)
+ .flatMap(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext), MESSAGE_CONCURRENCY)
.reduce(Task::combine)
- .orElse(Task.Result.COMPLETED);
- } finally {
- LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailboxId.serialize());
- }
+ .switchIfEmpty(Mono.just(Result.COMPLETED))
+ .doFinally(any -> LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize())));
}
- Task.Result reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures) {
- return previousReIndexingFailures.failures()
- .stream()
- .map(previousFailure -> reIndex(reprocessingContext, previousFailure))
+ Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures) {
+ return Flux.fromIterable(previousReIndexingFailures.failures())
+ .flatMap(previousFailure -> reIndex(reprocessingContext, previousFailure), MESSAGE_CONCURRENCY)
.reduce(Task::combine)
- .orElse(Task.Result.COMPLETED);
+ .switchIfEmpty(Mono.just(Result.COMPLETED));
}
- private Task.Result reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) {
+ private Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) {
MailboxId mailboxId = previousReIndexingFailure.getMailboxId();
MessageUid uid = previousReIndexingFailure.getUid();
- try {
- return handleMessageReIndexing(mailboxId, uid, reprocessingContext);
- } catch (MailboxException e) {
- LOGGER.warn("ReIndexing failed for {} {}", mailboxId, uid, e);
- reprocessingContext.recordFailureDetailsForMessage(mailboxId, uid);
- return Task.Result.PARTIAL;
- }
+
+ return handleMessageReIndexing(mailboxId, uid, reprocessingContext)
+ .onErrorResume(e -> {
+ LOGGER.warn("ReIndexing failed for {} {}", mailboxId, uid, e);
+ reprocessingContext.recordFailureDetailsForMessage(mailboxId, uid);
+ return Mono.just(Result.PARTIAL);
+ });
}
- Task.Result reIndex(ReprocessingContext reprocessingContext) throws MailboxException {
+ Mono<Result> reIndex(ReprocessingContext reprocessingContext) {
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
LOGGER.info("Starting a full reindex");
- Stream<MailboxId> mailboxIds = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
- .map(Mailbox::getMailboxId)
- .toStream();
-
- try {
- return reIndex(mailboxIds, reprocessingContext);
- } finally {
- LOGGER.info("Full reindex finished");
- }
+ return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list()
+ .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox), NO_CONCURRENCY, NO_PREFETCH)
+ .reduce(Task::combine)
+ .switchIfEmpty(Mono.just(Result.COMPLETED))
+ .doFinally(any -> LOGGER.info("Full reindex finished"));
}
- Task.Result reIndex(Username username, ReprocessingContext reprocessingContext) throws MailboxException {
+ Mono<Result> reIndex(Username username, ReprocessingContext reprocessingContext) {
MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
LOGGER.info("Starting a reindex for user {}", username.asString());
- Stream<MailboxId> mailboxIds = mailboxManager.search(MailboxQuery.privateMailboxesBuilder(mailboxSession).build(), mailboxSession)
- .stream()
- .map(MailboxMetaData::getId);
+ MailboxQuery mailboxQuery = MailboxQuery.privateMailboxesBuilder(mailboxSession).build();
- try {
- return reIndex(mailboxIds, reprocessingContext);
- } finally {
- LOGGER.info("User {} reindex finished", username.asString());
- }
+ return mailboxManager.searchReactive(mailboxQuery, mailboxSession)
+ .map(MailboxMetaData::getId)
+ .flatMap(id -> reIndex(id, reprocessingContext), NO_CONCURRENCY, NO_PREFETCH)
+ .reduce(Task::combine)
+ .switchIfEmpty(Mono.just(Result.COMPLETED))
+ .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()));
}
- Task.Result handleMessageReIndexing(MailboxId mailboxId, MessageUid uid, ReprocessingContext reprocessingContext) throws MailboxException {
+ Mono<Result> handleMessageReIndexing(MailboxId mailboxId, MessageUid uid, ReprocessingContext reprocessingContext) {
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
- Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
- return handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext);
+ return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession)
+ .findMailboxByIdReactive(mailboxId)
+ .flatMap(mailbox -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext));
}
- Task.Result handleMessageIdReindexing(MessageId messageId) {
- try {
- MailboxSession session = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
-
- return mailboxSessionMapperFactory.getMessageIdMapper(session)
- .find(ImmutableList.of(messageId), MessageMapper.FetchType.Full)
- .stream()
- .map(mailboxMessage -> reIndex(mailboxMessage, session))
- .reduce(Task::combine)
- .orElse(Task.Result.COMPLETED);
- } catch (Exception e) {
- LOGGER.warn("Failed to re-index {}", messageId, e);
- return Task.Result.PARTIAL;
- }
- }
+ Mono<Result> handleMessageIdReindexing(MessageId messageId) {
+ MailboxSession session = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
- private Task.Result reIndex(MailboxMessage mailboxMessage, MailboxSession session) {
- try {
- MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session);
- Mailbox mailbox = mailboxMapper.findMailboxById(mailboxMessage.getMailboxId());
- messageSearchIndex.add(session, mailbox, mailboxMessage).block();
- return Task.Result.COMPLETED;
- } catch (Exception e) {
- LOGGER.warn("Failed to re-index {} in {}", mailboxMessage.getUid(), mailboxMessage.getMailboxId(), e);
- return Task.Result.PARTIAL;
- }
+ return mailboxSessionMapperFactory.getMessageIdMapper(session)
+ .findReactive(ImmutableList.of(messageId), MessageMapper.FetchType.Full)
+ .flatMap(mailboxMessage -> reIndex(mailboxMessage, session))
+ .reduce(Task::combine)
+ .switchIfEmpty(Mono.just(Result.COMPLETED))
+ .onErrorResume(e -> {
+ LOGGER.warn("Failed to re-index {}", messageId, e);
+ return Mono.just(Result.PARTIAL);
+ });
}
- private Task.Result reIndex(Stream<MailboxId> mailboxIds, ReprocessingContext reprocessingContext) {
- return mailboxIds
- .map(mailboxId -> {
- try {
- return reIndex(mailboxId, reprocessingContext);
- } catch (Throwable e) {
- LOGGER.error("Error while proceeding to full reindexing on mailbox with mailboxId {}", mailboxId.serialize(), e);
- return Task.Result.PARTIAL;
- }
- })
- .reduce(Task::combine)
- .orElse(Task.Result.COMPLETED);
+ private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) {
+ return mailboxSessionMapperFactory.getMailboxMapper(session)
+ .findMailboxByIdReactive(mailboxMessage.getMailboxId())
+ .flatMap(mailbox -> messageSearchIndex.add(session, mailbox, mailboxMessage))
+ .thenReturn(Result.COMPLETED)
+ .onErrorResume(e -> {
+ LOGGER.warn("Failed to re-index {} in {}", mailboxMessage.getUid(), mailboxMessage.getMailboxId(), e);
+ return Mono.just(Result.PARTIAL);
+ });
}
- private Task.Result handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid, ReprocessingContext reprocessingContext) {
- try {
- Optional.of(uid)
- .flatMap(Throwing.function(mUid -> fullyReadMessage(mailboxSession, mailbox, mUid)))
- .ifPresent(message -> messageSearchIndex.add(mailboxSession, mailbox, message).block());
- reprocessingContext.recordSuccess();
- return Task.Result.COMPLETED;
- } catch (Exception e) {
- LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e);
- reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid);
- return Task.Result.PARTIAL;
- }
+ private Mono<Result> handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid, ReprocessingContext reprocessingContext) {
+ return fullyReadMessage(mailboxSession, mailbox, uid)
+ .flatMap(message -> messageSearchIndex.add(mailboxSession, mailbox, message))
+ .thenReturn(Result.COMPLETED)
+ .doOnNext(any -> reprocessingContext.recordSuccess())
+ .onErrorResume(e -> {
+ LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e);
+ reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid);
+ return Mono.just(Result.PARTIAL);
+ });
}
- private Optional<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) throws MailboxException {
- return Iterators.toStream(mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
- .findInMailbox(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE))
- .findFirst();
+ private Mono<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) {
+ return mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
+ .findInMailboxReactive(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE)
+ .next();
}
}
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java
index 737be90..0e9580d 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java
@@ -79,7 +79,8 @@ public class SingleMailboxReindexingTask implements Task {
@Override
public Result run() {
try {
- return reIndexerPerformer.reIndex(mailboxId, reprocessingContext);
+ return reIndexerPerformer.reIndex(mailboxId, reprocessingContext)
+ .block();
} catch (Exception e) {
return Result.PARTIAL;
}
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java
index 77ea683..08c014b 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java
@@ -26,7 +26,6 @@ import java.util.Optional;
import javax.inject.Inject;
import org.apache.james.mailbox.MessageUid;
-import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
@@ -34,6 +33,8 @@ import org.apache.james.task.TaskType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
public class SingleMessageReindexingTask implements Task {
private static final Logger LOGGER = LoggerFactory.getLogger(SingleMessageReindexingTask.class);
@@ -95,12 +96,12 @@ public class SingleMessageReindexingTask implements Task {
@Override
public Result run() {
- try {
- return reIndexerPerformer.handleMessageReIndexing(mailboxId, uid, new ReprocessingContext());
- } catch (MailboxException e) {
- LOGGER.warn("Error encounteres while reindexing {} : {}", mailboxId, uid, e);
- return Result.PARTIAL;
- }
+ return reIndexerPerformer.handleMessageReIndexing(mailboxId, uid, new ReprocessingContext())
+ .onErrorResume(e -> {
+ LOGGER.warn("Error encountered while reindexing {} : {}", mailboxId, uid, e);
+ return Mono.just(Result.PARTIAL);
+ })
+ .block();
}
MailboxId getMailboxId() {
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java
index 9a7a863..d1ff42b 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java
@@ -26,12 +26,13 @@ import java.util.Optional;
import javax.inject.Inject;
import org.apache.james.core.Username;
-import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskType;
+import reactor.core.publisher.Mono;
+
public class UserReindexingTask implements Task {
public static final TaskType USER_RE_INDEXING = TaskType.of("user-reindexing");
@@ -78,11 +79,9 @@ public class UserReindexingTask implements Task {
@Override
public Result run() {
- try {
- return reIndexerPerformer.reIndex(username, reprocessingContext);
- } catch (MailboxException e) {
- return Result.PARTIAL;
- }
+ return reIndexerPerformer.reIndex(username, reprocessingContext)
+ .onErrorResume(e -> Mono.just(Result.PARTIAL))
+ .block();
}
public Username getUsername() {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org