You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/01/22 10:49:49 UTC
[james-project] 04/06: [REFACTORING] Reactor: Use handle instead of flatMap for mono synchronous transformations
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9bdf1bc1d3365320fc970aef6a9bc46516bf16c5
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jan 16 16:20:20 2020 +0700
[REFACTORING] Reactor: Use handle instead of flatMap for mono synchronous transformations
---
.../backends/cassandra/utils/CassandraAsyncExecutor.java | 2 +-
.../mailbox/cassandra/mail/CassandraMessageMapper.java | 7 +++----
.../mailbox/cassandra/mail/CassandraModSeqProvider.java | 16 ++++++++--------
.../mailbox/cassandra/mail/CassandraUidProvider.java | 12 ++++++------
.../apache/james/blob/cassandra/CassandraBlobStore.java | 2 +-
.../cassandra/CassandraMailRepository.java | 5 +++--
.../distributed/RabbitMQTerminationSubscriber.java | 1 -
7 files changed, 22 insertions(+), 23 deletions(-)
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 1b6464c..3654ebe 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -59,7 +59,7 @@ public class CassandraAsyncExecutor {
public Mono<Row> executeSingleRow(Statement statement) {
return executeSingleRowOptional(statement)
- .flatMap(Mono::justOrEmpty);
+ .handle((t, sink) -> t.ifPresent(sink::next));
}
public Flux<Row> executeRows(Statement statement) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index ed93cc2..695c38f 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -240,7 +240,7 @@ public class CassandraMessageMapper implements MessageMapper {
return messageIdDAO.retrieve(mailboxId, uid)
.doOnNext(optional -> OptionalUtils.executeIfEmpty(optional,
() -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid)))
- .flatMap(Mono::justOrEmpty);
+ .handle((t, sink) -> t.ifPresent(sink::next));
}
@Override
@@ -325,9 +325,8 @@ public class CassandraMessageMapper implements MessageMapper {
private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) {
if (!failed.isEmpty()) {
Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
- .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid)
- .flatMap(Mono::justOrEmpty)
- );
+ .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid))
+ .handle((t, sink) -> t.ifPresent(sink::next));
return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
} else {
return Mono.empty();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index e5d7290..2199f74 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -44,7 +44,6 @@ import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.store.mail.ModSeqProvider;
-import org.apache.james.util.FunctionalUtils;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
@@ -157,7 +156,7 @@ public class CassandraModSeqProvider implements ModSeqProvider {
insert.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid())
.setLong(NEXT_MODSEQ, nextModSeq.asLong()))
- .flatMap(success -> successToModSeq(nextModSeq, success));
+ .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next));
}
private Mono<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
@@ -167,13 +166,14 @@ public class CassandraModSeqProvider implements ModSeqProvider {
.setUUID(MAILBOX_ID, mailboxId.asUuid())
.setLong(NEXT_MODSEQ, nextModSeq.asLong())
.setLong(MOD_SEQ_CONDITION, modSeq.asLong()))
- .flatMap(success -> successToModSeq(nextModSeq, success));
+ .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next));
}
- private Mono<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
- return Mono.just(success)
- .filter(FunctionalUtils.identityPredicate())
- .map(any -> modSeq);
+ private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
+ if (success) {
+ return Optional.of(modSeq);
+ }
+ return Optional.empty();
}
public Mono<ModSeq> nextModSeq(CassandraId mailboxId) {
@@ -193,7 +193,7 @@ public class CassandraModSeqProvider implements ModSeqProvider {
private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) {
return Mono.defer(() -> findHighestModSeq(mailboxId)
- .flatMap(Mono::justOrEmpty)
+ .<ModSeq>handle((t, sink) -> t.ifPresent(sink::next))
.flatMap(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq)));
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index 276ba2b..a5d9968 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -111,7 +111,7 @@ public class CassandraUidProvider implements UidProvider {
}
@Override
- public Optional<MessageUid> lastUid(Mailbox mailbox) throws MailboxException {
+ public Optional<MessageUid> lastUid(Mailbox mailbox) {
return findHighestUid((CassandraId) mailbox.getMailboxId())
.blockOptional();
}
@@ -131,21 +131,21 @@ public class CassandraUidProvider implements UidProvider {
.setUUID(MAILBOX_ID, mailboxId.asUuid())
.setLong(CONDITION, uid.asLong())
.setLong(NEXT_UID, nextUid.asLong()))
- .flatMap(success -> successToUid(nextUid, success)));
+ .handle((success, sink) -> successToUid(nextUid, success).ifPresent(sink::next)));
}
private Mono<MessageUid> tryInsert(CassandraId mailboxId) {
return Mono.defer(() -> executor.executeReturnApplied(
insertStatement.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid()))
- .flatMap(success -> successToUid(MessageUid.MIN_VALUE, success)));
+ .handle((success, sink) -> successToUid(MessageUid.MIN_VALUE, success).ifPresent(sink::next)));
}
- private Mono<MessageUid> successToUid(MessageUid uid, Boolean success) {
+ private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) {
if (success) {
- return Mono.just(uid);
+ return Optional.of(uid);
}
- return Mono.empty();
+ return Optional.empty();
}
}
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 1993f84..854bf44 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -96,7 +96,7 @@ public class CassandraBlobStore implements BlobStore {
.flatMap(pair -> writePart(bucketName, blobId, pair.getKey(), pair.getValue())
.then(Mono.just(getChunkNum(pair))))
.collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
- .flatMap(Mono::justOrEmpty)
+ .<Integer>handle((t, sink) -> t.ifPresent(sink::next))
.map(this::numToCount)
.defaultIfEmpty(0);
}
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index bbaf01f..cf98198 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -32,6 +32,7 @@ import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.mailrepository.api.MailKey;
import org.apache.james.mailrepository.api.MailRepository;
import org.apache.james.mailrepository.api.MailRepositoryUrl;
+import org.apache.james.mailrepository.cassandra.CassandraMailRepositoryMailDaoAPI.MailDTO;
import org.apache.mailet.Mail;
import reactor.core.publisher.Flux;
@@ -92,13 +93,13 @@ public class CassandraMailRepository implements MailRepository {
@Override
public Mail retrieve(MailKey key) {
return mailDAO.read(url, key)
- .flatMap(Mono::justOrEmpty)
+ .<MailDTO>handle((t, sink) -> t.ifPresent(sink::next))
.flatMap(this::toMail)
.blockOptional()
.orElse(null);
}
- private Mono<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) {
+ private Mono<Mail> toMail(MailDTO mailDTO) {
MimeMessagePartsId parts = MimeMessagePartsId.builder()
.headerBlobId(mailDTO.getHeaderBlobId())
.bodyBlobId(mailDTO.getBodyBlobId())
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 8f4b2b7..a949af5 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -43,7 +43,6 @@ import com.rabbitmq.client.Delivery;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
-import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org