You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/16 02:11:58 UTC
[james-project] 03/03: JAMES-3149 Reactive GetMessageList
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 58af5c33e911053e9971ff142b26ff640a95a518
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Apr 5 23:50:42 2020 +0700
JAMES-3149 Reactive GetMessageList
---
.../versions/CassandraSchemaVersionManager.java | 22 ++---
.../SessionWithInitializedTablesFactoryTest.java | 14 ++--
.../CassandraSchemaVersionManagerTest.java | 6 +-
.../org/apache/james/mailbox/MailboxManager.java | 2 +-
.../apache/james/mailbox/MailboxManagerTest.java | 49 ++++++++----
.../cassandra/mail/CassandraMailboxMapper.java | 52 ++++++------
.../cassandra/mail/CassandraMessageIdMapper.java | 27 +++++--
.../task/SolveMailboxInconsistenciesService.java | 2 +-
.../cassandra/mail/CassandraMailboxMapperTest.java | 93 ++++++++++++++--------
.../ElasticSearchListeningMessageSearchIndex.java | 9 +--
.../search/ElasticSearchSearcherTest.java | 3 +-
.../james/mailbox/jpa/mail/JPAMailboxMapper.java | 16 ++--
.../jpa/mail/TransactionalMailboxMapper.java | 5 +-
.../lucene/search/LuceneMessageSearchIndex.java | 10 ++-
.../LuceneMailboxMessageSearchIndexTest.java | 12 ++-
.../mailbox/maildir/mail/MaildirMailboxMapper.java | 18 ++---
.../inmemory/mail/InMemoryMailboxMapper.java | 18 ++---
.../james/vault/DeletedMessageVaultHook.java | 14 ++--
.../james/mailbox/store/StoreMailboxManager.java | 53 ++++++------
.../james/mailbox/store/mail/MailboxMapper.java | 26 +++---
.../store/quota/DefaultUserQuotaRootResolver.java | 3 +-
.../store/search/LazyMessageSearchIndex.java | 4 +-
.../mailbox/store/search/MessageSearchIndex.java | 5 +-
.../store/search/SimpleMessageSearchIndex.java | 31 ++++----
.../store/AbstractCombinationManagerTest.java | 5 +-
.../store/mail/model/MailboxMapperACLTest.java | 24 +++---
.../store/mail/model/MailboxMapperTest.java | 44 ++--------
.../quota/DefaultUserQuotaRootResolverTest.java | 4 +-
.../search/AbstractMessageSearchIndexTest.java | 36 ++++++---
.../CassandraSchemaVersionStartUpCheck.java | 6 +-
.../org/apache/james/FakeMessageSearchIndex.java | 4 +-
.../cassandra/CassandraRecipientRewriteTable.java | 2 +-
.../jmap/draft/methods/GetMessageListMethod.java | 71 +++++++++--------
.../james/jmap/draft/methods/ReferenceUpdater.java | 5 +-
.../mailet/ExtractMDNOriginalJMAPMessageId.java | 4 +-
.../routes/DeletedMessagesVaultRoutesTest.java | 3 +-
36 files changed, 378 insertions(+), 324 deletions(-)
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
index 9b47f39..516fbd3 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
@@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Mono;
+
public class CassandraSchemaVersionManager {
public static final SchemaVersion MIN_VERSION = new SchemaVersion(5);
public static final SchemaVersion MAX_VERSION = new SchemaVersion(7);
@@ -65,24 +67,26 @@ public class CassandraSchemaVersionManager {
this.minVersion = minVersion;
this.maxVersion = maxVersion;
- this.initialSchemaVersion = computeVersion();
+ this.initialSchemaVersion = computeVersion().block();
}
- public boolean isBefore(SchemaVersion minimum) {
- return initialSchemaVersion.isBefore(minimum)
+ public Mono<Boolean> isBefore(SchemaVersion minimum) {
+ if (initialSchemaVersion.isBefore(minimum)) {
// If we started with a legacy james then maybe schema version had been updated since then
- && computeVersion().isBefore(minimum);
+ return computeVersion()
+ .map(computedVersion -> computedVersion.isBefore(minimum));
+ }
+ return Mono.just(false);
}
- public SchemaVersion computeVersion() {
+ public Mono<SchemaVersion> computeVersion() {
return schemaVersionDAO
.getCurrentSchemaVersion()
- .block()
- .orElseGet(() -> {
+ .map(maybeVersion -> maybeVersion.orElseGet(() -> {
LOGGER.warn("No schema version information found on Cassandra, we assume schema is at version {}",
CassandraSchemaVersionManager.DEFAULT_VERSION);
return DEFAULT_VERSION;
- });
+ }));
}
public SchemaVersion getMinimumSupportedVersion() {
@@ -94,7 +98,7 @@ public class CassandraSchemaVersionManager {
}
public SchemaState computeSchemaState() {
- SchemaVersion version = computeVersion();
+ SchemaVersion version = computeVersion().block();
if (version.isBefore(minVersion)) {
return TOO_OLD;
} else if (version.isBefore(maxVersion)) {
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java
index 3c3345f..817fe85 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java
@@ -83,39 +83,39 @@ class SessionWithInitializedTablesFactoryTest {
@Test
void createSessionShouldSetTheLatestSchemaVersionWhenCreatingTypesAndTables() {
- assertThat(versionManager(testee.get()).computeVersion())
+ assertThat(versionManager(testee.get()).computeVersion().block())
.isEqualTo(MAX_VERSION);
}
@Test
void createSessionShouldKeepTheSetSchemaVersionWhenTypesAndTablesHaveNotChanged() {
Session session = testee.get();
- assertThat(versionManager(session).computeVersion())
+ assertThat(versionManager(session).computeVersion().block())
.isEqualTo(MAX_VERSION);
new CassandraTableManager(MODULE, session).clearAllTables();
versionManagerDAO(session).updateVersion(MIN_VERSION);
- assertThat(versionManager(session).computeVersion())
+ assertThat(versionManager(session).computeVersion().block())
.isEqualTo(MIN_VERSION);
- assertThat(versionManager(testee.get()).computeVersion())
+ assertThat(versionManager(testee.get()).computeVersion().block())
.isEqualTo(MIN_VERSION);
}
@Test
void createSessionShouldKeepTheSetSchemaVersionWhenTypesAndTablesHavePartiallyChanged() {
Session session = testee.get();
- assertThat(versionManager(session).computeVersion())
+ assertThat(versionManager(session).computeVersion().block())
.isEqualTo(MAX_VERSION);
new CassandraTableManager(MODULE, session).clearAllTables();
versionManagerDAO(session).updateVersion(MIN_VERSION);
- assertThat(versionManager(session).computeVersion())
+ assertThat(versionManager(session).computeVersion().block())
.isEqualTo(MIN_VERSION);
session.execute(SchemaBuilder.dropTable(TABLE_NAME));
session.execute(SchemaBuilder.dropType(TYPE_NAME));
- assertThat(versionManager(testee.get()).computeVersion())
+ assertThat(versionManager(testee.get()).computeVersion().block())
.isEqualTo(MIN_VERSION);
}
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java
index 64c498e..4fb33fb 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java
@@ -74,7 +74,7 @@ class CassandraSchemaVersionManagerTest {
minVersion,
maxVersion);
- assertThat(testee.isBefore(maxVersion)).isTrue();
+ assertThat(testee.isBefore(maxVersion).block()).isTrue();
}
@Test
@@ -89,7 +89,7 @@ class CassandraSchemaVersionManagerTest {
minVersion,
maxVersion);
- assertThat(testee.isBefore(maxVersion)).isFalse();
+ assertThat(testee.isBefore(maxVersion).block()).isFalse();
}
@Test
@@ -107,7 +107,7 @@ class CassandraSchemaVersionManagerTest {
when(schemaVersionDAO.getCurrentSchemaVersion())
.thenReturn(Mono.just(Optional.of(maxVersion)));
- assertThat(testee.isBefore(maxVersion)).isFalse();
+ assertThat(testee.isBefore(maxVersion).block()).isFalse();
}
@Test
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
index 6b83d9f..9f31163 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
@@ -251,7 +251,7 @@ public interface MailboxManager extends RequestAware, RightManager, MailboxAnnot
* @param session
* the context for this call, not null
*/
- List<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException;
+ Publisher<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException;
/**
* Does the given mailbox exist?
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
index 5397ed9..f51c124 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
@@ -94,6 +94,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -1208,7 +1209,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
.build();
- assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.containsOnly(cacahueteMessageId, pirouetteMessageId);
}
@@ -1237,7 +1239,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
.from(new SearchQuery())
.build();
- assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.containsOnly(messageId);
}
@@ -1264,7 +1267,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
.from(new SearchQuery())
.build();
- assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.isEmpty();
}
@@ -1284,7 +1288,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
.from(new SearchQuery())
.build();
- assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.isEmpty();
}
@@ -1305,7 +1310,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
.inMailboxes(otherMailboxId)
.build();
- assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.isEmpty();
}
@@ -1325,7 +1331,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
.notInMailboxes(otherMailboxId)
.build();
- assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.isEmpty();
}
@@ -1346,7 +1353,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
.notInMailboxes(otherMailboxId)
.build();
- assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.isEmpty();
}
@@ -1375,7 +1383,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
.inMailboxes(searchedMailboxId)
.build();
- assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.containsExactly(messageId);
}
@@ -1850,9 +1859,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
.inMailboxes(otherMailboxId)
.build();
- assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.isEmpty();
- assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.containsExactly(messageId1, messageId2);
}
@@ -1889,9 +1900,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
.inMailboxes(otherMailboxId)
.build();
- assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.containsExactly(messageId2);
- assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.containsExactly(composedMessageId1.getMessageId());
}
@@ -1979,9 +1992,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
.inMailboxes(otherMailboxId)
.build();
- assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.containsExactly(messageId1, messageId2);
- assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.containsExactly(messageId1, messageId2);
}
@@ -2019,9 +2034,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
.inMailboxes(otherMailboxId)
.build();
- assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.containsExactly(messageId1, messageId2);
- assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block())
.containsExactly(messageId1);
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index d74ce1c..5a15d9e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -20,9 +20,7 @@
package org.apache.james.mailbox.cassandra.mail;
import java.time.Duration;
-import java.util.Collection;
import java.util.List;
-import java.util.stream.Stream;
import javax.inject.Inject;
@@ -82,7 +80,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
this.versionManager = versionManager;
}
- private boolean needMailboxPathV1Support() {
+ private Mono<Boolean> needMailboxPathV1Support() {
return versionManager.isBefore(MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION);
}
@@ -96,12 +94,15 @@ public class CassandraMailboxMapper implements MailboxMapper {
}
private Flux<Void> deletePath(Mailbox mailbox) {
- if (needMailboxPathV1Support()) {
- return Flux.merge(
- mailboxPathDAO.delete(mailbox.generateAssociatedPath()),
- mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
- }
- return Flux.from(mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
+ return needMailboxPathV1Support()
+ .flatMapMany(needSupport -> {
+ if (needSupport) {
+ return Flux.merge(
+ mailboxPathDAO.delete(mailbox.generateAssociatedPath()),
+ mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
+ }
+ return Flux.from(mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
+ });
}
@Override
@@ -144,11 +145,9 @@ public class CassandraMailboxMapper implements MailboxMapper {
}
@Override
- public Stream<Mailbox> findMailboxesById(Collection<MailboxId> mailboxIds) {
- return Flux.fromIterable(mailboxIds)
- .map(CassandraId.class::cast)
- .concatMap(this::retrieveMailbox)
- .toStream();
+ public Mono<Mailbox> findMailboxByIdReactive(MailboxId id) {
+ CassandraId mailboxId = (CassandraId) id;
+ return retrieveMailbox(mailboxId);
}
private Mono<Mailbox> retrieveMailbox(CassandraId mailboxId) {
@@ -164,24 +163,25 @@ public class CassandraMailboxMapper implements MailboxMapper {
}
@Override
- public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
+ public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
String fixedNamespace = query.getFixedNamespace();
Username fixedUser = query.getFixedUser();
return listPaths(fixedNamespace, fixedUser)
.filter(idAndPath -> query.isPathMatch(idAndPath.getMailboxPath()))
.distinct(CassandraIdAndPath::getMailboxPath)
- .concatMap(this::retrieveMailbox)
- .collectList()
- .block();
+ .concatMap(this::retrieveMailbox);
}
private Flux<CassandraIdAndPath> listPaths(String fixedNamespace, Username fixedUser) {
- if (needMailboxPathV1Support()) {
- return Flux.concat(mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser),
- mailboxPathDAO.listUserMailboxes(fixedNamespace, fixedUser));
- }
- return mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser);
+ return needMailboxPathV1Support()
+ .flatMapMany(needSupport -> {
+ if (needSupport) {
+ return Flux.concat(mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser),
+ mailboxPathDAO.listUserMailboxes(fixedNamespace, fixedUser));
+ }
+ return mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser);
+ });
}
private Mono<Mailbox> retrieveMailbox(CassandraIdAndPath idAndPath) {
@@ -302,12 +302,10 @@ public class CassandraMailboxMapper implements MailboxMapper {
}
@Override
- public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
+ public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
return userMailboxRightsDAO.listRightsForUser(userName)
.filter(mailboxId -> mailboxId.getRight().contains(right))
.map(Pair::getLeft)
- .flatMap(this::retrieveMailbox)
- .collectList()
- .block();
+ .flatMap(this::retrieveMailbox);
}
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 47cd80b..107e37b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -32,6 +32,7 @@ import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.exception.MailboxNotFoundException;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.MailboxId;
@@ -48,6 +49,7 @@ import org.apache.james.util.streams.Limit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.fge.lambdas.runnable.ThrowingRunnable;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.Multimap;
@@ -129,20 +131,31 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
@Override
public void save(MailboxMessage mailboxMessage) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
- mailboxMapper.findMailboxById(mailboxId);
-
- messageDAO.save(mailboxMessage)
+ unbox(() -> mailboxMapper.findMailboxByIdReactive(mailboxId)
+ .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxId)))
+ .then(messageDAO.save(mailboxMessage))
.thenEmpty(saveMessageMetadata(mailboxMessage, mailboxId))
- .block();
+ .block());
}
@Override
public void copyInMailbox(MailboxMessage mailboxMessage) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
- mailboxMapper.findMailboxById(mailboxId);
+ unbox(() -> mailboxMapper.findMailboxByIdReactive(mailboxId)
+ .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxId)))
+ .then(saveMessageMetadata(mailboxMessage, mailboxId))
+ .block());
+ }
- saveMessageMetadata(mailboxMessage, mailboxId)
- .block();
+ private void unbox(ThrowingRunnable runnable) throws MailboxNotFoundException {
+ try {
+ runnable.run();
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof MailboxNotFoundException) {
+ throw (MailboxNotFoundException) e.getCause();
+ }
+ throw e;
+ }
}
private Mono<Void> saveMessageMetadata(MailboxMessage mailboxMessage, CassandraId mailboxId) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
index 5aeb181..a10fc6d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
@@ -375,7 +375,7 @@ public class SolveMailboxInconsistenciesService {
}
private void assertValidVersion() {
- SchemaVersion version = versionManager.computeVersion();
+ SchemaVersion version = versionManager.computeVersion().block();
boolean isVersionValid = version.isAfterOrEquals(MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION);
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
index f6d1dc2..f8e6839 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
@@ -23,8 +23,6 @@ import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
import static org.apache.james.mailbox.model.MailboxAssertingTool.softly;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
import java.util.List;
@@ -61,7 +59,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import com.github.fge.lambdas.Throwing;
import com.github.fge.lambdas.runnable.ThrowingRunnable;
-import reactor.core.publisher.Mono;
class CassandraMailboxMapperTest {
private static final UidValidity UID_VALIDITY = UidValidity.of(52);
@@ -162,7 +159,8 @@ class CassandraMailboxMapperTest {
softly(softly)
.assertThat(testee.findMailboxByPath(inboxPathRenamed).block())
.isEqualTo(inboxRenamed);
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inboxRenamed));
@@ -189,7 +187,8 @@ class CassandraMailboxMapperTest {
softly(softly)
.assertThat(testee.findMailboxByPath(inboxPathRenamed).block())
.isEqualTo(inboxRenamed);
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inboxRenamed));
@@ -212,7 +211,8 @@ class CassandraMailboxMapperTest {
softly(softly)
.assertThat(testee.findMailboxByPath(inboxPath).block())
.isEqualTo(inbox);
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inbox));
@@ -235,7 +235,8 @@ class CassandraMailboxMapperTest {
.isInstanceOf(MailboxNotFoundException.class);
softly.assertThat(testee.findMailboxByPath(inboxPath).blockOptional())
.isEmpty();
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.isEmpty();
}));
}
@@ -253,9 +254,11 @@ class CassandraMailboxMapperTest {
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(testee.findMailboxByPath(inboxPath).blockOptional())
.isEmpty();
- softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+ .collectList().block())
.isEmpty();
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.isEmpty();
});
}
@@ -282,7 +285,8 @@ class CassandraMailboxMapperTest {
softly(softly)
.assertThat(testee.findMailboxByPath(inboxPath).block())
.isEqualTo(inbox);
- softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inbox));
@@ -304,7 +308,8 @@ class CassandraMailboxMapperTest {
doQuietly(() -> testee.rename(inboxRenamed));
SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inbox));
@@ -328,7 +333,8 @@ class CassandraMailboxMapperTest {
SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
softly.assertThatThrownBy(() -> testee.findMailboxByPath(inboxPathRenamed))
.isInstanceOf(MailboxNotFoundException.class);
- softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)
+ .collectList().block())
.isEmpty();
}));
}
@@ -353,7 +359,8 @@ class CassandraMailboxMapperTest {
softly(softly)
.assertThat(testee.findMailboxByPath(inboxPath).block())
.isEqualTo(inbox);
- softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inbox));
@@ -375,7 +382,8 @@ class CassandraMailboxMapperTest {
doQuietly(() -> testee.rename(inboxRenamed));
SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inbox));
@@ -399,7 +407,8 @@ class CassandraMailboxMapperTest {
SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
softly.assertThatThrownBy(() -> testee.findMailboxByPath(inboxPathRenamed))
.isInstanceOf(MailboxNotFoundException.class);
- softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)
+ .collectList().block())
.isEmpty();
}));
}
@@ -422,11 +431,13 @@ class CassandraMailboxMapperTest {
.doesNotThrowAnyException();
softly.assertThatCode(() -> testee.findMailboxByPath(inboxPath))
.doesNotThrowAnyException();
- softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inbox));
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inbox));
@@ -468,11 +479,13 @@ class CassandraMailboxMapperTest {
softly(softly)
.assertThat(testee.findMailboxByPath(inboxPath).block())
.isEqualTo(inbox);
- softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inbox));
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inbox));
@@ -495,11 +508,13 @@ class CassandraMailboxMapperTest {
softly(softly)
.assertThat(testee.findMailboxByPath(inboxPath).block())
.isEqualTo(inbox);
- softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inbox));
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inbox));
@@ -525,9 +540,11 @@ class CassandraMailboxMapperTest {
.isInstanceOf(MailboxNotFoundException.class);
softly.assertThat(testee.findMailboxByPath(inboxPath).blockOptional())
.isEmpty();
- softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+ .collectList().block())
.isEmpty();
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.isEmpty();
}));
}
@@ -556,13 +573,16 @@ class CassandraMailboxMapperTest {
softly(softly)
.assertThat(testee.findMailboxByPath(inboxPathRenamed).block())
.isEqualTo(inboxRenamed);
- softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+ .collectList().block())
.isEmpty();
- softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inboxRenamed));
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inboxRenamed));
@@ -592,14 +612,17 @@ class CassandraMailboxMapperTest {
softly(softly)
.assertThat(testee.findMailboxByPath(inboxPathRenamed).block())
.isEqualTo(inboxRenamed);
- softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+ .collectList().block())
.isEmpty();
- softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox ->
softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inboxRenamed));
- softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+ softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+ .collectList().block())
.hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
.assertThat(searchMailbox)
.isEqualTo(inboxRenamed));
@@ -763,7 +786,8 @@ class CassandraMailboxMapperTest {
.username(USER)
.expression(Wildcard.INSTANCE)
.build()
- .asUserBound());
+ .asUserBound())
+ .collectList().block();
assertThat(mailboxes).containsOnly(MAILBOX);
}
@@ -782,7 +806,8 @@ class CassandraMailboxMapperTest {
.username(USER)
.expression(Wildcard.INSTANCE)
.build()
- .asUserBound());
+ .asUserBound())
+ .collectList().block();
assertThat(mailboxes).containsOnly(MAILBOX);
}
@@ -799,7 +824,8 @@ class CassandraMailboxMapperTest {
.username(USER)
.expression(Wildcard.INSTANCE)
.build()
- .asUserBound());
+ .asUserBound())
+ .collectList().block();
assertThat(mailboxes).containsOnly(MAILBOX);
}
@@ -877,7 +903,8 @@ class CassandraMailboxMapperTest {
.username(USER)
.expression(Wildcard.INSTANCE)
.build()
- .asUserBound()))
+ .asUserBound())
+ .collectList().block())
.containsOnly(MAILBOX);
}
}
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index b64ecb3..a85467b 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -62,6 +62,7 @@ import com.github.steveash.guavate.Guavate;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSearchIndex {
@@ -118,11 +119,11 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
}
@Override
- public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) {
+ public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) {
Preconditions.checkArgument(session != null, "'session' is mandatory");
if (mailboxIds.isEmpty()) {
- return ImmutableList.of();
+ return Flux.empty();
}
return searcher.search(mailboxIds, searchQuery, Optional.empty())
@@ -130,9 +131,7 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
.map(SearchResult::getMessageId)
.flatMap(Mono::justOrEmpty)
.distinct()
- .take(limit)
- .collect(Guavate.toImmutableList())
- .block();
+ .take(limit);
}
@Override
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
index f7d6564..fcaa97b 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
@@ -154,7 +154,8 @@ class ElasticSearchSearcherTest {
.stream()
.map(ComposedMessageId::getMessageId)
.collect(Guavate.toImmutableList());
- assertThat(storeMailboxManager.search(multimailboxesSearchQuery, session, numberOfMailboxes + 1))
+ assertThat(storeMailboxManager.search(multimailboxesSearchQuery, session, numberOfMailboxes + 1)
+ .collectList().block())
.containsExactlyInAnyOrderElementsOf(expectedMessageIds);
}
diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java
index 777edf3..c2d4514 100644
--- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java
+++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java
@@ -48,8 +48,8 @@ import org.apache.james.mailbox.store.mail.MailboxMapper;
import com.github.steveash.guavate.Guavate;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -186,15 +186,13 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
}
@Override
- public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
+ public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
try {
String pathLike = MailboxExpressionBackwardCompatibility.getPathLike(query);
- return findMailboxWithPathLikeTypedQuery(query.getFixedNamespace(), query.getFixedUser(), pathLike)
- .getResultList()
- .stream()
+ return Flux.fromIterable(findMailboxWithPathLikeTypedQuery(query.getFixedNamespace(), query.getFixedUser(), pathLike)
+ .getResultList())
.map(JPAMailbox::toMailbox)
- .filter(query::matches)
- .collect(Guavate.toImmutableList());
+ .filter(query::matches);
} catch (PersistenceException e) {
throw new MailboxException("Search of mailbox " + query + " failed", e);
}
@@ -250,7 +248,7 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
}
@Override
- public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException {
- return ImmutableList.of();
+ public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
+ return Flux.empty();
}
}
diff --git a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java
index 30b4da7..787db5b 100644
--- a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java
+++ b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java
@@ -36,6 +36,7 @@ import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.transaction.Mapper;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class TransactionalMailboxMapper implements MailboxMapper {
@@ -81,7 +82,7 @@ public class TransactionalMailboxMapper implements MailboxMapper {
}
@Override
- public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
+ public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
return wrapped.findMailboxWithPathLike(query);
}
@@ -106,7 +107,7 @@ public class TransactionalMailboxMapper implements MailboxMapper {
}
@Override
- public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException {
+ public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
return wrapped.findNonPersonalMailboxes(userName, right);
}
diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
index b93306b..793f4a0 100644
--- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
+++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
@@ -123,6 +123,8 @@ import com.github.steveash.guavate.Guavate;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+
/**
* Lucene based {@link ListeningMessageSearchIndex} which offers message searching via a Lucene index
*/
@@ -461,18 +463,18 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {
}
@Override
- public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+ public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
Preconditions.checkArgument(session != null, "'session' is mandatory");
if (mailboxIds.isEmpty()) {
- return ImmutableList.of();
+ return Flux.empty();
}
- return searchMultimap(mailboxIds, searchQuery)
+ return Flux.fromIterable(searchMultimap(mailboxIds, searchQuery)
.stream()
.map(searchResult -> searchResult.getMessageId().get())
.filter(SearchUtil.distinct())
.limit(Long.valueOf(limit).intValue())
- .collect(Guavate.toImmutableList());
+ .collect(Guavate.toImmutableList()));
}
private List<SearchResult> searchMultimap(Collection<MailboxId> mailboxIds, SearchQuery searchQuery) throws MailboxException {
diff --git a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
index 6ff52fb..4fa87f4 100644
--- a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
+++ b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
@@ -310,7 +310,8 @@ class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.bodyContains("My Body"));
- List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT);
+ List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT)
+ .collectList().block();
assertThat(result).containsOnly(id1, id2);
}
@@ -323,7 +324,8 @@ class LuceneMailboxMessageSearchIndexTest {
List<MessageId> result = index.search(session,
ImmutableList.of(mailbox.getMailboxId(), mailbox3.getMailboxId()),
query,
- LIMIT);
+ LIMIT)
+ .collectList().block();
assertThat(result).containsOnly(id1);
}
@@ -333,7 +335,8 @@ class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.all());
- List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT);
+ List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT)
+ .collectList().block();
// The query is not limited to one mailbox and we have 5 indexed messages
assertThat(result).hasSize(5);
@@ -345,7 +348,8 @@ class LuceneMailboxMessageSearchIndexTest {
query.andCriteria(SearchQuery.all());
int limit = 1;
- List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, limit);
+ List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, limit)
+ .collectList().block();
assertThat(result).hasSize(limit);
}
diff --git a/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java b/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java
index d9b5c0d..05d9799 100644
--- a/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java
+++ b/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java
@@ -51,9 +51,7 @@ import org.apache.james.mailbox.store.transaction.NonTransactionalMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.github.steveash.guavate.Guavate;
-import com.google.common.collect.ImmutableList;
-
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class MaildirMailboxMapper extends NonTransactionalMapper implements MailboxMapper {
@@ -128,7 +126,7 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail
}
@Override
- public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
+ public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
String pathLike = MailboxExpressionBackwardCompatibility.getPathLike(query);
final Pattern searchPattern = Pattern.compile("[" + MaildirStore.maildirDelimiter + "]"
+ pathLike.replace(".", "\\.").replace(MaildirStore.WILDCARD, ".*"));
@@ -147,9 +145,8 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail
Mailbox mailbox = maildirStore.loadMailbox(session, root, query.getFixedNamespace(), query.getFixedUser(), "");
mailboxList.add(0, mailbox);
}
- return mailboxList.stream()
- .filter(query::matches)
- .collect(Guavate.toImmutableList());
+ return Flux.fromIterable(mailboxList)
+ .filter(query::matches);
}
@Override
@@ -159,7 +156,8 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail
.userAndNamespaceFrom(mailbox.generateAssociatedPath())
.expression(new PrefixedWildcard(mailbox.getName() + delimiter))
.build()
- .asUserBound());
+ .asUserBound())
+ .collectList().block();
return mailboxes.size() > 0;
}
@@ -333,7 +331,7 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail
}
@Override
- public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException {
- return ImmutableList.of();
+ public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
+ return Flux.empty();
}
}
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java
index fc55302..77db71d 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java
@@ -40,10 +40,10 @@ import org.apache.james.mailbox.model.UidValidity;
import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.mailbox.store.mail.MailboxMapper;
-import com.github.steveash.guavate.Guavate;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class InMemoryMailboxMapper implements MailboxMapper {
@@ -84,12 +84,10 @@ public class InMemoryMailboxMapper implements MailboxMapper {
}
@Override
- public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
- return mailboxesByPath.values()
- .stream()
+ public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
+ return Flux.fromIterable(mailboxesByPath.values())
.filter(query::matches)
- .map(Mailbox::new)
- .collect(Guavate.toImmutableList());
+ .map(Mailbox::new);
}
@Override
@@ -166,11 +164,9 @@ public class InMemoryMailboxMapper implements MailboxMapper {
}
@Override
- public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException {
- return mailboxesByPath.values()
- .stream()
- .filter(mailbox -> hasRightOn(mailbox, userName, right))
- .collect(Guavate.toImmutableList());
+ public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
+ return Flux.fromIterable(mailboxesByPath.values())
+ .filter(mailbox -> hasRightOn(mailbox, userName, right));
}
private Boolean hasRightOn(Mailbox mailbox, Username userName, Right right) {
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java
index a69b4bd..cdb5071 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java
@@ -35,6 +35,7 @@ import org.apache.james.mailbox.MetadataWithMailboxId;
import org.apache.james.mailbox.SessionProvider;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.extension.PreDeletionHook;
+import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
@@ -154,18 +155,19 @@ public class DeletedMessageVaultHook implements PreDeletionHook {
.flatMap(groupFlux -> groupFlux.reduce(DeletedMessageMailboxContext::combine));
}
- private Publisher<DeletedMessageMailboxContext> addOwnerToMetadata(GroupedFlux<MailboxId, MetadataWithMailboxId> groupedFlux) throws MailboxException {
- Username owner = retrieveMailboxUser(groupedFlux.key());
- return groupedFlux.map(metadata -> new DeletedMessageMailboxContext(metadata.getMessageMetaData().getMessageId(), owner, ImmutableList.of(metadata.getMailboxId())));
+ private Flux<DeletedMessageMailboxContext> addOwnerToMetadata(GroupedFlux<MailboxId, MetadataWithMailboxId> groupedFlux) throws MailboxException {
+ return retrieveMailboxUser(groupedFlux.key())
+ .flatMapMany(owner -> groupedFlux.map(metadata ->
+ new DeletedMessageMailboxContext(metadata.getMessageMetaData().getMessageId(), owner, ImmutableList.of(metadata.getMailboxId()))));
}
private Pair<MessageId, Username> toMessageIdUserPair(DeletedMessageMailboxContext deletedMessageMetadata) {
return Pair.of(deletedMessageMetadata.getMessageId(), deletedMessageMetadata.getOwner());
}
- private Username retrieveMailboxUser(MailboxId mailboxId) throws MailboxException {
+ private Mono<Username> retrieveMailboxUser(MailboxId mailboxId) throws MailboxException {
return mapperFactory.getMailboxMapper(session)
- .findMailboxById(mailboxId)
- .getUser();
+ .findMailboxByIdReactive(mailboxId)
+ .map(Mailbox::getUser);
}
}
\ No newline at end of file
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index c3d7c02..76f8bfb 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -93,6 +93,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -536,7 +537,7 @@ public class StoreMailboxManager implements MailboxManager {
.build()
.asUserBound();
locker.executeWithLock(from, (LockAwareExecution<Void>) () -> {
- List<Mailbox> subMailboxes = mapper.findMailboxWithPathLike(query);
+ List<Mailbox> subMailboxes = mapper.findMailboxWithPathLike(query).collectList().block();
for (Mailbox sub : subMailboxes) {
String subOriginalName = sub.getName();
String subNewName = newMailboxPath.getName() + subOriginalName.substring(from.getName().length());
@@ -596,7 +597,7 @@ public class StoreMailboxManager implements MailboxManager {
}
private List<MailboxMetaData> searchMailboxesMetadata(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException {
- List<Mailbox> mailboxes = searchMailboxes(mailboxQuery, session, right);
+ List<Mailbox> mailboxes = searchMailboxes(mailboxQuery, session, right).collectList().block();
ImmutableMap<MailboxId, MailboxCounters> counters = getMailboxCounters(mailboxes, session)
.stream()
@@ -614,16 +615,13 @@ public class StoreMailboxManager implements MailboxManager {
.collect(Guavate.toImmutableList());
}
- private List<Mailbox> searchMailboxes(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException {
+ private Flux<Mailbox> searchMailboxes(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException {
MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session);
- Stream<Mailbox> baseMailboxes = mailboxMapper
- .findMailboxWithPathLike(toSingleUserQuery(mailboxQuery, session))
- .stream();
- Stream<Mailbox> delegatedMailboxes = getDelegatedMailboxes(mailboxMapper, mailboxQuery, right, session);
- return Stream.concat(baseMailboxes, delegatedMailboxes)
+ Flux<Mailbox> baseMailboxes = mailboxMapper.findMailboxWithPathLike(toSingleUserQuery(mailboxQuery, session));
+ Flux<Mailbox> delegatedMailboxes = getDelegatedMailboxes(mailboxMapper, mailboxQuery, right, session);
+ return Flux.merge(baseMailboxes, delegatedMailboxes)
.distinct()
- .filter(Throwing.predicate(mailbox -> storeRightManager.hasRight(mailbox, right, session)))
- .collect(Guavate.toImmutableList());
+ .filter(Throwing.predicate(mailbox -> storeRightManager.hasRight(mailbox, right, session)));
}
static MailboxQuery.UserBound toSingleUserQuery(MailboxQuery mailboxQuery, MailboxSession mailboxSession) {
@@ -644,12 +642,12 @@ public class StoreMailboxManager implements MailboxManager {
.build());
}
- private Stream<Mailbox> getDelegatedMailboxes(MailboxMapper mailboxMapper, MailboxQuery mailboxQuery,
- Right right, MailboxSession session) throws MailboxException {
+ private Flux<Mailbox> getDelegatedMailboxes(MailboxMapper mailboxMapper, MailboxQuery mailboxQuery,
+ Right right, MailboxSession session) {
if (mailboxQuery.isPrivateMailboxes(session)) {
- return Stream.of();
+ return Flux.empty();
}
- return mailboxMapper.findNonPersonalMailboxes(session.getUser(), right).stream();
+ return mailboxMapper.findNonPersonalMailboxes(session.getUser(), right);
}
private MailboxMetaData toMailboxMetadata(MailboxSession session, List<Mailbox> mailboxes, Mailbox mailbox, MailboxCounters counters) throws UnsupportedRightException {
@@ -677,32 +675,31 @@ public class StoreMailboxManager implements MailboxManager {
}
@Override
- public List<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException {
- ImmutableSet<MailboxId> wantedMailboxesId =
- getInMailboxes(expression.getInMailboxes(), session)
- .filter(id -> !expression.getNotInMailboxes().contains(id))
- .collect(Guavate.toImmutableSet());
-
- return index.search(session, wantedMailboxesId, expression.getSearchQuery(), limit);
+ public Flux<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException {
+ return getInMailboxes(expression.getInMailboxes(), session)
+ .filter(id -> !expression.getNotInMailboxes().contains(id))
+ .collect(Guavate.toImmutableSet())
+ .flatMapMany(Throwing.function(ids -> index.search(session, ids, expression.getSearchQuery(), limit)));
}
- private Stream<MailboxId> getInMailboxes(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException {
- if (inMailboxes.isEmpty()) {
+
+ private Flux<MailboxId> getInMailboxes(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException {
+ if (inMailboxes.isEmpty()) {
return getAllReadableMailbox(session);
} else {
return filterReadable(inMailboxes, session);
}
}
- private Stream<MailboxId> getAllReadableMailbox(MailboxSession session) throws MailboxException {
+ private Flux<MailboxId> getAllReadableMailbox(MailboxSession session) throws MailboxException {
return searchMailboxes(MailboxQuery.builder().matchesAllMailboxNames().build(), session, Right.Read)
- .stream()
.map(Mailbox::getMailboxId);
}
- private Stream<MailboxId> filterReadable(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException {
- return mailboxSessionMapperFactory.getMailboxMapper(session)
- .findMailboxesById(inMailboxes)
+ private Flux<MailboxId> filterReadable(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException {
+ MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session);
+ return Flux.fromIterable(inMailboxes)
+ .concatMap(mailboxMapper::findMailboxByIdReactive)
.filter(Throwing.<Mailbox>predicate(mailbox -> storeRightManager.hasRight(mailbox, Right.Read, session)).sneakyThrow())
.map(Mailbox::getMailboxId);
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
index 7ace127..6c5e3b8 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
@@ -18,9 +18,7 @@
****************************************************************/
package org.apache.james.mailbox.store.mail;
-import java.util.Collection;
import java.util.List;
-import java.util.stream.Stream;
import org.apache.james.core.Username;
import org.apache.james.mailbox.acl.ACLDiff;
@@ -35,8 +33,7 @@ import org.apache.james.mailbox.model.UidValidity;
import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.mailbox.store.transaction.Mapper;
-import com.github.fge.lambdas.Throwing;
-
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -87,26 +84,25 @@ public interface MailboxMapper extends Mapper {
Mailbox findMailboxById(MailboxId mailboxId)
throws MailboxException, MailboxNotFoundException;
- default Stream<Mailbox> findMailboxesById(Collection<MailboxId> mailboxIds) throws MailboxException {
- return mailboxIds.stream()
- .flatMap(Throwing.<MailboxId, Stream<Mailbox>>function(id -> {
- try {
- return Stream.of(findMailboxById(id));
- } catch (MailboxNotFoundException e) {
- return Stream.empty();
- }
- }).sneakyThrow());
+ default Mono<Mailbox> findMailboxByIdReactive(MailboxId id) {
+ try {
+ return Mono.justOrEmpty(findMailboxById(id));
+ } catch (MailboxNotFoundException e) {
+ return Mono.empty();
+ } catch (MailboxException e) {
+ return Mono.error(e);
+ }
}
/**
* Return a List of {@link Mailbox} for the given userName and matching the right
*/
- List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException;
+ Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right);
/**
* Return a List of {@link Mailbox} which name is like the given name
*/
- List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query)
+ Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query)
throws MailboxException;
/**
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
index 9c29ac4..6ea1f88 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
@@ -118,6 +118,7 @@ public class DefaultUserQuotaRootResolver implements UserQuotaRootResolver {
.user(Username.of(user))
.matchesAllMailboxNames()
.build()
- .asUserBound());
+ .asUserBound())
+ .collectList().block();
}
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
index 4a06c64..6e75f26 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
@@ -47,6 +47,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Flux;
+
/**
* {@link ListeningMessageSearchIndex} implementation which wraps another {@link ListeningMessageSearchIndex} and will forward all calls to it.
*
@@ -141,7 +143,7 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
@Override
- public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+ public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
throw new UnsupportedSearchException();
}
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
index eb1dcee..4873099 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
@@ -21,7 +21,6 @@ package org.apache.james.mailbox.store.search;
import java.util.Collection;
import java.util.EnumSet;
-import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
@@ -34,6 +33,8 @@ import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.SearchQuery;
+import reactor.core.publisher.Flux;
+
/**
* An index which can be used to search for MailboxMessage UID's that match a {@link SearchQuery}.
*
@@ -50,7 +51,7 @@ public interface MessageSearchIndex {
/**
* Return all uids of all {@link Mailbox}'s the current user has access to which match the {@link SearchQuery}
*/
- List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException;
+ Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException;
EnumSet<MailboxManager.SearchCapabilities> getSupportedCapabilities(EnumSet<MailboxManager.MessageCapabilities> messageCapabilities);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
index 26b602d..ed6fd17 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
@@ -53,10 +53,11 @@ import org.apache.james.mailbox.store.mail.MessageMapperFactory;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import com.github.fge.lambdas.Throwing;
-import com.github.steveash.guavate.Guavate;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+
/**
* {@link MessageSearchIndex} which just fetch {@link MailboxMessage}'s from the {@link MessageMapper} and use {@link MessageSearcher}
* to match them against the {@link SearchQuery}.
@@ -108,10 +109,10 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
@Override
public Stream<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException {
Preconditions.checkArgument(session != null, "'session' is mandatory");
- return searchResults(session, ImmutableList.of(mailbox).stream(), query)
- .stream()
+ return searchResults(session, Flux.just(mailbox), query)
.filter(searchResult -> searchResult.getMailboxId().equals(mailbox.getMailboxId()))
- .map(SearchResult::getMessageUid);
+ .map(SearchResult::getMessageUid)
+ .toStream();
}
private List<SearchResult> searchResults(MailboxSession session, Mailbox mailbox, SearchQuery query) throws MailboxException {
@@ -142,19 +143,17 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
}
@Override
- public List<MessageId> search(MailboxSession session, final Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
- MailboxMapper mailboxManager = mailboxMapperFactory.getMailboxMapper(session);
+ public Flux<MessageId> search(MailboxSession session, final Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+ MailboxMapper mailboxMapper = mailboxMapperFactory.getMailboxMapper(session);
- Stream<Mailbox> filteredMailboxes = mailboxIds
- .stream()
- .map(Throwing.function(mailboxManager::findMailboxById).sneakyThrow());
+ Flux<Mailbox> filteredMailboxes = Flux.fromIterable(mailboxIds)
+ .concatMap(Throwing.function(mailboxMapper::findMailboxByIdReactive).sneakyThrow());
return getAsMessageIds(searchResults(session, filteredMailboxes, searchQuery), limit);
}
- private List<SearchResult> searchResults(MailboxSession session, Stream<Mailbox> mailboxes, SearchQuery query) throws MailboxException {
- return mailboxes.flatMap(mailbox -> getSearchResultStream(session, query, mailbox))
- .collect(Guavate.toImmutableList());
+ private Flux<SearchResult> searchResults(MailboxSession session, Flux<Mailbox> mailboxes, SearchQuery query) throws MailboxException {
+ return mailboxes.concatMap(mailbox -> Flux.fromStream(getSearchResultStream(session, query, mailbox)));
}
private Stream<? extends SearchResult> getSearchResultStream(MailboxSession session, SearchQuery query, Mailbox mailbox) {
@@ -165,12 +164,10 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
}
}
- private List<MessageId> getAsMessageIds(List<SearchResult> temp, long limit) {
- return temp.stream()
- .map(searchResult -> searchResult.getMessageId().get())
+ private Flux<MessageId> getAsMessageIds(Flux<SearchResult> temp, long limit) {
+ return temp.map(searchResult -> searchResult.getMessageId().get())
.filter(SearchUtil.distinct())
- .limit(Long.valueOf(limit).intValue())
- .collect(Guavate.toImmutableList());
+ .take(Long.valueOf(limit).intValue());
}
}
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
index 30dedcf..03f5900 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
@@ -52,6 +52,8 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+
public abstract class AbstractCombinationManagerTest {
private static final int DEFAULT_MAXIMUM_LIMIT = 256;
@@ -163,7 +165,8 @@ public abstract class AbstractCombinationManagerTest {
messageIdManager.setInMailboxes(messageId, ImmutableList.of(mailbox1.getMailboxId(), mailbox2.getMailboxId()), session);
- assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)).containsOnly(messageId);
+ assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+ .collectList().block()).containsOnly(messageId);
}
@Test
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
index d4dfaec..c8aef54 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
@@ -233,7 +233,7 @@ public abstract class MailboxMapperACLTest {
@Test
void findMailboxesShouldReturnEmptyWhenNone() throws MailboxException {
- assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer)).isEmpty();
+ assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block()).isEmpty();
}
@Test
@@ -246,7 +246,7 @@ public abstract class MailboxMapperACLTest {
.rights(rights)
.asReplacement());
- assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read)).isEmpty();
+ assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read).collectList().block()).isEmpty();
}
@Test
@@ -258,7 +258,7 @@ public abstract class MailboxMapperACLTest {
.rights(rights)
.asAddition());
- assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+ assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
.containsOnly(benwaInboxMailbox);
}
@@ -278,10 +278,10 @@ public abstract class MailboxMapperACLTest {
.rights(newRights)
.asReplacement());
- assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read))
+ assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read).collectList().block())
.containsOnly(benwaInboxMailbox);
- assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+ assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
.isEmpty();
}
@@ -302,7 +302,7 @@ public abstract class MailboxMapperACLTest {
.rights(new Rfc4314Rights())
.build());
- assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+ assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
.isEmpty();
}
@@ -321,7 +321,7 @@ public abstract class MailboxMapperACLTest {
.rights(initialRights)
.asRemoval());
- assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+ assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
.isEmpty();
}
@@ -336,7 +336,7 @@ public abstract class MailboxMapperACLTest {
.asReplacement());
mailboxMapper.delete(benwaInboxMailbox);
- assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+ assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
.isEmpty();
}
@@ -349,9 +349,9 @@ public abstract class MailboxMapperACLTest {
new MailboxACL.Entry(user1, new Rfc4314Rights(Right.Administer)),
new MailboxACL.Entry(user2, new Rfc4314Rights(Right.Read))));
- assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer))
+ assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer).collectList().block())
.containsOnly(benwaInboxMailbox);
- assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Read))
+ assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Read).collectList().block())
.containsOnly(benwaInboxMailbox);
}
@@ -367,7 +367,7 @@ public abstract class MailboxMapperACLTest {
mailboxMapper.setACL(benwaInboxMailbox, new MailboxACL(
new MailboxACL.Entry(user2, new Rfc4314Rights(Right.Read))));
- assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer))
+ assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer).collectList().block())
.isEmpty();
}
@@ -383,7 +383,7 @@ public abstract class MailboxMapperACLTest {
mailboxMapper.setACL(benwaInboxMailbox, new MailboxACL(
new MailboxACL.Entry(user2, new Rfc4314Rights(Right.Write))));
- assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Write))
+ assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Write).collectList().block())
.containsOnly(benwaInboxMailbox);
}
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java
index 3fdf808..998a862 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java
@@ -193,7 +193,8 @@ public abstract class MailboxMapperTest {
.build()
.asUserBound();
- List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery);
+ List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery)
+ .collectList().block();
assertMailboxes(mailboxes).containOnly(bobInboxMailbox);
}
@@ -216,7 +217,8 @@ public abstract class MailboxMapperTest {
.build()
.asUserBound();
- List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery);
+ List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery)
+ .collectList().block();
assertMailboxes(mailboxes).containOnly(benwaWorkMailbox, benwaWorkDoneMailbox, benwaWorkTodoMailbox);
}
@@ -230,7 +232,8 @@ public abstract class MailboxMapperTest {
.build()
.asUserBound();
- List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery);
+ List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery)
+ .collectList().block();
assertMailboxes(mailboxes).containOnly(benwaInboxMailbox);
}
@@ -244,7 +247,8 @@ public abstract class MailboxMapperTest {
.build()
.asUserBound();
- assertThat(mailboxMapper.findMailboxWithPathLike(mailboxQuery)).isEmpty();
+ assertThat(mailboxMapper.findMailboxWithPathLike(mailboxQuery)
+ .collectList().block()).isEmpty();
}
@Test
@@ -253,38 +257,6 @@ public abstract class MailboxMapperTest {
Mailbox actual = mailboxMapper.findMailboxById(benwaInboxMailbox.getMailboxId());
assertThat(actual).isEqualTo(benwaInboxMailbox);
}
-
- @Test
- void findMailboxesByIdShouldReturnEmptyWhenNoIdSupplied() throws MailboxException {
- createAll();
-
- Stream<Mailbox> mailboxes = mailboxMapper.findMailboxesById(ImmutableList.of());
-
- assertThat(mailboxes).isEmpty();
- }
-
- @Test
- void findMailboxesByIdShouldReturnMailboxOfSuppliedId() throws MailboxException {
- createAll();
-
- Stream<Mailbox> mailboxes = mailboxMapper.findMailboxesById(ImmutableList.of(
- benwaInboxMailbox.getMailboxId(),
- benwaWorkMailbox.getMailboxId()));
-
- assertThat(mailboxes).containsOnly(benwaWorkMailbox, benwaInboxMailbox);
- }
-
- @Test
- void findMailboxesByIdShouldFilterOutNonExistingMailbox() throws MailboxException {
- createAll();
- mailboxMapper.delete(benwaWorkMailbox);
-
- Stream<Mailbox> mailboxes = mailboxMapper.findMailboxesById(ImmutableList.of(
- benwaInboxMailbox.getMailboxId(),
- benwaWorkMailbox.getMailboxId()));
-
- assertThat(mailboxes).containsOnly(benwaInboxMailbox);
- }
@Test
void findMailboxByIdShouldFailWhenAbsent() throws MailboxException {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java
index d744e15..36cb715 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java
@@ -42,7 +42,7 @@ import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import com.google.common.collect.Lists;
+import reactor.core.publisher.Flux;
class DefaultUserQuotaRootResolverTest {
@@ -92,7 +92,7 @@ class DefaultUserQuotaRootResolverTest {
void retrieveAssociatedMailboxesShouldWork() throws Exception {
MailboxMapper mockedMapper = mock(MailboxMapper.class);
when(mockedFactory.getMailboxMapper(MAILBOX_SESSION)).thenReturn(mockedMapper);
- when(mockedMapper.findMailboxWithPathLike(any())).thenReturn(Lists.newArrayList(MAILBOX, MAILBOX_2));
+ when(mockedMapper.findMailboxWithPathLike(any())).thenReturn(Flux.just(MAILBOX, MAILBOX_2));
assertThat(testee.retrieveAssociatedMailboxes(QUOTA_ROOT, MAILBOX_SESSION)).containsOnly(MAILBOX, MAILBOX_2);
}
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
index 8271602..6da8aa0 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
@@ -248,7 +248,8 @@ public abstract class AbstractMessageSearchIndexTest {
List<MessageId> result = messageSearchIndex.search(session,
ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
searchQuery,
- LIMIT);
+ LIMIT)
+ .collectList().block();
assertThat(result)
.hasSize(12)
@@ -280,7 +281,8 @@ public abstract class AbstractMessageSearchIndexTest {
List<MessageId> result = messageSearchIndex.search(session,
ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
searchQuery,
- LIMIT);
+ LIMIT)
+ .collectList().block();
assertThat(result)
.containsOnly(m1.getMessageId(),
@@ -316,7 +318,8 @@ public abstract class AbstractMessageSearchIndexTest {
List<MessageId> result = messageSearchIndex.search(session,
ImmutableList.of(mailbox2.getMailboxId(), mailbox.getMailboxId()),
searchQuery,
- limit);
+ limit)
+ .collectList().block();
assertThat(result)
.hasSize(limit);
@@ -329,7 +332,8 @@ public abstract class AbstractMessageSearchIndexTest {
List<MessageId> result = messageSearchIndex.search(session,
ImmutableList.of(),
searchQuery,
- LIMIT);
+ LIMIT)
+ .collectList().block();
assertThat(result)
.isEmpty();
@@ -353,7 +357,8 @@ public abstract class AbstractMessageSearchIndexTest {
List<MessageId> result = messageSearchIndex.search(session,
ImmutableList.of(mailbox2.getMailboxId(), mailbox.getMailboxId()),
searchQuery,
- limit);
+ limit)
+ .collectList().block();
assertThat(result)
.hasSize(limit);
@@ -549,7 +554,8 @@ public abstract class AbstractMessageSearchIndexTest {
session,
ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
searchQuery,
- LIMIT);
+ LIMIT)
+ .collectList().block();
assertThat(actual).containsOnly(mOther.getMessageId(), m6.getMessageId());
}
@@ -558,7 +564,8 @@ public abstract class AbstractMessageSearchIndexTest {
void multimailboxSearchShouldReturnUidOfMessageMarkedAsSeenInOneMailbox() throws MailboxException {
SearchQuery searchQuery = new SearchQuery(SearchQuery.flagIsSet(Flags.Flag.SEEN));
- List<MessageId> actual = messageSearchIndex.search(session, ImmutableList.of(mailbox.getMailboxId()), searchQuery, LIMIT);
+ List<MessageId> actual = messageSearchIndex.search(session, ImmutableList.of(mailbox.getMailboxId()), searchQuery, LIMIT)
+ .collectList().block();
assertThat(actual).containsOnly(m6.getMessageId());
}
@@ -571,7 +578,8 @@ public abstract class AbstractMessageSearchIndexTest {
session,
ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
searchQuery,
- LIMIT);
+ LIMIT)
+ .collectList().block();
assertThat(actual).containsOnly(mOther.getMessageId(), m8.getMessageId());
}
@@ -584,7 +592,8 @@ public abstract class AbstractMessageSearchIndexTest {
session,
ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
searchQuery,
- LIMIT);
+ LIMIT)
+ .collectList().block();
assertThat(actual).containsOnly(mOther.getMessageId(), m6.getMessageId());
}
@@ -598,7 +607,8 @@ public abstract class AbstractMessageSearchIndexTest {
session,
ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
searchQuery,
- limit);
+ limit)
+ .collectList().block();
// Two messages matches this query : mOther and m6
assertThat(actual).hasSize(1);
@@ -614,7 +624,8 @@ public abstract class AbstractMessageSearchIndexTest {
session,
ImmutableList.of(otherMailbox.getMailboxId()),
searchQuery,
- limit);
+ limit)
+ .collectList().block();
assertThat(actual).contains(m10.getMessageId());
}
@@ -1408,7 +1419,8 @@ public abstract class AbstractMessageSearchIndexTest {
session,
ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
searchQuery,
- LIMIT);
+ LIMIT)
+ .collectList().block();
assertThat(actual).containsOnly(m1.getMessageId(), m2.getMessageId(), m3.getMessageId(), m4.getMessageId(), m5.getMessageId(),
m6.getMessageId(), m7.getMessageId(), m8.getMessageId(), m9.getMessageId(), mOther.getMessageId(), mailWithAttachment.getMessageId(), mailWithInlinedAttachment.getMessageId());
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java
index 2c992cb..1cf5ebb 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java
@@ -69,7 +69,7 @@ public class CassandraSchemaVersionStartUpCheck implements StartUpCheck {
private CheckResult checkUpgradeAbleState() {
String upgradeVersionMessage =
String.format("Current schema version is %d. Recommended version is %d",
- versionManager.computeVersion().getValue(),
+ versionManager.computeVersion().block().getValue(),
versionManager.getMaximumSupportedVersion().getValue());
LOGGER.warn(upgradeVersionMessage);
return CheckResult.builder()
@@ -93,7 +93,7 @@ public class CassandraSchemaVersionStartUpCheck implements StartUpCheck {
String versionExceedMaximumSupportedMessage =
String.format("Current schema version is %d whereas the maximum supported version is %d. " +
"Recommended version is %d.",
- versionManager.computeVersion().getValue(),
+ versionManager.computeVersion().block().getValue(),
versionManager.getMaximumSupportedVersion().getValue(),
versionManager.getMaximumSupportedVersion().getValue());
LOGGER.error(versionExceedMaximumSupportedMessage);
@@ -108,7 +108,7 @@ public class CassandraSchemaVersionStartUpCheck implements StartUpCheck {
String versionToOldMessage =
String.format("Current schema version is %d whereas minimum required version is %d. " +
"Recommended version is %d",
- versionManager.computeVersion().getValue(),
+ versionManager.computeVersion().block().getValue(),
versionManager.getMinimumSupportedVersion().getValue(),
versionManager.getMaximumSupportedVersion().getValue());
LOGGER.error(versionToOldMessage);
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
index e272402..f9b866c 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
@@ -38,6 +38,8 @@ import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
+import reactor.core.publisher.Flux;
+
public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
private static class FakeMessageSearchIndexGroup extends Group {
@@ -80,7 +82,7 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
}
@Override
- public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+ public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) {
throw new NotImplementedException("not implemented");
}
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
index 6dc233b..8167ace 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
@@ -97,7 +97,7 @@ public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTabl
Preconditions.checkArgument(listSourcesSupportedType.contains(mapping.getType()),
"Not supported mapping of type %s", mapping.getType());
- if (versionManager.isBefore(MAPPINGS_SOURCES_SUPPORTED_VERSION)) {
+ if (versionManager.isBefore(MAPPINGS_SOURCES_SUPPORTED_VERSION).block()) {
return super.listSources(mapping);
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java
index 19c7503..cd31826 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java
@@ -19,6 +19,8 @@
package org.apache.james.jmap.draft.methods;
+import static org.apache.james.util.ReactorUtils.context;
+
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -39,7 +41,6 @@ import org.apache.james.jmap.draft.utils.FilterToSearchQuery;
import org.apache.james.jmap.draft.utils.SortConverter;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
-import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxId.Factory;
import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
@@ -47,10 +48,15 @@ import org.apache.james.mailbox.model.SearchQuery;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.util.MDCBuilder;
+import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
public class GetMessageListMethod implements Method {
private static final long DEFAULT_POSITION = 0;
@@ -89,11 +95,18 @@ public class GetMessageListMethod implements Method {
}
@Override
- public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+ public Flux<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
Preconditions.checkArgument(request instanceof GetMessageListRequest);
GetMessageListRequest messageListRequest = (GetMessageListRequest) request;
+ return metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(),
+ () -> process(methodCallId, mailboxSession, messageListRequest)
+ .subscriberContext(context("GET_MESSAGE_LIST", mdc(messageListRequest))))
+ .subscribeOn(Schedulers.elastic());
+ }
+
+ private MDCBuilder mdc(GetMessageListRequest messageListRequest) {
return MDCBuilder.create()
.addContext(MDCBuilder.ACTION, "GET_MESSAGE_LIST")
.addContext("accountId", messageListRequest.getAccountId())
@@ -105,38 +118,30 @@ public class GetMessageListMethod implements Method {
.addContext("filters", messageListRequest.getFilter())
.addContext("sorts", messageListRequest.getSort())
.addContext("isFetchMessage", messageListRequest.isFetchMessages())
- .addContext("isCollapseThread", messageListRequest.isCollapseThreads())
- .wrapArround(
- () -> metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(),
- () -> process(methodCallId, mailboxSession, messageListRequest)))
- .get();
+ .addContext("isCollapseThread", messageListRequest.isCollapseThreads());
}
- private Stream<JmapResponse> process(MethodCallId methodCallId, MailboxSession mailboxSession, GetMessageListRequest messageListRequest) {
- GetMessageListResponse messageListResponse = getMessageListResponse(messageListRequest, mailboxSession);
- Stream<JmapResponse> jmapResponse = Stream.of(JmapResponse.builder().methodCallId(methodCallId)
- .response(messageListResponse)
- .responseName(RESPONSE_NAME)
- .build());
- return Stream.concat(jmapResponse,
- processGetMessages(messageListRequest, messageListResponse, methodCallId, mailboxSession));
+ private Flux<JmapResponse> process(MethodCallId methodCallId, MailboxSession mailboxSession, GetMessageListRequest messageListRequest) {
+ return getMessageListResponse(messageListRequest, mailboxSession)
+ .flatMapMany(messageListResponse -> Flux.concat(
+ Mono.just(JmapResponse.builder().methodCallId(methodCallId)
+ .response(messageListResponse)
+ .responseName(RESPONSE_NAME)
+ .build()),
+ processGetMessages(messageListRequest, messageListResponse, methodCallId, mailboxSession)));
}
- private GetMessageListResponse getMessageListResponse(GetMessageListRequest messageListRequest, MailboxSession mailboxSession) {
- GetMessageListResponse.Builder builder = GetMessageListResponse.builder();
- try {
- MultimailboxesSearchQuery searchQuery = convertToSearchQuery(messageListRequest);
- Long postionValue = messageListRequest.getPosition().map(Number::asLong).orElse(DEFAULT_POSITION);
- mailboxManager.search(searchQuery,
- mailboxSession,
- postionValue + messageListRequest.getLimit().map(Number::asLong).orElse(maximumLimit))
- .stream()
- .skip(postionValue)
- .forEach(builder::messageId);
- return builder.build();
- } catch (MailboxException e) {
- throw new RuntimeException(e);
- }
+ private Mono<GetMessageListResponse> getMessageListResponse(GetMessageListRequest messageListRequest, MailboxSession mailboxSession) {
+ Mono<MultimailboxesSearchQuery> searchQuery = Mono.fromCallable(() -> convertToSearchQuery(messageListRequest))
+ .subscribeOn(Schedulers.parallel());
+ Long positionValue = messageListRequest.getPosition().map(Number::asLong).orElse(DEFAULT_POSITION);
+ long limit = positionValue + messageListRequest.getLimit().map(Number::asLong).orElse(maximumLimit);
+
+ return searchQuery
+ .flatMapMany(Throwing.function(query -> mailboxManager.search(query, mailboxSession, limit)))
+ .skip(positionValue)
+ .reduce(GetMessageListResponse.builder(), GetMessageListResponse.Builder::messageId)
+ .map(GetMessageListResponse.Builder::build);
}
private MultimailboxesSearchQuery convertToSearchQuery(GetMessageListRequest messageListRequest) {
@@ -174,15 +179,15 @@ public class GetMessageListMethod implements Method {
});
}
- private Stream<JmapResponse> processGetMessages(GetMessageListRequest messageListRequest, GetMessageListResponse messageListResponse, MethodCallId methodCallId, MailboxSession mailboxSession) {
+ private Flux<JmapResponse> processGetMessages(GetMessageListRequest messageListRequest, GetMessageListResponse messageListResponse, MethodCallId methodCallId, MailboxSession mailboxSession) {
if (shouldChainToGetMessages(messageListRequest)) {
GetMessagesRequest getMessagesRequest = GetMessagesRequest.builder()
.ids(messageListResponse.getMessageIds())
.properties(messageListRequest.getFetchMessageProperties())
.build();
- return getMessagesMethod.processToStream(getMessagesRequest, methodCallId, mailboxSession);
+ return getMessagesMethod.process(getMessagesRequest, methodCallId, mailboxSession);
}
- return Stream.empty();
+ return Flux.empty();
}
private boolean shouldChainToGetMessages(GetMessageListRequest messageListRequest) {
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
index dca90fc..4abc540 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
@@ -49,6 +49,8 @@ import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.Iterables;
+import reactor.core.publisher.Flux;
+
public class ReferenceUpdater {
public static final String X_FORWARDED_ID_HEADER = "X-Forwarded-Message-Id";
public static final Flags FORWARDED_FLAG = new Flags("$Forwarded");
@@ -90,7 +92,8 @@ public class ReferenceUpdater {
MultimailboxesSearchQuery searchByRFC822MessageId = MultimailboxesSearchQuery
.from(new SearchQuery(SearchQuery.mimeMessageID(messageId)))
.build();
- List<MessageId> references = mailboxManager.search(searchByRFC822MessageId, session, limit);
+ List<MessageId> references = Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit))
+ .collectList().block();
try {
MessageId reference = Iterables.getOnlyElement(references);
List<MailboxId> mailboxIds = messageIdManager.getMessage(reference, FetchGroup.MINIMAL, session).stream()
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java
index 1679fd9..823867d 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java
@@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
+import reactor.core.publisher.Flux;
+
/**
* This mailet handles MDN messages and define a header X-JAMES-MDN-JMAP-MESSAGE-ID referencing
* the original message (by its Jmap Id) asking for the recipient to send an MDN.
@@ -107,7 +109,7 @@ public class ExtractMDNOriginalJMAPMessageId extends GenericMailet {
MultimailboxesSearchQuery searchByRFC822MessageId = MultimailboxesSearchQuery
.from(new SearchQuery(SearchQuery.mimeMessageID(messageId)))
.build();
- return mailboxManager.search(searchByRFC822MessageId, session, limit).stream().findFirst();
+ return Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit)).toStream().findFirst();
} catch (MailboxException | UsersRepositoryException e) {
LOGGER.error("unable to find message with Message-Id: " + messageId, e);
}
diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
index 5990161..3e7dcb7 100644
--- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
@@ -2262,7 +2262,8 @@ class DeletedMessagesVaultRoutesTest {
MailboxSession session = mailboxManager.createSystemSession(username);
int limitToOneMessage = 1;
- return !mailboxManager.search(MultimailboxesSearchQuery.from(new SearchQuery()).build(), session, limitToOneMessage)
+ return !Flux.from(mailboxManager.search(MultimailboxesSearchQuery.from(new SearchQuery()).build(), session, limitToOneMessage))
+ .collectList().block()
.isEmpty();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org