You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/01/13 09:55:56 UTC

[james-project] 10/10: [Refactoring] fix Reactor Intellij inspections

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e3c0b3fd96e127bfb74fbe00c6cbac25d1d36856
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri Dec 13 14:47:34 2019 +0100

    [Refactoring] fix Reactor Intellij inspections
---
 .../cassandra/mail/CassandraMessageIdMapper.java   | 24 ++++++++++--------
 .../cassandra/mail/CassandraMessageMapper.java     | 29 +++++++++++++++-------
 .../james/blob/cassandra/CassandraBlobStore.java   |  5 ++--
 .../memory/vacation/MemoryVacationRepository.java  |  6 ++---
 4 files changed, 38 insertions(+), 26 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 1b15350..3a206c3 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -258,22 +258,24 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
 
     private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
         CassandraId cassandraId = (CassandraId) mailboxId;
-        ComposedMessageIdWithMetaData oldComposedId = imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
-            .next()
-            .blockOptional()
-            .orElseThrow(MailboxDeleteDuringUpdateException::new);
+        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
+            .single()
+            .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
+            .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId));
+    }
 
+    private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) {
         Flags newFlags = new FlagsUpdateCalculator(newState, updateMode).buildNewFlags(oldComposedId.getFlags());
         if (identicalFlags(oldComposedId, newFlags)) {
             return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId));
+        } else {
+            return Mono
+                .fromCallable(() -> new ComposedMessageIdWithMetaData(
+                    oldComposedId.getComposedMessageId(),
+                    newFlags,
+                    modSeqProvider.nextModSeq(cassandraId)))
+            .flatMap(newComposedId -> updateFlags(oldComposedId, newComposedId));
         }
-
-        ComposedMessageIdWithMetaData newComposedId = new ComposedMessageIdWithMetaData(
-            oldComposedId.getComposedMessageId(),
-            newFlags,
-            modSeqProvider.nextModSeq(cassandraId));
-
-        return updateFlags(oldComposedId, newComposedId);
     }
 
     private boolean identicalFlags(ComposedMessageIdWithMetaData oldComposedId, Flags newFlags) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index ebc29a7..ed93cc2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -274,15 +274,26 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private MailboxMessage addUidAndModseq(MailboxMessage message, CassandraId mailboxId) throws MailboxException {
-        final Mono<MessageUid> messageUidMono = uidProvider.nextUid(mailboxId).cache();
-        final Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId).cache();
-        Flux.merge(messageUidMono, nextModSeqMono).then();
-
-        message.setUid(messageUidMono.blockOptional()
-            .orElseThrow(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId)));
-        message.setModSeq(nextModSeqMono.blockOptional()
-            .orElseThrow(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId)));
-
+        Mono<MessageUid> messageUidMono = uidProvider
+            .nextUid(mailboxId)
+            .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId)));
+
+        Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId)
+            .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId)));
+
+        try {
+            Mono.zip(messageUidMono, nextModSeqMono)
+                .doOnNext(tuple -> {
+                    message.setUid(tuple.getT1());
+                    message.setModSeq(tuple.getT2());
+                })
+                .block();
+        } catch (RuntimeException e) {
+            if (e.getCause() instanceof MailboxException) {
+                throw (MailboxException)e.getCause();
+            }
+            throw e;
+        }
         return message;
     }
 
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index cd179f4..1993f84 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -128,13 +128,12 @@ public class CassandraBlobStore implements BlobStore {
     }
 
     private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) {
-        Integer rowCount = selectRowCount(bucketName, blobId)
+        return selectRowCount(bucketName, blobId)
             .publishOn(Schedulers.elastic())
             .single()
             .onErrorResume(NoSuchElementException.class, e -> Mono.error(
                 new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId))))
-            .block();
-        return Flux.range(0, rowCount)
+            .flatMapMany(rowCount -> Flux.range(0, rowCount))
             .publishOn(Schedulers.elastic(), PREFETCH)
             .flatMapSequential(partIndex -> readPart(bucketName, blobId, partIndex)
                 .single()
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/vacation/MemoryVacationRepository.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/vacation/MemoryVacationRepository.java
index 5e3b085..5f06c24 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/vacation/MemoryVacationRepository.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/vacation/MemoryVacationRepository.java
@@ -48,9 +48,9 @@ public class MemoryVacationRepository implements VacationRepository {
     public Mono<Void> modifyVacation(AccountId accountId, VacationPatch vacationPatch) {
         Preconditions.checkNotNull(accountId);
         Preconditions.checkNotNull(vacationPatch);
-        Vacation oldVacation = retrieveVacation(accountId).block();
-        vacationMap.put(accountId, vacationPatch.patch(oldVacation));
-        return Mono.empty();
+        return retrieveVacation(accountId)
+            .doOnNext(oldVacation -> vacationMap.put(accountId, vacationPatch.patch(oldVacation)))
+            .then();
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org