You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/01/28 14:53:19 UTC
[04/12] james-project git commit: JAMES-2630 Port CassandraMessageMapper to Reactor
JAMES-2630 Port CassandraMessageMapper to Reactor
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/1295a89b
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/1295a89b
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/1295a89b
Branch: refs/heads/master
Commit: 1295a89b3cf63106dadfa7d4e9c859a38c9618eb
Parents: 2133f0b
Author: Matthieu Baechler <ma...@apache.org>
Authored: Sun Nov 25 16:06:06 2018 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Mon Jan 28 15:30:53 2019 +0100
----------------------------------------------------------------------
.../cassandra/mail/CassandraMessageMapper.java | 190 ++++++++++---------
1 file changed, 100 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/1295a89b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 59d9a1e..0133f05 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
import javax.mail.Flags;
import javax.mail.Flags.Flag;
@@ -51,9 +50,7 @@ import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.FluentFutureStream;
import org.apache.james.util.OptionalUtils;
-import org.apache.james.util.streams.JamesCollectors;
import org.apache.james.util.streams.Limit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +58,8 @@ import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class CassandraMessageMapper implements MessageMapper {
public static final MailboxCounters INITIAL_COUNTERS = MailboxCounters.builder()
@@ -140,33 +139,34 @@ public class CassandraMessageMapper implements MessageMapper {
@Override
public void delete(Mailbox mailbox, MailboxMessage message) {
deleteAsFuture(message)
- .join();
+ .block();
}
- private CompletableFuture<Void> deleteAsFuture(MailboxMessage message) {
+ private Mono<Void> deleteAsFuture(MailboxMessage message) {
ComposedMessageIdWithMetaData composedMessageIdWithMetaData = message.getComposedMessageIdWithMetaData();
return deleteUsingMailboxId(composedMessageIdWithMetaData);
}
- private CompletableFuture<Void> deleteUsingMailboxId(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
+ private Mono<Void> deleteUsingMailboxId(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
CassandraMessageId messageId = (CassandraMessageId) composedMessageId.getMessageId();
CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
MessageUid uid = composedMessageId.getUid();
- return CompletableFuture.allOf(
- imapUidDAO.delete(messageId, mailboxId),
- messageIdDAO.delete(mailboxId, uid)
- ).thenCompose(voidValue -> indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
+ return Flux.merge(
+ Mono.fromCompletionStage(imapUidDAO.delete(messageId, mailboxId)),
+ Mono.fromCompletionStage(messageIdDAO.delete(mailboxId, uid)))
+ .concatWith(Mono.fromCompletionStage(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId)))
+ .last();
}
@Override
public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
return retrieveMessages(retrieveMessageIds(mailboxId, messageRange), ftype, Limit.from(max))
- .join()
.map(SimpleMailboxMessage -> (MailboxMessage) SimpleMailboxMessage)
- .sorted(Comparator.comparing(MailboxMessage::getUid))
+ .sort(Comparator.comparing(MailboxMessage::getUid))
+ .toIterable()
.iterator();
}
@@ -176,12 +176,13 @@ public class CassandraMessageMapper implements MessageMapper {
.collect(Guavate.toImmutableList());
}
- private CompletableFuture<Stream<SimpleMailboxMessage>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
- return messageDAO.retrieveMessages(messageIds, fetchType, limit)
- .thenApply(steam -> steam
+ private Flux<SimpleMailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
+ return Mono.fromCompletionStage(messageDAO.retrieveMessages(messageIds, fetchType, limit))
+ .map(stream -> stream
.filter(CassandraMessageDAO.MessageResult::isFound)
.map(CassandraMessageDAO.MessageResult::message))
- .thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType));
+ .flatMap(stream -> Mono.fromCompletionStage(attachmentLoader.addAttachmentToMessages(stream, fetchType)))
+ .flatMapMany(Flux::fromStream);
}
@Override
@@ -204,32 +205,31 @@ public class CassandraMessageMapper implements MessageMapper {
public Map<MessageUid, MessageMetaData> expungeMarkedForDeletionInMailbox(Mailbox mailbox, MessageRange messageRange) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- return deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange)
- .join()
- .collect(JamesCollectors.chunker(cassandraConfiguration.getExpungeChunkSize()))
- .map(uidChunk -> expungeUidChunk(mailboxId, uidChunk))
- .flatMap(CompletableFuture::join)
- .collect(Guavate.toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData));
- }
-
- private CompletableFuture<Stream<SimpleMailboxMessage>> expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) {
- return FluentFutureStream.of(
- uidChunk.stream().map(uid -> retrieveComposedId(mailboxId, uid)),
- FluentFutureStream::unboxOptional)
- .performOnAll(this::deleteUsingMailboxId)
- .map(idWithMetadata -> FluentFutureStream.of(
- messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited())),
- FluentFutureStream::unboxFluentFuture)
+ return Mono.fromCompletionStage(deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange))
+ .flatMapMany(Flux::fromStream)
+ .buffer(cassandraConfiguration.getExpungeChunkSize())
+ .flatMap(uidChunk -> expungeUidChunk(mailboxId, uidChunk))
+ .collect(Guavate.<SimpleMailboxMessage, MessageUid, MessageMetaData>toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData))
+ .block();
+ }
+
+ private Flux<SimpleMailboxMessage> expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) {
+ return Flux.fromStream(uidChunk.stream())
+ .flatMap(uid -> retrieveComposedId(mailboxId, uid))
+ .doOnNext(this::deleteUsingMailboxId)
+ .flatMap(idWithMetadata ->
+ Mono.fromCompletionStage(messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited())))
+ .flatMap(Flux::fromStream)
.filter(CassandraMessageDAO.MessageResult::isFound)
.map(CassandraMessageDAO.MessageResult::message)
- .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()))
- .completableFuture();
+ .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()));
}
- private CompletableFuture<Optional<ComposedMessageIdWithMetaData>> retrieveComposedId(CassandraId mailboxId, MessageUid uid) {
- return messageIdDAO.retrieve(mailboxId, uid)
- .thenApply(optional -> OptionalUtils.executeIfEmpty(optional,
- () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid)));
+ private Mono<ComposedMessageIdWithMetaData> retrieveComposedId(CassandraId mailboxId, MessageUid uid) {
+ return Mono.fromCompletionStage(messageIdDAO.retrieve(mailboxId, uid))
+ .doOnNext(optional -> OptionalUtils.executeIfEmpty(optional,
+ () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid)))
+ .flatMap(Mono::justOrEmpty);
}
@Override
@@ -237,7 +237,7 @@ public class CassandraMessageMapper implements MessageMapper {
ComposedMessageIdWithMetaData composedMessageIdWithMetaData = original.getComposedMessageIdWithMetaData();
MessageMetaData messageMetaData = copy(destinationMailbox, original);
- deleteUsingMailboxId(composedMessageIdWithMetaData).join();
+ deleteUsingMailboxId(composedMessageIdWithMetaData).block();
return messageMetaData;
}
@@ -257,8 +257,8 @@ public class CassandraMessageMapper implements MessageMapper {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
save(mailbox, addUidAndModseq(message, mailboxId))
- .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId))
- .join();
+ .map(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId))
+ .block();
return message.metaData();
}
@@ -279,9 +279,10 @@ public class CassandraMessageMapper implements MessageMapper {
public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange range) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- Stream<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, range).join();
+ Flux<ComposedMessageIdWithMetaData> toBeUpdated = Mono.fromCompletionStage(messageIdDAO.retrieveMessages(mailboxId, range))
+ .flatMapMany(Flux::fromStream);
- FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, toBeUpdated, flagUpdateCalculator);
+ FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, toBeUpdated, flagUpdateCalculator).block();
FlagsUpdateStageResult finalResult = handleUpdatesStagedRetry(mailboxId, flagUpdateCalculator, firstResult);
if (finalResult.containsFailedResults()) {
LOGGER.error("Can not update following UIDs {} for mailbox {}", finalResult.getFailed(), mailboxId.asUuid());
@@ -294,50 +295,55 @@ public class CassandraMessageMapper implements MessageMapper {
int retryCount = 0;
while (retryCount < cassandraConfiguration.getFlagsUpdateMessageMaxRetry() && globalResult.containsFailedResults()) {
retryCount++;
- FlagsUpdateStageResult stageResult = retryUpdatesStage(mailboxId, flagUpdateCalculator, globalResult.getFailed());
+ FlagsUpdateStageResult stageResult = retryUpdatesStage(mailboxId, flagUpdateCalculator, globalResult.getFailed()).block();
globalResult = globalResult.keepSucceded().merge(stageResult);
}
return globalResult;
}
- private FlagsUpdateStageResult retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) {
- Stream<ComposedMessageIdWithMetaData> idsFailed = FluentFutureStream.of(
- failed.stream().map(uid -> messageIdDAO.retrieve(mailboxId, uid)),
- FluentFutureStream::unboxOptional)
- .join();
-
- return runUpdateStage(mailboxId, idsFailed, flagsUpdateCalculator);
+ private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) {
+ if (!failed.isEmpty()) {
+ Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
+ .flatMap(uid ->
+ Mono.fromCompletionStage(messageIdDAO.retrieve(mailboxId, uid))
+ .flatMap(Mono::justOrEmpty)
+ );
+ return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
+ } else {
+ return Mono.empty();
+ }
}
- private FlagsUpdateStageResult runUpdateStage(CassandraId mailboxId, Stream<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
- Long newModSeq = modSeqProvider.nextModSeq(mailboxId).join().orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid()));
-
- return toBeUpdated.collect(JamesCollectors.chunker(cassandraConfiguration.getFlagsUpdateChunkSize()))
- .map(uidChunk -> performUpdatesForChunk(mailboxId, flagsUpdateCalculator, newModSeq, uidChunk))
- .map(CompletableFuture::join)
+ private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
+ Mono<Long> newModSeq = computeNewModSeq(mailboxId);
+ return toBeUpdated
+ .buffer(cassandraConfiguration.getFlagsUpdateChunkSize())
+ .flatMap(uidChunk -> newModSeq.flatMap(modSeq -> performUpdatesForChunk(mailboxId, flagsUpdateCalculator, modSeq, uidChunk)))
.reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge);
}
- private CompletableFuture<FlagsUpdateStageResult> performUpdatesForChunk(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, Long newModSeq, Collection<ComposedMessageIdWithMetaData> uidChunk) {
- Stream<CompletableFuture<FlagsUpdateStageResult>> updateMetaDataFuture =
- uidChunk.stream().map(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, oldMetadata));
+ private Mono<Long> computeNewModSeq(CassandraId mailboxId) {
+ return Mono.fromCompletionStage(modSeqProvider.nextModSeq(mailboxId))
+ .map(value -> value.orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid())));
+ }
- return FluentFutureStream.of(updateMetaDataFuture)
+ private Mono<FlagsUpdateStageResult> performUpdatesForChunk(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, Long newModSeq, Collection<ComposedMessageIdWithMetaData> uidChunk) {
+ return Flux.fromIterable(uidChunk)
+ .flatMap(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, oldMetadata))
.reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge)
- .thenCompose(result -> updateIndexesForUpdatesResult(mailboxId, result));
+ .flatMap(result -> updateIndexesForUpdatesResult(mailboxId, result));
}
- private CompletableFuture<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
- return FluentFutureStream.of(
- result.getSucceeded().stream()
- .map(Throwing
- .function((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags))
- .fallbackTo(failedindex -> {
- LOGGER.error("Could not update flag indexes for mailboxId {} UID {}. This will lead to inconsistencies across Cassandra tables", mailboxId, failedindex.getUid());
- return CompletableFuture.completedFuture(null);
- })))
- .completableFuture()
- .thenApply(any -> result);
+ private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
+ return Flux.fromIterable(result.getSucceeded())
+ .flatMap(Throwing
+ .function((UpdatedFlags updatedFlags) -> Mono.fromCompletionStage(indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)))
+ .fallbackTo(failedindex -> {
+ LOGGER.error("Could not update flag indexes for mailboxId {} UID {}. This will lead to inconsistencies across Cassandra tables", mailboxId, failedindex.getUid());
+ return Mono.just(null);
+ }))
+ .collectList()
+ .map(any -> result);
}
@Override
@@ -369,34 +375,36 @@ public class CassandraMessageMapper implements MessageMapper {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
insertIds(addUidAndModseq(message, mailboxId), mailboxId)
- .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId))
- .join();
+ .map(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId))
+ .block();
return message.metaData();
}
- private CompletableFuture<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {
+ private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
return messageDAO.save(message)
- .thenCompose(aVoid -> insertIds(message, mailboxId));
+ .flatMap(aVoid -> insertIds(message, mailboxId));
}
- private CompletableFuture<Void> insertIds(MailboxMessage message, CassandraId mailboxId) {
+ private Mono<Void> insertIds(MailboxMessage message, CassandraId mailboxId) {
ComposedMessageIdWithMetaData composedMessageIdWithMetaData = ComposedMessageIdWithMetaData.builder()
.composedMessageId(new ComposedMessageId(mailboxId, message.getMessageId(), message.getUid()))
.flags(message.createFlags())
.modSeq(message.getModSeq())
.build();
- return CompletableFuture.allOf(messageIdDAO.insert(composedMessageIdWithMetaData),
- imapUidDAO.insert(composedMessageIdWithMetaData));
+ return Flux.merge(
+ Mono.fromCompletionStage(messageIdDAO.insert(composedMessageIdWithMetaData)),
+ Mono.fromCompletionStage(imapUidDAO.insert(composedMessageIdWithMetaData)))
+ .last();
}
- private CompletableFuture<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, long newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
+ private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, long newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
Flags oldFlags = oldMetaData.getFlags();
Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags);
if (identicalFlags(oldFlags, newFlags)) {
- return CompletableFuture.completedFuture(FlagsUpdateStageResult.success(UpdatedFlags.builder()
+ return Mono.just(FlagsUpdateStageResult.success(UpdatedFlags.builder()
.uid(oldMetaData.getComposedMessageId().getUid())
.modSeq(oldMetaData.getModSeq())
.oldFlags(oldFlags)
@@ -405,7 +413,7 @@ public class CassandraMessageMapper implements MessageMapper {
}
return updateFlags(oldMetaData, newFlags, newModSeq)
- .thenApply(success -> {
+ .map(success -> {
if (success) {
return FlagsUpdateStageResult.success(UpdatedFlags.builder()
.uid(oldMetaData.getComposedMessageId().getUid())
@@ -423,17 +431,19 @@ public class CassandraMessageMapper implements MessageMapper {
return oldFlags.equals(newFlags);
}
- private CompletableFuture<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, long newModSeq) {
+ private Mono<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, long newModSeq) {
ComposedMessageIdWithMetaData newMetadata = ComposedMessageIdWithMetaData.builder()
.composedMessageId(oldMetadata.getComposedMessageId())
.modSeq(newModSeq)
.flags(newFlags)
.build();
- return imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq())
- .thenCompose(success -> Optional.of(success)
- .filter(b -> b)
- .map((Boolean any) -> messageIdDAO.updateMetadata(newMetadata)
- .thenApply(v -> success))
- .orElse(CompletableFuture.completedFuture(success)));
+ return Mono.fromCompletionStage(imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq()))
+ .flatMap(success -> {
+ if (success) {
+ return Mono.fromCompletionStage(messageIdDAO.updateMetadata(newMetadata))
+ .map(ignored -> true);
+ } else {
+ return Mono.just(false);
+ }});
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org