You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/02 03:15:58 UTC
[james-project] 01/29: MAILBOX-396 Some reactor improvement in
CassandraMessageMapper
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9a1d36b71c1b6af7cc3a71b7635c6f32f5f7e5f7
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Feb 7 14:14:58 2020 +0700
MAILBOX-396 Some reactor improvement in CassandraMessageMapper
Avoid calling `block` several times in `add`, `setInMailbox`, and `findInMailbox`
---
.../cassandra/mail/CassandraMessageMapper.java | 44 ++++++++++++----------
1 file changed, 25 insertions(+), 19 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 695c38f..7fc68f1 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -171,17 +171,17 @@ public class CassandraMessageMapper implements MessageMapper {
@Override
public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- return retrieveMessages(retrieveMessageIds(mailboxId, messageRange), ftype, Limit.from(max))
+ return retrieveMessageIds(mailboxId, messageRange)
+ .flatMapMany(ids -> retrieveMessages(ids, ftype, Limit.from(max)))
.map(MailboxMessage.class::cast)
.sort(Comparator.comparing(MailboxMessage::getUid))
.toIterable()
.iterator();
}
- private List<ComposedMessageIdWithMetaData> retrieveMessageIds(CassandraId mailboxId, MessageRange messageRange) {
+ private Mono<List<ComposedMessageIdWithMetaData>> retrieveMessageIds(CassandraId mailboxId, MessageRange messageRange) {
return messageIdDAO.retrieveMessages(mailboxId, messageRange)
- .collect(Guavate.toImmutableList())
- .block();
+ .collect(Guavate.toImmutableList());
}
private Flux<MailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
@@ -267,13 +267,15 @@ public class CassandraMessageMapper implements MessageMapper {
public MessageMetaData add(Mailbox mailbox, MailboxMessage message) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- save(mailbox, addUidAndModseq(message, mailboxId))
- .thenEmpty(indexTableHandler.updateIndexOnAdd(message, mailboxId))
- .block();
- return message.metaData();
+ return block(addUidAndModseq(message, mailboxId)
+ .flatMap(Throwing.function(messageWithUidAndModSeq -> save(mailbox, messageWithUidAndModSeq)
+ .thenReturn(messageWithUidAndModSeq)))
+ .flatMap(messageWithUidAndModSeq -> indexTableHandler.updateIndexOnAdd(message, mailboxId)
+ .thenReturn(messageWithUidAndModSeq))
+ .map(MailboxMessage::metaData));
}
- private MailboxMessage addUidAndModseq(MailboxMessage message, CassandraId mailboxId) throws MailboxException {
+ private Mono<MailboxMessage> addUidAndModseq(MailboxMessage message, CassandraId mailboxId) {
Mono<MessageUid> messageUidMono = uidProvider
.nextUid(mailboxId)
.switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId)));
@@ -281,20 +283,23 @@ public class CassandraMessageMapper implements MessageMapper {
Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId)
.switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId)));
- try {
- Mono.zip(messageUidMono, nextModSeqMono)
+ return Mono.zip(messageUidMono, nextModSeqMono)
.doOnNext(tuple -> {
message.setUid(tuple.getT1());
message.setModSeq(tuple.getT2());
})
- .block();
+ .thenReturn(message);
+ }
+
+ private <T> T block(Mono<T> mono) throws MailboxException {
+ try {
+ return mono.block();
} catch (RuntimeException e) {
if (e.getCause() instanceof MailboxException) {
- throw (MailboxException)e.getCause();
+ throw (MailboxException) e.getCause();
}
throw e;
}
- return message;
}
@Override
@@ -384,11 +389,12 @@ public class CassandraMessageMapper implements MessageMapper {
private MessageMetaData setInMailbox(Mailbox mailbox, MailboxMessage message) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-
- insertIds(addUidAndModseq(message, mailboxId), mailboxId)
- .thenEmpty(indexTableHandler.updateIndexOnAdd(message, mailboxId))
- .block();
- return message.metaData();
+ return block(addUidAndModseq(message, mailboxId)
+ .flatMap(messageWithUidAndModseq -> insertIds(messageWithUidAndModseq, mailboxId)
+ .thenReturn(messageWithUidAndModseq))
+ .flatMap(messageWithUidAndModseq -> indexTableHandler.updateIndexOnAdd(message, mailboxId)
+ .thenReturn(messageWithUidAndModseq))
+ .map(MailboxMessage::metaData));
}
private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org