You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/01/13 09:55:56 UTC
[james-project] 10/10: [Refactoring] fix Reactor Intellij inspections
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e3c0b3fd96e127bfb74fbe00c6cbac25d1d36856
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri Dec 13 14:47:34 2019 +0100
[Refactoring] fix Reactor Intellij inspections
---
.../cassandra/mail/CassandraMessageIdMapper.java | 24 ++++++++++--------
.../cassandra/mail/CassandraMessageMapper.java | 29 +++++++++++++++-------
.../james/blob/cassandra/CassandraBlobStore.java | 5 ++--
.../memory/vacation/MemoryVacationRepository.java | 6 ++---
4 files changed, 38 insertions(+), 26 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 1b15350..3a206c3 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -258,22 +258,24 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
CassandraId cassandraId = (CassandraId) mailboxId;
- ComposedMessageIdWithMetaData oldComposedId = imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
- .next()
- .blockOptional()
- .orElseThrow(MailboxDeleteDuringUpdateException::new);
+ return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
+ .single()
+ .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
+ .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId));
+ }
+ private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) {
Flags newFlags = new FlagsUpdateCalculator(newState, updateMode).buildNewFlags(oldComposedId.getFlags());
if (identicalFlags(oldComposedId, newFlags)) {
return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId));
+ } else {
+ return Mono
+ .fromCallable(() -> new ComposedMessageIdWithMetaData(
+ oldComposedId.getComposedMessageId(),
+ newFlags,
+ modSeqProvider.nextModSeq(cassandraId)))
+ .flatMap(newComposedId -> updateFlags(oldComposedId, newComposedId));
}
-
- ComposedMessageIdWithMetaData newComposedId = new ComposedMessageIdWithMetaData(
- oldComposedId.getComposedMessageId(),
- newFlags,
- modSeqProvider.nextModSeq(cassandraId));
-
- return updateFlags(oldComposedId, newComposedId);
}
private boolean identicalFlags(ComposedMessageIdWithMetaData oldComposedId, Flags newFlags) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index ebc29a7..ed93cc2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -274,15 +274,26 @@ public class CassandraMessageMapper implements MessageMapper {
}
private MailboxMessage addUidAndModseq(MailboxMessage message, CassandraId mailboxId) throws MailboxException {
- final Mono<MessageUid> messageUidMono = uidProvider.nextUid(mailboxId).cache();
- final Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId).cache();
- Flux.merge(messageUidMono, nextModSeqMono).then();
-
- message.setUid(messageUidMono.blockOptional()
- .orElseThrow(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId)));
- message.setModSeq(nextModSeqMono.blockOptional()
- .orElseThrow(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId)));
-
+ Mono<MessageUid> messageUidMono = uidProvider
+ .nextUid(mailboxId)
+ .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId)));
+
+ Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId)
+ .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId)));
+
+ try {
+ Mono.zip(messageUidMono, nextModSeqMono)
+ .doOnNext(tuple -> {
+ message.setUid(tuple.getT1());
+ message.setModSeq(tuple.getT2());
+ })
+ .block();
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof MailboxException) {
+ throw (MailboxException)e.getCause();
+ }
+ throw e;
+ }
return message;
}
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index cd179f4..1993f84 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -128,13 +128,12 @@ public class CassandraBlobStore implements BlobStore {
}
private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) {
- Integer rowCount = selectRowCount(bucketName, blobId)
+ return selectRowCount(bucketName, blobId)
.publishOn(Schedulers.elastic())
.single()
.onErrorResume(NoSuchElementException.class, e -> Mono.error(
new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId))))
- .block();
- return Flux.range(0, rowCount)
+ .flatMapMany(rowCount -> Flux.range(0, rowCount))
.publishOn(Schedulers.elastic(), PREFETCH)
.flatMapSequential(partIndex -> readPart(bucketName, blobId, partIndex)
.single()
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/vacation/MemoryVacationRepository.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/vacation/MemoryVacationRepository.java
index 5e3b085..5f06c24 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/vacation/MemoryVacationRepository.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/vacation/MemoryVacationRepository.java
@@ -48,9 +48,9 @@ public class MemoryVacationRepository implements VacationRepository {
public Mono<Void> modifyVacation(AccountId accountId, VacationPatch vacationPatch) {
Preconditions.checkNotNull(accountId);
Preconditions.checkNotNull(vacationPatch);
- Vacation oldVacation = retrieveVacation(accountId).block();
- vacationMap.put(accountId, vacationPatch.patch(oldVacation));
- return Mono.empty();
+ return retrieveVacation(accountId)
+ .doOnNext(oldVacation -> vacationMap.put(accountId, vacationPatch.patch(oldVacation)))
+ .then();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org