You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/05/13 02:03:25 UTC

[james-project] 04/09: [Refactoring] Extract a ReactorUtils::unboxOptional utils

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e694f392989540a416765c224bee028327651eba
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon May 4 10:36:06 2020 +0700

    [Refactoring] Extract a ReactorUtils::unboxOptional utils
    
    This reduces the boilerplate around the use of the handle operator,
    which optimizes Reactor synchronous transformations. The helper is
    introduced in this patch as ReactorUtils::publishIfPresent.
---
 .../james/backends/cassandra/utils/CassandraAsyncExecutor.java    | 4 +++-
 .../james/mailbox/cassandra/mail/CassandraMessageMapper.java      | 4 +++-
 .../james/mailbox/cassandra/mail/CassandraModSeqProvider.java     | 8 +++++---
 .../apache/james/mailbox/cassandra/mail/CassandraUidProvider.java | 7 +++++--
 .../cassandra/mail/task/SolveMessageInconsistenciesService.java   | 6 ++++--
 .../james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java | 7 +++++--
 .../mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java    | 7 +++++--
 .../mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java      | 7 +++++--
 .../events/ElasticSearchListeningMessageSearchIndex.java          | 3 ++-
 .../main/java/org/apache/james/vault/metadata/MetadataDAO.java    | 4 +++-
 .../util/src/main/java/org/apache/james/util/ReactorUtils.java    | 5 +++++
 .../james/mailrepository/cassandra/CassandraMailRepository.java   | 4 +++-
 .../org/apache/james/jmap/draft/methods/GetMailboxesMethod.java   | 8 +++++---
 .../configuration/EventsourcingConfigurationManagement.java       | 7 +++++--
 .../eventsourcing/distributed/RabbitMQTerminationSubscriber.java  | 5 ++++-
 .../main/java/org/apache/james/task/SerialTaskManagerWorker.java  | 4 +++-
 16 files changed, 65 insertions(+), 25 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 86b192b..24bf48e 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.backends.cassandra.utils;
 
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
 import java.util.Optional;
 
 import javax.inject.Inject;
@@ -66,7 +68,7 @@ public class CassandraAsyncExecutor {
 
     public Mono<Row> executeSingleRow(Statement statement) {
         return executeSingleRowOptional(statement)
-                .handle((t, sink) -> t.ifPresent(sink::next));
+                .handle(publishIfPresent());
     }
 
     public Flux<Row> executeRows(Statement statement) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 465c9cf..cb31434 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
 import java.time.Duration;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -312,7 +314,7 @@ public class CassandraMessageMapper implements MessageMapper {
         if (!failed.isEmpty()) {
             Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
                 .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid))
-                .handle((t, sink) -> t.ifPresent(sink::next));
+                .handle(publishIfPresent());
             return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
         } else {
             return Mono.empty();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index 15f1a2d..fc1647c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -28,6 +28,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.MAILBOX_ID;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.NEXT_MODSEQ;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import java.time.Duration;
 import java.util.Optional;
@@ -157,7 +158,8 @@ public class CassandraModSeqProvider implements ModSeqProvider {
             insert.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(NEXT_MODSEQ, nextModSeq.asLong()))
-            .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next));
+            .map(success -> successToModSeq(nextModSeq, success))
+            .handle(publishIfPresent());
     }
 
     private Mono<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
@@ -167,7 +169,8 @@ public class CassandraModSeqProvider implements ModSeqProvider {
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(NEXT_MODSEQ, nextModSeq.asLong())
                 .setLong(MOD_SEQ_CONDITION, modSeq.asLong()))
-            .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next));
+            .map(success -> successToModSeq(nextModSeq, success))
+            .handle(publishIfPresent());
     }
 
     private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
@@ -187,5 +190,4 @@ public class CassandraModSeqProvider implements ModSeqProvider {
             .single()
             .retryWhen(Retry.backoff(maxModSeqRetries, firstBackoff).scheduler(Schedulers.elastic()));
     }
