You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/05/13 02:03:25 UTC
[james-project] 04/09: [Refactoring] Extract a
ReactorUtils::unboxOptional utils
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e694f392989540a416765c224bee028327651eba
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon May 4 10:36:06 2020 +0700
[Refactoring] Extract a ReactorUtils::unboxOptional utils
This reduces the boiler plate around the use of handle, which
optimizes Reactor synchronous transformations.
---
.../james/backends/cassandra/utils/CassandraAsyncExecutor.java | 4 +++-
.../james/mailbox/cassandra/mail/CassandraMessageMapper.java | 4 +++-
.../james/mailbox/cassandra/mail/CassandraModSeqProvider.java | 8 +++++---
.../apache/james/mailbox/cassandra/mail/CassandraUidProvider.java | 7 +++++--
.../cassandra/mail/task/SolveMessageInconsistenciesService.java | 6 ++++--
.../james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java | 7 +++++--
.../mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java | 7 +++++--
.../mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java | 7 +++++--
.../events/ElasticSearchListeningMessageSearchIndex.java | 3 ++-
.../main/java/org/apache/james/vault/metadata/MetadataDAO.java | 4 +++-
.../util/src/main/java/org/apache/james/util/ReactorUtils.java | 5 +++++
.../james/mailrepository/cassandra/CassandraMailRepository.java | 4 +++-
.../org/apache/james/jmap/draft/methods/GetMailboxesMethod.java | 8 +++++---
.../configuration/EventsourcingConfigurationManagement.java | 7 +++++--
.../eventsourcing/distributed/RabbitMQTerminationSubscriber.java | 5 ++++-
.../main/java/org/apache/james/task/SerialTaskManagerWorker.java | 4 +++-
16 files changed, 65 insertions(+), 25 deletions(-)
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 86b192b..24bf48e 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -19,6 +19,8 @@
package org.apache.james.backends.cassandra.utils;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
import java.util.Optional;
import javax.inject.Inject;
@@ -66,7 +68,7 @@ public class CassandraAsyncExecutor {
public Mono<Row> executeSingleRow(Statement statement) {
return executeSingleRowOptional(statement)
- .handle((t, sink) -> t.ifPresent(sink::next));
+ .handle(publishIfPresent());
}
public Flux<Row> executeRows(Statement statement) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 465c9cf..cb31434 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -19,6 +19,8 @@
package org.apache.james.mailbox.cassandra.mail;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
import java.time.Duration;
import java.util.Comparator;
import java.util.Iterator;
@@ -312,7 +314,7 @@ public class CassandraMessageMapper implements MessageMapper {
if (!failed.isEmpty()) {
Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
.flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid))
- .handle((t, sink) -> t.ifPresent(sink::next));
+ .handle(publishIfPresent());
return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
} else {
return Mono.empty();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index 15f1a2d..fc1647c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -28,6 +28,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.MAILBOX_ID;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.NEXT_MODSEQ;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
import java.time.Duration;
import java.util.Optional;
@@ -157,7 +158,8 @@ public class CassandraModSeqProvider implements ModSeqProvider {
insert.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid())
.setLong(NEXT_MODSEQ, nextModSeq.asLong()))
- .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next));
+ .map(success -> successToModSeq(nextModSeq, success))
+ .handle(publishIfPresent());
}
private Mono<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
@@ -167,7 +169,8 @@ public class CassandraModSeqProvider implements ModSeqProvider {
.setUUID(MAILBOX_ID, mailboxId.asUuid())
.setLong(NEXT_MODSEQ, nextModSeq.asLong())
.setLong(MOD_SEQ_CONDITION, modSeq.asLong()))
- .handle((success, sink) -> successToModSeq(nextModSeq, success).ifPresent(sink::next));
+ .map(success -> successToModSeq(nextModSeq, success))
+ .handle(publishIfPresent());
}
private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
@@ -187,5 +190,4 @@ public class CassandraModSeqProvider implements ModSeqProvider {
.single()
.retryWhen(Retry.backoff(maxModSeqRetries, firstBackoff).scheduler(Schedulers.elastic()));
}
-
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index fea0d5e..86fa061 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -28,6 +28,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.MAILBOX_ID;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.NEXT_UID;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.TABLE_NAME;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
import java.time.Duration;
import java.util.Optional;
@@ -135,14 +136,16 @@ public class CassandraUidProvider implements UidProvider {
.setUUID(MAILBOX_ID, mailboxId.asUuid())
.setLong(CONDITION, uid.asLong())
.setLong(NEXT_UID, nextUid.asLong()))
- .handle((success, sink) -> successToUid(nextUid, success).ifPresent(sink::next));
+ .map(success -> successToUid(nextUid, success))
+ .handle(publishIfPresent());
}
private Mono<MessageUid> tryInsert(CassandraId mailboxId) {
return executor.executeReturnApplied(
insertStatement.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid()))
- .handle((success, sink) -> successToUid(MessageUid.MIN_VALUE, success).ifPresent(sink::next));
+ .map(success -> successToUid(MessageUid.MIN_VALUE, success))
+ .handle(publishIfPresent());
}
private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
index 8518f1e..f0ec09b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
@@ -19,6 +19,8 @@
package org.apache.james.mailbox.cassandra.mail.task;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
@@ -414,7 +416,7 @@ public class SolveMessageInconsistenciesService {
private Mono<Inconsistency> compareWithMessageIdRecord(ComposedMessageIdWithMetaData upToDateMessageFromImapUid) {
return messageIdDAO.retrieve((CassandraId) upToDateMessageFromImapUid.getComposedMessageId().getMailboxId(), upToDateMessageFromImapUid.getComposedMessageId().getUid())
- .flatMap(Mono::justOrEmpty)
+ .handle(publishIfPresent())
.map(messageIdRecord -> {
if (messageIdRecord.equals(upToDateMessageFromImapUid)) {
return NO_INCONSISTENCY;
@@ -433,7 +435,7 @@ public class SolveMessageInconsistenciesService {
private Mono<Inconsistency> detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) {
return messageIdDAO.retrieve((CassandraId) message.getComposedMessageId().getMailboxId(), message.getComposedMessageId().getUid())
- .flatMap(Mono::justOrEmpty)
+ .handle(publishIfPresent())
.flatMap(upToDateMessage -> messageIdToImapUidDAO.retrieve((CassandraMessageId) message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) message.getComposedMessageId().getMailboxId()))
.map(uidRecord -> NO_INCONSISTENCY)
.switchIfEmpty(Mono.just(new OrphanMessageIdEntry(message)))
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
index 74fd9bf..9ee9159 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
@@ -24,6 +24,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
import javax.inject.Inject;
@@ -93,14 +94,16 @@ public class CassandraGlobalMaxQuotaDao {
return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind()
.setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.STORAGE))
.flatMap(row -> Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class)))
- .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage)));
+ .map(QuotaCodec::longToQuotaSize)
+ .handle(publishIfPresent());
}
Mono<QuotaCountLimit> getGlobalMaxMessage() {
return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind()
.setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.MESSAGE))
.flatMap(row -> Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class)))
- .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages)));
+ .map(QuotaCodec::longToQuotaCount)
+ .handle(publishIfPresent());
}
Mono<Void> removeGlobaltMaxStorage() {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
index cb49d3a..38161ef 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
@@ -24,6 +24,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
import javax.inject.Inject;
@@ -109,13 +110,15 @@ public class CassandraPerDomainMaxQuotaDao {
Mono<QuotaSizeLimit> getMaxStorage(Domain domain) {
return queryExecutor.executeSingleRow(getMaxStorageStatement.bind(domain.asString()))
.flatMap(row -> Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.STORAGE, Long.class)))
- .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage)));
+ .map(QuotaCodec::longToQuotaSize)
+ .handle(publishIfPresent());
}
Mono<QuotaCountLimit> getMaxMessage(Domain domain) {
return queryExecutor.executeSingleRow(getMaxMessageStatement.bind(domain.asString()))
.flatMap(row -> Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.MESSAGE_COUNT, Long.class)))
- .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages)));
+ .map(QuotaCodec::longToQuotaCount)
+ .handle(publishIfPresent());
}
Mono<Void> removeMaxMessage(Domain domain) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
index cff560d..2b4e21b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
@@ -24,6 +24,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
import javax.inject.Inject;
@@ -109,13 +110,15 @@ public class CassandraPerUserMaxQuotaDao {
Mono<QuotaSizeLimit> getMaxStorage(QuotaRoot quotaRoot) {
return queryExecutor.executeSingleRow(getMaxStorageStatement.bind(quotaRoot.getValue()))
.flatMap(row -> Mono.justOrEmpty(row.get(CassandraMaxQuota.STORAGE, Long.class)))
- .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage)));
+ .map(QuotaCodec::longToQuotaSize)
+ .handle(publishIfPresent());
}
Mono<QuotaCountLimit> getMaxMessage(QuotaRoot quotaRoot) {
return queryExecutor.executeSingleRow(getMaxMessageStatement.bind(quotaRoot.getValue()))
.flatMap(row -> Mono.justOrEmpty(row.get(CassandraMaxQuota.MESSAGE_COUNT, Long.class)))
- .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages)));
+ .map(QuotaCodec::longToQuotaCount)
+ .handle(publishIfPresent());
}
Mono<Void> removeMaxMessage(QuotaRoot quotaRoot) {
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index 3d2d099..091db6b 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -18,6 +18,7 @@
****************************************************************/
package org.apache.james.mailbox.elasticsearch.events;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import java.util.Collection;
@@ -128,7 +129,7 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
return searcher.search(mailboxIds, searchQuery, Optional.empty())
.doOnNext(this::logIfNoMessageId)
.map(SearchResult::getMessageId)
- .flatMap(Mono::justOrEmpty)
+ .handle(publishIfPresent())
.distinct()
.take(limit);
}
diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java
index 87553e9..f03fd4e 100644
--- a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java
+++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java
@@ -24,6 +24,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.BUCKET_NAME;
import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.MESSAGE_ID;
import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.OWNER;
@@ -109,7 +110,8 @@ public class MetadataDAO {
.setString(BUCKET_NAME, bucketName.asString())
.setString(OWNER, username.asString()))
.map(row -> row.getString(PAYLOAD))
- .handle((json, sink) -> metadataSerializer.deserialize(json).ifPresent(sink::next));
+ .map(metadataSerializer::deserialize)
+ .handle(publishIfPresent());
}
Flux<MessageId> retrieveMessageIds(BucketName bucketName, Username username) {
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index df9c937..a477355 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -24,11 +24,13 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Optional;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
+import reactor.core.publisher.SynchronousSink;
import reactor.util.context.Context;
public class ReactorUtils {
@@ -39,6 +41,9 @@ public class ReactorUtils {
return Mono.fromRunnable(runnable).then(Mono.empty());
}
+ public static <T> BiConsumer<Optional<T>, SynchronousSink<T>> publishIfPresent() {
+ return (element, sink) -> element.ifPresent(sink::next);
+ }
public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) {
return new StreamInputStream(byteArrays.toIterable(1).iterator());
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index cf98198..134d969 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -19,6 +19,8 @@
package org.apache.james.mailrepository.cassandra;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
import java.util.Collection;
import java.util.Iterator;
@@ -93,7 +95,7 @@ public class CassandraMailRepository implements MailRepository {
@Override
public Mail retrieve(MailKey key) {
return mailDAO.read(url, key)
- .<MailDTO>handle((t, sink) -> t.ifPresent(sink::next))
+ .handle(publishIfPresent())
.flatMap(this::toMail)
.blockOptional()
.orElse(null);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
index f74b243..f5936a9 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
@@ -20,6 +20,7 @@
package org.apache.james.jmap.draft.methods;
import static org.apache.james.util.ReactorUtils.context;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
import java.util.Comparator;
import java.util.List;
@@ -144,7 +145,7 @@ public class GetMailboxesMethod implements Method {
.usingPreloadedMailboxesMetadata(NO_PRELOADED_METADATA)
.build())
.subscribeOn(Schedulers.elastic()))
- .handle((element, sink) -> element.ifPresent(sink::next));
+ .handle(publishIfPresent());
}
private Flux<Mailbox> retrieveAllMailboxes(MailboxSession mailboxSession) {
@@ -156,12 +157,13 @@ public class GetMailboxesMethod implements Method {
return userMailboxesMono.zipWith(quotaLoaderMono)
.flatMapMany(
tuple -> Flux.fromIterable(tuple.getT1())
- .flatMap(mailboxMetaData -> Mono.justOrEmpty(mailboxFactory.builder()
+ .map(mailboxMetaData -> mailboxFactory.builder()
.mailboxMetadata(mailboxMetaData)
.session(mailboxSession)
.usingPreloadedMailboxesMetadata(Optional.of(tuple.getT1()))
.quotaLoader(tuple.getT2())
- .build())));
+ .build())
+ .handle(publishIfPresent()));
}
private Flux<MailboxMetaData> getAllMailboxesMetaData(MailboxSession mailboxSession) {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
index ba2c621..6cacd12 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
@@ -19,6 +19,8 @@
package org.apache.james.queue.rabbitmq.view.cassandra.configuration;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
import javax.inject.Inject;
import org.apache.james.eventsourcing.AggregateId;
@@ -53,9 +55,10 @@ public class EventsourcingConfigurationManagement {
@VisibleForTesting
Mono<CassandraMailQueueViewConfiguration> load() {
return Mono.from(eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID))
- .flatMap(history -> Mono.justOrEmpty(ConfigurationAggregate
+ .map(history -> ConfigurationAggregate
.load(CONFIGURATION_AGGREGATE_ID, history)
- .getCurrentConfiguration()));
+ .getCurrentConfiguration())
+ .handle(publishIfPresent());
}
public void registerConfiguration(CassandraMailQueueViewConfiguration newConfiguration) {
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 174101a..2c1812f 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -20,6 +20,8 @@
package org.apache.james.task.eventsourcing.distributed;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
@@ -91,7 +93,8 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
listenQueueHandle = listenerReceiver
.consumeAutoAck(queueName)
.subscribeOn(Schedulers.elastic())
- .<Event>handle((delivery, sink) -> toEvent(delivery).ifPresent(sink::next))
+ .map(this::toEvent)
+ .handle(publishIfPresent())
.subscribe(listener::onNext);
}
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index b024c4c..8341a16 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -18,6 +18,8 @@
****************************************************************/
package org.apache.james.task;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
@@ -91,7 +93,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
return Mono.fromCallable(() -> taskWithId.getTask().details())
.delayElement(pollingInterval, Schedulers.elastic())
.repeat()
- .<TaskExecutionDetails.AdditionalInformation>handle((maybeDetails, sink) -> maybeDetails.ifPresent(sink::next))
+ .handle(publishIfPresent())
.flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), information)).thenReturn(information));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org