You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/05/05 07:51:03 UTC
[james-project] 04/07: JAMES-3155
ElasticSearchListeningMessageSearchIndex can be reactive
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 08c98e6ebf95979038d31e625bc0e9f668e9cf52
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Apr 27 09:05:50 2020 +0700
JAMES-3155 ElasticSearchListeningMessageSearchIndex can be reactive
---
.../james/backends/es/ElasticSearchIndexer.java | 4 +-
.../backends/es/ElasticSearchIndexerTest.java | 4 +-
.../ElasticSearchListeningMessageSearchIndex.java | 36 ++++++------
...asticSearchListeningMessageSearchIndexTest.java | 66 +++++++++++-----------
.../lucene/search/LuceneMessageSearchIndex.java | 42 ++++++++------
.../LuceneMailboxMessageSearchIndexTest.java | 10 ++--
.../store/search/LazyMessageSearchIndex.java | 19 ++++---
.../store/search/ListeningMessageSearchIndex.java | 59 +++++++++++--------
.../mailbox/tools/indexer/ReIndexerPerformer.java | 6 +-
.../tools/indexer/CassandraReIndexerImplTest.java | 5 ++
.../tools/indexer/MessageIdReIndexerImplTest.java | 4 ++
.../mailbox/tools/indexer/ReIndexerImplTest.java | 5 ++
.../org/apache/james/FakeMessageSearchIndex.java | 9 +--
.../james/webadmin/routes/MailboxesRoutesTest.java | 10 +++-
.../james/webadmin/routes/MessageRoutesTest.java | 9 ++-
.../webadmin/routes/UserMailboxesRoutesTest.java | 3 +
16 files changed, 169 insertions(+), 122 deletions(-)
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
index 0fcbc11..3b5c8a2 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
@@ -113,8 +113,8 @@ public class ElasticSearchIndexer {
});
}
- public void deleteAllMatchingQuery(QueryBuilder queryBuilder, RoutingKey routingKey) {
- deleteByQueryPerformer.perform(queryBuilder, routingKey).block();
+ public Mono<Void> deleteAllMatchingQuery(QueryBuilder queryBuilder, RoutingKey routingKey) {
+ return deleteByQueryPerformer.perform(queryBuilder, routingKey);
}
private void checkArgument(String content) {
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
index f5f8a40..ac320ca 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
@@ -163,7 +163,7 @@ class ElasticSearchIndexerTest {
testee.index(documentId, content, routingKey).block();
elasticSearch.awaitForElasticSearch();
- testee.deleteAllMatchingQuery(termQuery("property", "1"), routingKey);
+ testee.deleteAllMatchingQuery(termQuery("property", "1"), routingKey).block();
elasticSearch.awaitForElasticSearch();
CALMLY_AWAIT.atMost(Duration.TEN_SECONDS)
@@ -193,7 +193,7 @@ class ElasticSearchIndexerTest {
testee.index(documentId3, content3, ROUTING).block();
elasticSearch.awaitForElasticSearch();
- testee.deleteAllMatchingQuery(termQuery("property", "1"), ROUTING);
+ testee.deleteAllMatchingQuery(termQuery("property", "1"), ROUTING).block();
elasticSearch.awaitForElasticSearch();
CALMLY_AWAIT.atMost(Duration.TEN_SECONDS)
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index a85467b..3d2d099 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -20,7 +20,6 @@ package org.apache.james.mailbox.elasticsearch.events;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
-import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
@@ -135,18 +134,19 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
}
@Override
- public void add(MailboxSession session, Mailbox mailbox, MailboxMessage message) throws IOException {
+ public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message) {
LOGGER.info("Indexing mailbox {}-{} of user {} on message {}",
mailbox.getName(),
mailbox.getMailboxId(),
session.getUser().asString(),
message.getUid());
- String jsonContent = generateIndexedJson(mailbox, message, session);
+ RoutingKey from = routingKeyFactory.from(mailbox.getMailboxId());
+ DocumentId id = indexIdFor(mailbox, message.getUid());
- elasticSearchIndexer
- .index(indexIdFor(mailbox, message.getUid()), jsonContent, routingKeyFactory.from(mailbox.getMailboxId()))
- .block();
+ return Mono.fromCallable(() -> generateIndexedJson(mailbox, message, session))
+ .flatMap(jsonContent -> elasticSearchIndexer.index(id, jsonContent, from))
+ .then();
}
private String generateIndexedJson(Mailbox mailbox, MailboxMessage message, MailboxSession session) throws JsonProcessingException {
@@ -164,36 +164,36 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
}
@Override
- public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
- elasticSearchIndexer
+ public Mono<Void> delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
+ return elasticSearchIndexer
.delete(expungedUids.stream()
.map(uid -> indexIdFor(mailbox, uid))
.collect(Guavate.toImmutableList()),
routingKeyFactory.from(mailbox.getMailboxId()))
- .block();
+ .then();
}
@Override
- public void deleteAll(MailboxSession session, MailboxId mailboxId) {
+ public Mono<Void> deleteAll(MailboxSession session, MailboxId mailboxId) {
TermQueryBuilder queryBuilder = termQuery(
JsonMessageConstants.MAILBOX_ID,
mailboxId.serialize());
- elasticSearchIndexer
+ return elasticSearchIndexer
.deleteAllMatchingQuery(queryBuilder, routingKeyFactory.from(mailboxId));
}
@Override
- public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
- ImmutableList<UpdatedRepresentation> updates = updatedFlagsList.stream()
+ public Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
+ RoutingKey routingKey = routingKeyFactory.from(mailbox.getMailboxId());
+
+ return Flux.fromIterable(updatedFlagsList)
.map(Throwing.<UpdatedFlags, UpdatedRepresentation>function(
updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailbox, updatedFlags))
.sneakyThrow())
- .collect(Guavate.toImmutableList());
-
- elasticSearchIndexer
- .update(updates, routingKeyFactory.from(mailbox.getMailboxId()))
- .block();
+ .collect(Guavate.toImmutableList())
+ .flatMap(updates -> elasticSearchIndexer.update(updates, routingKey))
+ .then();
}
private UpdatedRepresentation createUpdatedDocumentPartFromUpdatedFlags(Mailbox mailbox, UpdatedFlags updatedFlags) throws JsonProcessingException {
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
index bd4fc78..e3f16a3 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
@@ -192,7 +192,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void addShouldIndexMessageWithoutAttachment() throws Exception {
- testee.add(session, mailbox, MESSAGE_1);
+ testee.add(session, mailbox, MESSAGE_1).block();
elasticSearch.awaitForElasticSearch();
SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -203,7 +203,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void addShouldIndexMessageWithAttachment() throws Exception {
- testee.add(session, mailbox, MESSAGE_WITH_ATTACHMENT);
+ testee.add(session, mailbox, MESSAGE_WITH_ATTACHMENT).block();
elasticSearch.awaitForElasticSearch();
SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -213,8 +213,8 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void addShouldBeIndempotent() throws Exception {
- testee.add(session, mailbox, MESSAGE_1);
- testee.add(session, mailbox, MESSAGE_1);
+ testee.add(session, mailbox, MESSAGE_1).block();
+ testee.add(session, mailbox, MESSAGE_1).block();
elasticSearch.awaitForElasticSearch();
@@ -225,8 +225,8 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void addShouldIndexMultipleMessages() throws Exception {
- testee.add(session, mailbox, MESSAGE_1);
- testee.add(session, mailbox, MESSAGE_2);
+ testee.add(session, mailbox, MESSAGE_1).block();
+ testee.add(session, mailbox, MESSAGE_2).block();
elasticSearch.awaitForElasticSearch();
@@ -245,7 +245,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
testee = new ElasticSearchListeningMessageSearchIndex(mapperFactory, elasticSearchIndexer, elasticSearchSearcher,
messageToElasticSearchJson, sessionProvider, new MailboxIdRoutingKeyFactory());
- testee.add(session, mailbox, MESSAGE_WITH_ATTACHMENT);
+ testee.add(session, mailbox, MESSAGE_WITH_ATTACHMENT).block();
elasticSearch.awaitForElasticSearch();
SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -258,7 +258,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
elasticSearch.getDockerElasticSearch().pause();
Thread.sleep(Duration.FIVE_SECONDS.getValueInMS()); // Docker pause is asynchronous and we found no way to poll for it
- assertThatThrownBy(() -> testee.add(session, mailbox, MESSAGE_1))
+ assertThatThrownBy(() -> testee.add(session, mailbox, MESSAGE_1).block())
.hasCauseInstanceOf(IOException.class);
elasticSearch.getDockerElasticSearch().unpause();
@@ -266,10 +266,10 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void deleteShouldRemoveIndex() throws IOException {
- testee.add(session, mailbox, MESSAGE_1);
+ testee.add(session, mailbox, MESSAGE_1).block();
elasticSearch.awaitForElasticSearch();
- testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1));
+ testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)).block();
elasticSearch.awaitForElasticSearch();
SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -279,12 +279,12 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void deleteShouldOnlyRemoveIndexesPassedAsArguments() throws IOException {
- testee.add(session, mailbox, MESSAGE_1);
- testee.add(session, mailbox, MESSAGE_2);
+ testee.add(session, mailbox, MESSAGE_1).block();
+ testee.add(session, mailbox, MESSAGE_2).block();
elasticSearch.awaitForElasticSearch();
- testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1));
+ testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)).block();
elasticSearch.awaitForElasticSearch();
SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -294,12 +294,12 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void deleteShouldRemoveMultipleIndexes() throws IOException {
- testee.add(session, mailbox, MESSAGE_1);
- testee.add(session, mailbox, MESSAGE_2);
+ testee.add(session, mailbox, MESSAGE_1).block();
+ testee.add(session, mailbox, MESSAGE_2).block();
elasticSearch.awaitForElasticSearch();
- testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1, MESSAGE_UID_2));
+ testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1, MESSAGE_UID_2)).block();
elasticSearch.awaitForElasticSearch();
SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -309,11 +309,11 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void deleteShouldBeIdempotent() throws IOException {
- testee.add(session, mailbox, MESSAGE_1);
+ testee.add(session, mailbox, MESSAGE_1).block();
elasticSearch.awaitForElasticSearch();
- testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1));
- testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1));
+ testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)).block();
+ testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)).block();
elasticSearch.awaitForElasticSearch();
SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -323,7 +323,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void deleteShouldNotThrowOnUnknownMessageUid() {
- assertThatCode(() -> testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)))
+ assertThatCode(() -> testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)).block())
.doesNotThrowAnyException();
}
@@ -332,7 +332,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
elasticSearch.getDockerElasticSearch().pause();
Thread.sleep(Duration.FIVE_SECONDS.getValueInMS()); // Docker pause is asynchronous and we found no way to poll for it
- assertThatThrownBy(() -> testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)))
+ assertThatThrownBy(() -> testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)).block())
.hasCauseInstanceOf(IOException.class);
elasticSearch.getDockerElasticSearch().unpause();
@@ -340,7 +340,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void updateShouldUpdateIndex() throws Exception {
- testee.add(session, mailbox, MESSAGE_1);
+ testee.add(session, mailbox, MESSAGE_1).block();
elasticSearch.awaitForElasticSearch();
Flags newFlags = new Flags(Flags.Flag.ANSWERED);
@@ -351,7 +351,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
.newFlags(newFlags)
.build();
- testee.update(session, mailbox, Lists.newArrayList(updatedFlags));
+ testee.update(session, mailbox, Lists.newArrayList(updatedFlags)).block();
elasticSearch.awaitForElasticSearch();
SearchQuery query = new SearchQuery(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
@@ -361,7 +361,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void updateShouldNotUpdateNorThrowOnUnknownMessageUid() throws Exception {
- testee.add(session, mailbox, MESSAGE_1);
+ testee.add(session, mailbox, MESSAGE_1).block();
elasticSearch.awaitForElasticSearch();
Flags newFlags = new Flags(Flags.Flag.ANSWERED);
@@ -372,7 +372,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
.newFlags(newFlags)
.build();
- testee.update(session, mailbox, Lists.newArrayList(updatedFlags));
+ testee.update(session, mailbox, Lists.newArrayList(updatedFlags)).block();
elasticSearch.awaitForElasticSearch();
SearchQuery query = new SearchQuery(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
@@ -382,7 +382,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void updateShouldBeIdempotent() throws Exception {
- testee.add(session, mailbox, MESSAGE_1);
+ testee.add(session, mailbox, MESSAGE_1).block();
elasticSearch.awaitForElasticSearch();
Flags newFlags = new Flags(Flags.Flag.ANSWERED);
@@ -393,8 +393,8 @@ class ElasticSearchListeningMessageSearchIndexTest {
.newFlags(newFlags)
.build();
- testee.update(session, mailbox, Lists.newArrayList(updatedFlags));
- testee.update(session, mailbox, Lists.newArrayList(updatedFlags));
+ testee.update(session, mailbox, Lists.newArrayList(updatedFlags)).block();
+ testee.update(session, mailbox, Lists.newArrayList(updatedFlags)).block();
elasticSearch.awaitForElasticSearch();
SearchQuery query = new SearchQuery(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
@@ -415,7 +415,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
.newFlags(newFlags)
.build();
- assertThatThrownBy(() -> testee.update(session, mailbox, Lists.newArrayList(updatedFlags)))
+ assertThatThrownBy(() -> testee.update(session, mailbox, Lists.newArrayList(updatedFlags)).block())
.hasCauseInstanceOf(IOException.class);
elasticSearch.getDockerElasticSearch().unpause();
@@ -423,12 +423,12 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void deleteAllShouldRemoveAllIndexes() throws Exception {
- testee.add(session, mailbox, MESSAGE_1);
- testee.add(session, mailbox, MESSAGE_2);
+ testee.add(session, mailbox, MESSAGE_1).block();
+ testee.add(session, mailbox, MESSAGE_2).block();
elasticSearch.awaitForElasticSearch();
- testee.deleteAll(session, mailbox.getMailboxId());
+ testee.deleteAll(session, mailbox.getMailboxId()).block();
elasticSearch.awaitForElasticSearch();
SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -438,7 +438,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
@Test
void deleteAllShouldNotThrowWhenEmptyIndex() {
- assertThatCode(() -> testee.deleteAll(session, mailbox.getMailboxId()))
+ assertThatCode(() -> testee.deleteAll(session, mailbox.getMailboxId()).block())
.doesNotThrowAnyException();
}
diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
index ff86117..e3a4f25 100644
--- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
+++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
@@ -119,11 +119,13 @@ import org.apache.lucene.util.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* Lucene based {@link ListeningMessageSearchIndex} which offers message searching via a Lucene index
@@ -1176,19 +1178,23 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {
}
@Override
- public void add(MailboxSession session, Mailbox mailbox, MailboxMessage membership) throws IOException, MimeException {
- Document doc = createMessageDocument(session, membership);
- Document flagsDoc = createFlagsDocument(membership);
-
- writer.addDocument(doc);
- writer.addDocument(flagsDoc);
+ public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage membership) {
+ return Mono.fromRunnable(Throwing.runnable(() -> {
+ Document doc = createMessageDocument(session, membership);
+ Document flagsDoc = createFlagsDocument(membership);
+
+ writer.addDocument(doc);
+ writer.addDocument(flagsDoc);
+ }));
}
@Override
- public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws IOException {
- for (UpdatedFlags updatedFlags : updatedFlagsList) {
- update(mailbox, updatedFlags.getUid(), updatedFlags.getNewFlags());
- }
+ public Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
+ return Mono.fromRunnable(Throwing.runnable(() -> {
+ for (UpdatedFlags updatedFlags : updatedFlagsList) {
+ update(mailbox, updatedFlags.getUid(), updatedFlags.getNewFlags());
+ }
+ }));
}
private void update(Mailbox mailbox, MessageUid uid, Flags f) throws IOException {
@@ -1266,16 +1272,18 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {
}
@Override
- public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) throws IOException {
- Collection<MessageRange> messageRanges = MessageRange.toRanges(expungedUids);
- for (MessageRange messageRange : messageRanges) {
- delete(mailbox.getMailboxId(), messageRange);
- }
+ public Mono<Void> delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
+ return Mono.fromRunnable(Throwing.runnable(() -> {
+ Collection<MessageRange> messageRanges = MessageRange.toRanges(expungedUids);
+ for (MessageRange messageRange : messageRanges) {
+ delete(mailbox.getMailboxId(), messageRange);
+ }
+ }));
}
@Override
- public void deleteAll(MailboxSession session, MailboxId mailboxId) throws IOException {
- delete(mailboxId, MessageRange.all());
+ public Mono<Void> deleteAll(MailboxSession session, MailboxId mailboxId) {
+ return Mono.fromRunnable(Throwing.runnable(() -> delete(mailboxId, MessageRange.all())));
}
public void delete(MailboxId mailboxId, MessageRange range) throws IOException {
diff --git a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
index 4fa87f4..d7338f2 100644
--- a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
+++ b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
@@ -133,7 +133,7 @@ class LuceneMailboxMessageSearchIndexTest {
.internalDate(new Date())
.body("My Body".getBytes(StandardCharsets.UTF_8))
.size(200);
- index.add(session, mailbox, builder1.build(id1));
+ index.add(session, mailbox, builder1.build(id1)).block();
uid2 = MessageUid.of(1);
MessageBuilder builder2 = new MessageBuilder()
@@ -144,7 +144,7 @@ class LuceneMailboxMessageSearchIndexTest {
.internalDate(new Date())
.body("My Body".getBytes(StandardCharsets.UTF_8))
.size(20);
- index.add(session, mailbox2, builder2.build(id2));
+ index.add(session, mailbox2, builder2.build(id2)).block();
uid3 = MessageUid.of(2);
Calendar cal = Calendar.getInstance();
@@ -157,7 +157,7 @@ class LuceneMailboxMessageSearchIndexTest {
.internalDate(cal.getTime())
.body("My Otherbody".getBytes(StandardCharsets.UTF_8))
.size(20);
- index.add(session, mailbox, builder3.build(id3));
+ index.add(session, mailbox, builder3.build(id3)).block();
uid4 = MessageUid.of(3);
Calendar cal2 = Calendar.getInstance();
@@ -170,7 +170,7 @@ class LuceneMailboxMessageSearchIndexTest {
.internalDate(cal2.getTime())
.body("My Otherbody2".getBytes(StandardCharsets.UTF_8))
.size(20);
- index.add(session, mailbox, builder4.build(id4));
+ index.add(session, mailbox, builder4.build(id4)).block();
uid5 = MessageUid.of(10);
MessageBuilder builder = new MessageBuilder();
@@ -181,7 +181,7 @@ class LuceneMailboxMessageSearchIndexTest {
builder.body(StandardCharsets.US_ASCII.encode(BODY).array());
builder.uid(uid5);
builder.mailboxId(TEST_ID_3);
- index.add(session, mailbox3, builder.build(id5));
+ index.add(session, mailbox3, builder.build(id5)).block();
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
index 6e75f26..eac5ee5 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* {@link ListeningMessageSearchIndex} implementation which wraps another {@link ListeningMessageSearchIndex} and will forward all calls to it.
@@ -88,18 +89,18 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
}
@Override
- public void add(MailboxSession session, Mailbox mailbox, MailboxMessage message) throws Exception {
- index.add(session, mailbox, message);
+ public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message) {
+ return index.add(session, mailbox, message);
}
@Override
- public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) throws Exception {
- index.delete(session, mailbox, expungedUids);
+ public Mono<Void> delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
+ return index.delete(session, mailbox, expungedUids);
}
@Override
- public void deleteAll(MailboxSession session, MailboxId mailboxId) throws Exception {
- index.deleteAll(session, mailboxId);
+ public Mono<Void> deleteAll(MailboxSession session, MailboxId mailboxId) {
+ return index.deleteAll(session, mailboxId);
}
/**
@@ -125,7 +126,7 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
while (messages.hasNext()) {
final MailboxMessage message = messages.next();
try {
- add(session, mailbox, message);
+ add(session, mailbox, message).block();
} catch (Exception e) {
LOGGER.error("Unable to index message {} in mailbox {}", message.getUid(), mailbox.getName(), e);
}
@@ -137,8 +138,8 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
}
@Override
- public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws Exception {
- index.update(session, mailbox, updatedFlagsList);
+ public Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
+ return index.update(session, mailbox, updatedFlagsList);
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
index e38c8cd..77a7a8b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
@@ -20,7 +20,6 @@ package org.apache.james.mailbox.store.search;
import java.util.Collection;
import java.util.List;
-import java.util.stream.Stream;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageUid;
@@ -38,14 +37,16 @@ import org.apache.james.util.streams.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
/**
* {@link MessageSearchIndex} which needs to get registered as global {@link MailboxListener} and so get
* notified about message changes. This will then allow to update the underlying index.
*/
-public abstract class ListeningMessageSearchIndex implements MessageSearchIndex, MailboxListener.GroupMailboxListener {
+public abstract class ListeningMessageSearchIndex implements MessageSearchIndex, MailboxListener.ReactiveGroupMailboxListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ListeningMessageSearchIndex.class);
protected static final int UNLIMITED = -1;
@@ -68,45 +69,53 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
* something relevant is received
*/
@Override
- public void event(Event event) throws Exception {
- handleMailboxEvent(event,
+ public Mono<Void> reactiveEvent(Event event) {
+ return handleMailboxEvent(event,
sessionProvider.createSystemSession(event.getUsername()),
(MailboxEvent) event);
}
- private void handleMailboxEvent(Event event, MailboxSession session, MailboxEvent mailboxEvent) throws Exception {
+ private Mono<Void> handleMailboxEvent(Event event, MailboxSession session, MailboxEvent mailboxEvent) {
MailboxId mailboxId = mailboxEvent.getMailboxId();
if (event instanceof Added) {
- Mailbox mailbox = factory.getMailboxMapper(session).findMailboxById(mailboxId);
- handleAdded(session, mailbox, (Added) event);
+ return factory.getMailboxMapper(session)
+ .findMailboxByIdReactive(mailboxId)
+ .flatMap(mailbox -> handleAdded(session, mailbox, (Added) event));
} else if (event instanceof Expunged) {
- Mailbox mailbox = factory.getMailboxMapper(session).findMailboxById(mailboxId);
Expunged expunged = (Expunged) event;
- delete(session, mailbox, expunged.getUids());
+
+ return factory.getMailboxMapper(session)
+ .findMailboxByIdReactive(mailboxId)
+ .flatMap(mailbox -> delete(session, mailbox, expunged.getUids()));
} else if (event instanceof FlagsUpdated) {
- Mailbox mailbox = factory.getMailboxMapper(session).findMailboxById(mailboxId);
FlagsUpdated flagsUpdated = (FlagsUpdated) event;
- update(session, mailbox, flagsUpdated.getUpdatedFlags());
+
+ return factory.getMailboxMapper(session)
+ .findMailboxByIdReactive(mailboxId)
+ .flatMap(mailbox -> update(session, mailbox, flagsUpdated.getUpdatedFlags()));
} else if (event instanceof MailboxDeletion) {
- deleteAll(session, mailboxId);
+ return deleteAll(session, mailboxId);
+ } else {
+ return Mono.empty();
}
}
- private void handleAdded(MailboxSession session, Mailbox mailbox, Added added) {
- MessageRange.toRanges(added.getUids())
- .stream()
- .flatMap(range -> retrieveMailboxMessages(session, mailbox, range))
- .forEach(Throwing.<MailboxMessage>consumer(mailboxMessage -> add(session, mailbox, mailboxMessage)).sneakyThrow());
+ private Mono<Void> handleAdded(MailboxSession session, Mailbox mailbox, Added added) {
+ return Flux.fromIterable(MessageRange.toRanges(added.getUids()))
+ .concatMap(range -> retrieveMailboxMessages(session, mailbox, range))
+ .concatMap(mailboxMessage -> add(session, mailbox, mailboxMessage))
+ .then();
}
- private Stream<MailboxMessage> retrieveMailboxMessages(MailboxSession session, Mailbox mailbox, MessageRange range) {
+ private Flux<MailboxMessage> retrieveMailboxMessages(MailboxSession session, Mailbox mailbox, MessageRange range) {
try {
- return Iterators.toStream(factory.getMessageMapper(session)
+ return Iterators.toFlux(factory.getMessageMapper(session)
.findInMailbox(mailbox, range, FetchType.Full, UNLIMITED));
} catch (Exception e) {
+ // todo this error handling makes us lose messages!
LOGGER.error("Could not retrieve message {} in mailbox {}", range.toString(), mailbox.getMailboxId().serialize(), e);
- return Stream.empty();
+ return Flux.empty();
}
}
@@ -117,7 +126,7 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
* @param mailbox mailbox on which the message addition was performed
* @param message The added message
*/
- public abstract void add(MailboxSession session, Mailbox mailbox, MailboxMessage message) throws Exception;
+ public abstract Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message);
/**
* Delete the concerned UIDs for the given {@link Mailbox} from the index
@@ -126,7 +135,7 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
* @param mailbox mailbox on which the expunge was performed
* @param expungedUids UIDS to be deleted
*/
- public abstract void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) throws Exception;
+ public abstract Mono<Void> delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids);
/**
* Delete the messages contained in the given {@link Mailbox} from the index
@@ -134,7 +143,7 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
* @param session The mailbox session performing the expunge
* @param mailboxId mailboxId on which the expunge was performed
*/
- public abstract void deleteAll(MailboxSession session, MailboxId mailboxId) throws Exception;
+ public abstract Mono<Void> deleteAll(MailboxSession session, MailboxId mailboxId);
/**
* Update the messages concerned by the updated flags list for the given {@link Mailbox}
@@ -143,5 +152,5 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
* @param mailbox mailbox containing the updated messages
* @param updatedFlagsList list of flags that were updated
*/
- public abstract void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws Exception;
+ public abstract Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList);
}
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index 0f8cb27..8e06511 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -74,7 +74,7 @@ public class ReIndexerPerformer {
LOGGER.info("Intend to reindex mailbox with mailboxId {}", mailboxId.serialize());
MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
- messageSearchIndex.deleteAll(mailboxSession, mailboxId);
+ messageSearchIndex.deleteAll(mailboxSession, mailboxId).block();
try {
return Iterators.toStream(
mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
@@ -163,7 +163,7 @@ public class ReIndexerPerformer {
try {
MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session);
Mailbox mailbox = mailboxMapper.findMailboxById(mailboxMessage.getMailboxId());
- messageSearchIndex.add(session, mailbox, mailboxMessage);
+ messageSearchIndex.add(session, mailbox, mailboxMessage).block();
return Task.Result.COMPLETED;
} catch (Exception e) {
LOGGER.warn("Failed to re-index {} in {}", mailboxMessage.getUid(), mailboxMessage.getMailboxId(), e);
@@ -189,7 +189,7 @@ public class ReIndexerPerformer {
try {
Optional.of(uid)
.flatMap(Throwing.function(mUid -> fullyReadMessage(mailboxSession, mailbox, mUid)))
- .ifPresent(Throwing.consumer(message -> messageSearchIndex.add(mailboxSession, mailbox, message)));
+ .ifPresent(message -> messageSearchIndex.add(mailboxSession, mailbox, message).block());
reprocessingContext.recordSuccess();
return Task.Result.COMPLETED;
} catch (Exception e) {
diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
index 2362609..c147110 100644
--- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
+++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@@ -51,6 +52,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import com.google.common.base.Strings;
+import reactor.core.publisher.Mono;
+
public class CassandraReIndexerImplTest {
private static final Username USERNAME = Username.of("benwa@apache.org");
public static final MailboxPath INBOX = MailboxPath.inbox(USERNAME);
@@ -67,6 +70,8 @@ public class CassandraReIndexerImplTest {
mailboxManager = CassandraMailboxManagerProvider.provideMailboxManager(cassandra, PreDeletionHooks.NO_PRE_DELETION_HOOK);
MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory();
messageSearchIndex = mock(ListeningMessageSearchIndex.class);
+ when(messageSearchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+ when(messageSearchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
reIndexer = new ReIndexerImpl(new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory),
mailboxManager, mailboxSessionMapperFactory);
}
diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/MessageIdReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/MessageIdReIndexerImplTest.java
index 5aefb71..823114f 100644
--- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/MessageIdReIndexerImplTest.java
+++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/MessageIdReIndexerImplTest.java
@@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxSession;
@@ -41,6 +42,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
+import reactor.core.publisher.Mono;
+
public class MessageIdReIndexerImplTest {
private static final Username USERNAME = Username.of("benwa@apache.org");
public static final MailboxPath INBOX = MailboxPath.inbox(USERNAME);
@@ -56,6 +59,7 @@ public class MessageIdReIndexerImplTest {
mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory();
messageSearchIndex = mock(ListeningMessageSearchIndex.class);
+ when(messageSearchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
reindexerPerformer = new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory);
reIndexer = new MessageIdReIndexerImpl(reindexerPerformer);
}
diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java
index f38f39c..d5b15e7 100644
--- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java
+++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java
@@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxSession;
@@ -46,6 +47,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
+import reactor.core.publisher.Mono;
+
public class ReIndexerImplTest {
private static final Username USERNAME = Username.of("benwa@apache.org");
@@ -60,6 +63,8 @@ public class ReIndexerImplTest {
mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory();
messageSearchIndex = mock(ListeningMessageSearchIndex.class);
+ when(messageSearchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+ when(messageSearchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
reIndexer = new ReIndexerImpl(new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory),
mailboxManager, mailboxSessionMapperFactory);
}
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
index f9b866c..46fe6d0 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
@@ -39,6 +39,7 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
private static class FakeMessageSearchIndexGroup extends Group {
@@ -52,22 +53,22 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
}
@Override
- public void add(MailboxSession session, Mailbox mailbox, MailboxMessage message) throws Exception {
+ public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message) {
throw new NotImplementedException("not implemented");
}
@Override
- public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) throws Exception {
+ public Mono<Void> delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
throw new NotImplementedException("not implemented");
}
@Override
- public void deleteAll(MailboxSession session, MailboxId mailboxId) throws Exception {
+ public Mono<Void> deleteAll(MailboxSession session, MailboxId mailboxId) {
throw new NotImplementedException("not implemented");
}
@Override
- public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws Exception {
+ public Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
throw new NotImplementedException("not implemented");
}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
index 9d211d4..05a333c 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
@@ -26,7 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
@@ -64,9 +64,11 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import org.testcontainers.shaded.com.google.common.collect.ImmutableSet;
import io.restassured.RestAssured;
+import reactor.core.publisher.Mono;
class MailboxesRoutesTest {
private static final Username USERNAME = Username.of("benwa@apache.org");
@@ -83,6 +85,8 @@ class MailboxesRoutesTest {
taskManager = new MemoryTaskManager(new Hostname("foo"));
InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();
searchIndex = mock(ListeningMessageSearchIndex.class);
+ Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+ Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
ReIndexerPerformer reIndexerPerformer = new ReIndexerPerformer(
mailboxManager,
searchIndex,
@@ -750,7 +754,7 @@ class MailboxesRoutesTest {
.basePath(TasksRoutes.BASE)
.get(taskId + "/await");
- doNothing().when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
+ doReturn(Mono.empty()).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
String fixingTaskId = with()
.queryParam("reIndexFailedMessagesOf", taskId)
@@ -843,6 +847,8 @@ class MailboxesRoutesTest {
.get(taskId + "/await");
reset(searchIndex);
+ Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+ Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
String fixingTaskId = with()
.queryParam("reIndexFailedMessagesOf", taskId)
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesTest.java
index 02be730..45c4d98 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesTest.java
@@ -55,9 +55,12 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
-import org.testcontainers.shaded.com.google.common.collect.ImmutableSet;
+import org.mockito.Mockito;
+
+import com.google.common.collect.ImmutableSet;
import io.restassured.RestAssured;
+import reactor.core.publisher.Mono;
class MessageRoutesTest {
private static final Username USERNAME = Username.of("benwa@apache.org");
@@ -72,7 +75,9 @@ class MessageRoutesTest {
void beforeEach() {
taskManager = new MemoryTaskManager(new Hostname("foo"));
mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
- searchIndex = mock(ListeningMessageSearchIndex.class);
+ searchIndex = mock(ListeningMessageSearchIndex.class);;
+ Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+ Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
ReIndexerPerformer reIndexerPerformer = new ReIndexerPerformer(
mailboxManager,
searchIndex,
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
index 7b6875d..5d21359 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
@@ -78,6 +78,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -103,6 +104,8 @@ class UserMailboxesRoutesTest {
taskManager = new MemoryTaskManager(new Hostname("foo"));
searchIndex = mock(ListeningMessageSearchIndex.class);
+ Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+ Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
ReIndexerPerformer reIndexerPerformer = new ReIndexerPerformer(
mailboxManager,
searchIndex,
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org