-
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index fea0d5e..86fa061 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -28,6 +28,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.MAILBOX_ID;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.NEXT_UID;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.TABLE_NAME;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import java.time.Duration;
 import java.util.Optional;
@@ -135,14 +136,16 @@ public class CassandraUidProvider implements UidProvider {
                         .setUUID(MAILBOX_ID, mailboxId.asUuid())
                         .setLong(CONDITION, uid.asLong())
                         .setLong(NEXT_UID, nextUid.asLong()))
-                .handle((success, sink) -> successToUid(nextUid, success).ifPresent(sink::next));
+                .map(success -> successToUid(nextUid, success))
+                .handle(publishIfPresent());
     }
 
     private Mono<MessageUid> tryInsert(CassandraId mailboxId) {
         return executor.executeReturnApplied(
             insertStatement.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid()))
-            .handle((success, sink) -> successToUid(MessageUid.MIN_VALUE, success).ifPresent(sink::next));
+            .map(success -> successToUid(MessageUid.MIN_VALUE, success))
+            .handle(publishIfPresent());
     }
 
     private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
index 8518f1e..f0ec09b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.cassandra.mail.task;
 
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
 import java.util.Collection;
 import java.util.Objects;
 import java.util.Optional;
@@ -414,7 +416,7 @@ public class SolveMessageInconsistenciesService {
 
     private Mono<Inconsistency> compareWithMessageIdRecord(ComposedMessageIdWithMetaData upToDateMessageFromImapUid) {
         return messageIdDAO.retrieve((CassandraId) upToDateMessageFromImapUid.getComposedMessageId().getMailboxId(), upToDateMessageFromImapUid.getComposedMessageId().getUid())
-            .flatMap(Mono::justOrEmpty)
+            .handle(publishIfPresent())
             .map(messageIdRecord -> {
                 if (messageIdRecord.equals(upToDateMessageFromImapUid)) {
                     return NO_INCONSISTENCY;
@@ -433,7 +435,7 @@ public class SolveMessageInconsistenciesService {
 
     private Mono<Inconsistency> detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) {
         return messageIdDAO.retrieve((CassandraId) message.getComposedMessageId().getMailboxId(), message.getComposedMessageId().getUid())
-            .flatMap(Mono::justOrEmpty)
+            .handle(publishIfPresent())
             .flatMap(upToDateMessage -> messageIdToImapUidDAO.retrieve((CassandraMessageId) message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) message.getComposedMessageId().getMailboxId()))
                 .map(uidRecord -> NO_INCONSISTENCY)
                 .switchIfEmpty(Mono.just(new OrphanMessageIdEntry(message)))
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
index 74fd9bf..9ee9159 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
@@ -24,6 +24,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import javax.inject.Inject;
 
@@ -93,14 +94,16 @@ public class CassandraGlobalMaxQuotaDao {
         return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind()
                 .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.STORAGE))
             .flatMap(row -> Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class)))
-            .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage)));
+            .map(QuotaCodec::longToQuotaSize)
+            .handle(publishIfPresent());
     }
 
     Mono<QuotaCountLimit> getGlobalMaxMessage() {
         return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind()
             .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.MESSAGE))
             .flatMap(row -> Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class)))
-            .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages)));
+            .map(QuotaCodec::longToQuotaCount)
+            .handle(publishIfPresent());
     }
 
     Mono<Void> removeGlobaltMaxStorage() {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
index cb49d3a..38161ef 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
@@ -24,6 +24,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import javax.inject.Inject;
 
@@ -109,13 +110,15 @@ public class CassandraPerDomainMaxQuotaDao {
     Mono<QuotaSizeLimit> getMaxStorage(Domain domain) {
         return queryExecutor.executeSingleRow(getMaxStorageStatement.bind(domain.asString()))
             .flatMap(row -> Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.STORAGE, Long.class)))
-            .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage)));
+            .map(QuotaCodec::longToQuotaSize)
+            .handle(publishIfPresent());
     }
 
     Mono<QuotaCountLimit> getMaxMessage(Domain domain) {
         return queryExecutor.executeSingleRow(getMaxMessageStatement.bind(domain.asString()))
             .flatMap(row -> Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.MESSAGE_COUNT, Long.class)))
