You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/01/22 10:49:49 UTC

[james-project] 04/06: [REFACTORING] Reactor: Use handle instead of flatMap for mono synchronous transformations

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9bdf1bc1d3365320fc970aef6a9bc46516bf16c5
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jan 16 16:20:20 2020 +0700

    [REFACTORING] Reactor: Use handle instead of flatMap for mono synchronous transformations
---
 .../backends/cassandra/utils/CassandraAsyncExecutor.java |  2 +-
 .../mailbox/cassandra/mail/CassandraMessageMapper.java   |  7 +++----
 .../mailbox/cassandra/mail/CassandraModSeqProvider.java  | 16 ++++++++--------
 .../mailbox/cassandra/mail/CassandraUidProvider.java     | 12 ++++++------
 .../apache/james/blob/cassandra/CassandraBlobStore.java  |  2 +-
 .../cassandra/CassandraMailRepository.java               |  5 +++--
 .../distributed/RabbitMQTerminationSubscriber.java       |  1 -
 7 files changed, 22 insertions(+), 23 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 1b6464c..3654ebe 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -59,7 +59,7 @@ public class CassandraAsyncExecutor {
 
     public Mono<Row> executeSingleRow(Statement statement) {
         return executeSingleRowOptional(statement)
-                .flatMap(Mono::justOrEmpty);
+                .handle((t, sink) -> t.ifPresent(sink::next));
     }
 
     public Flux<Row> executeRows(Statement statement) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index ed93cc2..695c38f 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -240,7 +240,7 @@ public class CassandraMessageMapper implements MessageMapper {
         return messageIdDAO.retrieve(mailboxId, uid)
             .doOnNext(optional -> OptionalUtils.executeIfEmpty(optional,
                 () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid)))
-            .flatMap(Mono::justOrEmpty);
+            .handle((t, sink) -> t.ifPresent(sink::next));
     }
 
     @Override
@@ -325,9 +325,8 @@ public class CassandraMessageMapper implements MessageMapper {
     private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) {
         if (!failed.isEmpty()) {
             Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
-                .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid)
-                        .flatMap(Mono::justOrEmpty)
-                );
+                .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid))
+                .handle((t, sink) -> t.ifPresent(sink::next));
             return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
         } else {
             return Mono.empty();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index e5d7290..2199f74 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -44,7 +44,6 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
-import org.apache.james.util.FunctionalUtils;
 
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.PreparedStatement;
@@ -157,7 +156,7 @@ public class CassandraModSeqProvider implements ModSeqProvider {
             insert.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(NEXT_MODSEQ, nextModSeq.asLong()))
-            .flatMap(success -> successToModSeq(nextModSeq, success));
+            .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next));
     }
 
     private Mono<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
@@ -167,13 +166,14 @@ public class CassandraModSeqProvider implements ModSeqProvider {
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(NEXT_MODSEQ, nextModSeq.asLong())
                 .setLong(MOD_SEQ_CONDITION, modSeq.asLong()))
-            .flatMap(success -> successToModSeq(nextModSeq, success));
+            .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next));
     }
 
-    private Mono<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
-        return Mono.just(success)
-            .filter(FunctionalUtils.identityPredicate())
-            .map(any -> modSeq);
+    private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
+        if (success) {
+            return Optional.of(modSeq);
+        }
+        return Optional.empty();
     }
 
     public Mono<ModSeq> nextModSeq(CassandraId mailboxId) {
@@ -193,7 +193,7 @@ public class CassandraModSeqProvider implements ModSeqProvider {
 
     private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) {
         return Mono.defer(() -> findHighestModSeq(mailboxId)
-            .flatMap(Mono::justOrEmpty)
+            .<ModSeq>handle((t, sink) -> t.ifPresent(sink::next))
             .flatMap(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq)));
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index 276ba2b..a5d9968 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -111,7 +111,7 @@ public class CassandraUidProvider implements UidProvider {
     }
 
     @Override
-    public Optional<MessageUid> lastUid(Mailbox mailbox) throws MailboxException {
+    public Optional<MessageUid> lastUid(Mailbox mailbox) {
         return findHighestUid((CassandraId) mailbox.getMailboxId())
                 .blockOptional();
     }
@@ -131,21 +131,21 @@ public class CassandraUidProvider implements UidProvider {
                         .setUUID(MAILBOX_ID, mailboxId.asUuid())
                         .setLong(CONDITION, uid.asLong())
                         .setLong(NEXT_UID, nextUid.asLong()))
-                .flatMap(success -> successToUid(nextUid, success)));
+                .handle((success, sink) -> successToUid(nextUid, success).ifPresent(sink::next)));
     }
 
     private Mono<MessageUid> tryInsert(CassandraId mailboxId) {
         return Mono.defer(() -> executor.executeReturnApplied(
             insertStatement.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid()))
-            .flatMap(success -> successToUid(MessageUid.MIN_VALUE, success)));
+            .handle((success, sink) -> successToUid(MessageUid.MIN_VALUE, success).ifPresent(sink::next)));
     }
 
-    private Mono<MessageUid> successToUid(MessageUid uid, Boolean success) {
+    private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) {
         if (success) {
-            return Mono.just(uid);
+            return Optional.of(uid);
         }
-        return Mono.empty();
+        return Optional.empty();
     }
 
 }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 1993f84..854bf44 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -96,7 +96,7 @@ public class CassandraBlobStore implements BlobStore {
             .flatMap(pair -> writePart(bucketName, blobId, pair.getKey(), pair.getValue())
                 .then(Mono.just(getChunkNum(pair))))
             .collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
-            .flatMap(Mono::justOrEmpty)
+            .<Integer>handle((t, sink) -> t.ifPresent(sink::next))
             .map(this::numToCount)
             .defaultIfEmpty(0);
     }
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index bbaf01f..cf98198 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -32,6 +32,7 @@ import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepository;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
+import org.apache.james.mailrepository.cassandra.CassandraMailRepositoryMailDaoAPI.MailDTO;
 import org.apache.mailet.Mail;
 
 import reactor.core.publisher.Flux;
@@ -92,13 +93,13 @@ public class CassandraMailRepository implements MailRepository {
     @Override
     public Mail retrieve(MailKey key) {
         return mailDAO.read(url, key)
-            .flatMap(Mono::justOrEmpty)
+            .<MailDTO>handle((t, sink) -> t.ifPresent(sink::next))
             .flatMap(this::toMail)
             .blockOptional()
             .orElse(null);
     }
 
-    private Mono<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) {
+    private Mono<Mail> toMail(MailDTO mailDTO) {
         MimeMessagePartsId parts = MimeMessagePartsId.builder()
             .headerBlobId(mailDTO.getHeaderBlobId())
             .bodyBlobId(mailDTO.getBodyBlobId())
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 8f4b2b7..a949af5 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -43,7 +43,6 @@ import com.rabbitmq.client.Delivery;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.DirectProcessor;
-import reactor.core.publisher.Mono;
 import reactor.core.publisher.UnicastProcessor;
 import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.BindingSpecification;


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org