-            .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages)));
+            .map(QuotaCodec::longToQuotaCount)
+            .handle(publishIfPresent());
     }
 
     Mono<Void> removeMaxMessage(Domain domain) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
index cff560d..2b4e21b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
@@ -24,6 +24,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import javax.inject.Inject;
 
@@ -109,13 +110,15 @@ public class CassandraPerUserMaxQuotaDao {
     Mono<QuotaSizeLimit> getMaxStorage(QuotaRoot quotaRoot) {
         return queryExecutor.executeSingleRow(getMaxStorageStatement.bind(quotaRoot.getValue()))
             .flatMap(row -> Mono.justOrEmpty(row.get(CassandraMaxQuota.STORAGE, Long.class)))
-            .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage)));
+            .map(QuotaCodec::longToQuotaSize)
+            .handle(publishIfPresent());
     }
 
     Mono<QuotaCountLimit> getMaxMessage(QuotaRoot quotaRoot) {
         return queryExecutor.executeSingleRow(getMaxMessageStatement.bind(quotaRoot.getValue()))
             .flatMap(row -> Mono.justOrEmpty(row.get(CassandraMaxQuota.MESSAGE_COUNT, Long.class)))
-            .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages)));
+            .map(QuotaCodec::longToQuotaCount)
+            .handle(publishIfPresent());
     }
 
     Mono<Void> removeMaxMessage(QuotaRoot quotaRoot) {
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index 3d2d099..091db6b 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -18,6 +18,7 @@
  ****************************************************************/
 package org.apache.james.mailbox.elasticsearch.events;
 
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
 import java.util.Collection;
@@ -128,7 +129,7 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
         return searcher.search(mailboxIds, searchQuery, Optional.empty())
             .doOnNext(this::logIfNoMessageId)
             .map(SearchResult::getMessageId)
-            .flatMap(Mono::justOrEmpty)
+            .handle(publishIfPresent())
             .distinct()
             .take(limit);
     }
diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java
index 87553e9..f03fd4e 100644
--- a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java
+++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java
@@ -24,6 +24,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
 import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.BUCKET_NAME;
 import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.MESSAGE_ID;
 import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.OWNER;
@@ -109,7 +110,8 @@ public class MetadataDAO {
                 .setString(BUCKET_NAME, bucketName.asString())
                 .setString(OWNER, username.asString()))
             .map(row -> row.getString(PAYLOAD))
-            .handle((json, sink) -> metadataSerializer.deserialize(json).ifPresent(sink::next));
+            .map(metadataSerializer::deserialize)
+            .handle(publishIfPresent());
     }
 
     Flux<MessageId> retrieveMessageIds(BucketName bucketName, Username username) {
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index df9c937..a477355 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -24,11 +24,13 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Signal;
+import reactor.core.publisher.SynchronousSink;
 import reactor.util.context.Context;
 
 public class ReactorUtils {
@@ -39,6 +41,9 @@ public class ReactorUtils {
         return Mono.fromRunnable(runnable).then(Mono.empty());
     }
 
+    public static <T> BiConsumer<Optional<T>, SynchronousSink<T>> publishIfPresent() {
+        return (element, sink) -> element.ifPresent(sink::next);
+    }
 
     public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) {
         return new StreamInputStream(byteArrays.toIterable(1).iterator());
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index cf98198..134d969 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailrepository.cassandra;
 
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
 import java.util.Collection;
 import java.util.Iterator;
 
@@ -93,7 +95,7 @@ public class CassandraMailRepository implements MailRepository {
     @Override
     public Mail retrieve(MailKey key) {
         return mailDAO.read(url, key)
-            .<MailDTO>handle((t, sink) -> t.ifPresent(sink::next))
+            .handle(publishIfPresent())
             .flatMap(this::toMail)
             .blockOptional()
             .orElse(null);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
index f74b243..f5936a9 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
@@ -20,6 +20,7 @@
 package org.apache.james.jmap.draft.methods;
 
 import static org.apache.james.util.ReactorUtils.context;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import java.util.Comparator;
 import java.util.List;
@@ -144,7 +145,7 @@ public class GetMailboxesMethod implements Method {
                     .usingPreloadedMailboxesMetadata(NO_PRELOADED_METADATA)
                     .build())
                 .subscribeOn(Schedulers.elastic()))
-            .handle((element, sink) -> element.ifPresent(sink::next));
+            .handle(publishIfPresent());
     }
 
     private Flux<Mailbox> retrieveAllMailboxes(MailboxSession mailboxSession) {
@@ -156,12 +157,13 @@ public class GetMailboxesMethod implements Method {
         return userMailboxesMono.zipWith(quotaLoaderMono)
             .flatMapMany(
                 tuple -> Flux.fromIterable(tuple.getT1())
-                    .flatMap(mailboxMetaData -> Mono.justOrEmpty(mailboxFactory.builder()
+                    .map(mailboxMetaData -> mailboxFactory.builder()
                         .mailboxMetadata(mailboxMetaData)
                         .session(mailboxSession)
                         .usingPreloadedMailboxesMetadata(Optional.of(tuple.getT1()))
                         .quotaLoader(tuple.getT2())
-                        .build())));
+                        .build())
+                    .handle(publishIfPresent()));
     }
 
     private Flux<MailboxMetaData> getAllMailboxesMetaData(MailboxSession mailboxSession) {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
index ba2c621..6cacd12 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.queue.rabbitmq.view.cassandra.configuration;
 
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
 import javax.inject.Inject;
 
 import org.apache.james.eventsourcing.AggregateId;
@@ -53,9 +55,10 @@ public class EventsourcingConfigurationManagement {
     @VisibleForTesting
     Mono<CassandraMailQueueViewConfiguration> load() {
         return Mono.from(eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID))
-            .flatMap(history -> Mono.justOrEmpty(ConfigurationAggregate
+            .map(history -> ConfigurationAggregate
                 .load(CONFIGURATION_AGGREGATE_ID, history)
-                .getCurrentConfiguration()));
+                .getCurrentConfiguration())
+            .handle(publishIfPresent());
     }
 
     public void registerConfiguration(CassandraMailQueueViewConfiguration newConfiguration) {
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 174101a..2c1812f 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -20,6 +20,8 @@
 
 package org.apache.james.task.eventsourcing.distributed;
 
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
 import java.io.Closeable;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
@@ -91,7 +93,8 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
         listenQueueHandle = listenerReceiver
             .consumeAutoAck(queueName)
             .subscribeOn(Schedulers.elastic())
-            .<Event>handle((delivery, sink) -> toEvent(delivery).ifPresent(sink::next))
+            .map(this::toEvent)
+            .handle(publishIfPresent())
             .subscribe(listener::onNext);
     }
 
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index b024c4c..8341a16 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -18,6 +18,8 @@
  ****************************************************************/
 package org.apache.james.task;
 
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Optional;
@@ -91,7 +93,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
         return Mono.fromCallable(() -> taskWithId.getTask().details())
             .delayElement(pollingInterval, Schedulers.elastic())
             .repeat()
-            .<TaskExecutionDetails.AdditionalInformation>handle((maybeDetails, sink) -> maybeDetails.ifPresent(sink::next))
+            .handle(publishIfPresent())
             .flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), information)).thenReturn(information));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org