You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/01/28 14:53:19 UTC

[04/12] james-project git commit: JAMES-2630 Port CassandraMessageMapper to Reactor

JAMES-2630 Port CassandraMessageMapper to Reactor


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/1295a89b
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/1295a89b
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/1295a89b

Branch: refs/heads/master
Commit: 1295a89b3cf63106dadfa7d4e9c859a38c9618eb
Parents: 2133f0b
Author: Matthieu Baechler <ma...@apache.org>
Authored: Sun Nov 25 16:06:06 2018 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Mon Jan 28 15:30:53 2019 +0100

----------------------------------------------------------------------
 .../cassandra/mail/CassandraMessageMapper.java  | 190 ++++++++++---------
 1 file changed, 100 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/1295a89b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 59d9a1e..0133f05 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -51,9 +50,7 @@ import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.OptionalUtils;
-import org.apache.james.util.streams.JamesCollectors;
 import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +58,8 @@ import org.slf4j.LoggerFactory;
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraMessageMapper implements MessageMapper {
     public static final MailboxCounters INITIAL_COUNTERS =  MailboxCounters.builder()
@@ -140,33 +139,34 @@ public class CassandraMessageMapper implements MessageMapper {
     @Override
     public void delete(Mailbox mailbox, MailboxMessage message) {
         deleteAsFuture(message)
-            .join();
+            .block();
     }
 
-    private CompletableFuture<Void> deleteAsFuture(MailboxMessage message) {
+    private Mono<Void> deleteAsFuture(MailboxMessage message) {
         ComposedMessageIdWithMetaData composedMessageIdWithMetaData = message.getComposedMessageIdWithMetaData();
 
         return deleteUsingMailboxId(composedMessageIdWithMetaData);
     }
 
-    private CompletableFuture<Void> deleteUsingMailboxId(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
+    private Mono<Void> deleteUsingMailboxId(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
         ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
         CassandraMessageId messageId = (CassandraMessageId) composedMessageId.getMessageId();
         CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
         MessageUid uid = composedMessageId.getUid();
-        return CompletableFuture.allOf(
-            imapUidDAO.delete(messageId, mailboxId),
-            messageIdDAO.delete(mailboxId, uid)
-        ).thenCompose(voidValue -> indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
+        return Flux.merge(
+                Mono.fromCompletionStage(imapUidDAO.delete(messageId, mailboxId)),
+                Mono.fromCompletionStage(messageIdDAO.delete(mailboxId, uid)))
+            .concatWith(Mono.fromCompletionStage(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId)))
+            .last();
     }
 
     @Override
     public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return retrieveMessages(retrieveMessageIds(mailboxId, messageRange), ftype, Limit.from(max))
-            .join()
             .map(SimpleMailboxMessage -> (MailboxMessage) SimpleMailboxMessage)
-            .sorted(Comparator.comparing(MailboxMessage::getUid))
+            .sort(Comparator.comparing(MailboxMessage::getUid))
+            .toIterable()
             .iterator();
     }
 
@@ -176,12 +176,13 @@ public class CassandraMessageMapper implements MessageMapper {
             .collect(Guavate.toImmutableList());
     }
 
-    private CompletableFuture<Stream<SimpleMailboxMessage>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
-        return messageDAO.retrieveMessages(messageIds, fetchType, limit)
-            .thenApply(steam -> steam
+    private Flux<SimpleMailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
+        return Mono.fromCompletionStage(messageDAO.retrieveMessages(messageIds, fetchType, limit))
+            .map(stream -> stream
                 .filter(CassandraMessageDAO.MessageResult::isFound)
                 .map(CassandraMessageDAO.MessageResult::message))
-            .thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType));
+            .flatMap(stream -> Mono.fromCompletionStage(attachmentLoader.addAttachmentToMessages(stream, fetchType)))
+            .flatMapMany(Flux::fromStream);
     }
 
     @Override
@@ -204,32 +205,31 @@ public class CassandraMessageMapper implements MessageMapper {
     public Map<MessageUid, MessageMetaData> expungeMarkedForDeletionInMailbox(Mailbox mailbox, MessageRange messageRange) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
-        return deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange)
-            .join()
-            .collect(JamesCollectors.chunker(cassandraConfiguration.getExpungeChunkSize()))
-            .map(uidChunk -> expungeUidChunk(mailboxId, uidChunk))
-            .flatMap(CompletableFuture::join)
-            .collect(Guavate.toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData));
-    }
-
-    private CompletableFuture<Stream<SimpleMailboxMessage>> expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) {
-        return FluentFutureStream.of(
-                uidChunk.stream().map(uid -> retrieveComposedId(mailboxId, uid)),
-                FluentFutureStream::unboxOptional)
-            .performOnAll(this::deleteUsingMailboxId)
-            .map(idWithMetadata -> FluentFutureStream.of(
-                messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited())),
-                FluentFutureStream::unboxFluentFuture)
+        return Mono.fromCompletionStage(deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange))
+            .flatMapMany(Flux::fromStream)
+            .buffer(cassandraConfiguration.getExpungeChunkSize())
+            .flatMap(uidChunk -> expungeUidChunk(mailboxId, uidChunk))
+            .collect(Guavate.<SimpleMailboxMessage, MessageUid, MessageMetaData>toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData))
+            .block();
+    }
+
+    private Flux<SimpleMailboxMessage> expungeUidChunk(CassandraId mailboxId, Collection<MessageUid> uidChunk) {
+        return Flux.fromStream(uidChunk.stream())
+            .flatMap(uid -> retrieveComposedId(mailboxId, uid))
+            .doOnNext(this::deleteUsingMailboxId)
+            .flatMap(idWithMetadata ->
+                Mono.fromCompletionStage(messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited())))
+            .flatMap(Flux::fromStream)
             .filter(CassandraMessageDAO.MessageResult::isFound)
             .map(CassandraMessageDAO.MessageResult::message)
-            .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()))
-            .completableFuture();
+            .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()));
     }
 
-    private CompletableFuture<Optional<ComposedMessageIdWithMetaData>> retrieveComposedId(CassandraId mailboxId, MessageUid uid) {
-        return messageIdDAO.retrieve(mailboxId, uid)
-            .thenApply(optional -> OptionalUtils.executeIfEmpty(optional,
-                () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid)));
+    private Mono<ComposedMessageIdWithMetaData> retrieveComposedId(CassandraId mailboxId, MessageUid uid) {
+        return Mono.fromCompletionStage(messageIdDAO.retrieve(mailboxId, uid))
+            .doOnNext(optional -> OptionalUtils.executeIfEmpty(optional,
+                () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid)))
+            .flatMap(Mono::justOrEmpty);
     }
 
     @Override
@@ -237,7 +237,7 @@ public class CassandraMessageMapper implements MessageMapper {
         ComposedMessageIdWithMetaData composedMessageIdWithMetaData = original.getComposedMessageIdWithMetaData();
 
         MessageMetaData messageMetaData = copy(destinationMailbox, original);
-        deleteUsingMailboxId(composedMessageIdWithMetaData).join();
+        deleteUsingMailboxId(composedMessageIdWithMetaData).block();
 
         return messageMetaData;
     }
@@ -257,8 +257,8 @@ public class CassandraMessageMapper implements MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         save(mailbox, addUidAndModseq(message, mailboxId))
-            .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId))
-            .join();
+            .map(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId))
+            .block();
         return message.metaData();
     }
 
@@ -279,9 +279,10 @@ public class CassandraMessageMapper implements MessageMapper {
     public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange range) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
-        Stream<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, range).join();
+        Flux<ComposedMessageIdWithMetaData> toBeUpdated = Mono.fromCompletionStage(messageIdDAO.retrieveMessages(mailboxId, range))
+            .flatMapMany(Flux::fromStream);
 
-        FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, toBeUpdated, flagUpdateCalculator);
+        FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, toBeUpdated, flagUpdateCalculator).block();
         FlagsUpdateStageResult finalResult = handleUpdatesStagedRetry(mailboxId, flagUpdateCalculator, firstResult);
         if (finalResult.containsFailedResults()) {
             LOGGER.error("Can not update following UIDs {} for mailbox {}", finalResult.getFailed(), mailboxId.asUuid());
@@ -294,50 +295,55 @@ public class CassandraMessageMapper implements MessageMapper {
         int retryCount = 0;
         while (retryCount < cassandraConfiguration.getFlagsUpdateMessageMaxRetry() && globalResult.containsFailedResults()) {
             retryCount++;
-            FlagsUpdateStageResult stageResult = retryUpdatesStage(mailboxId, flagUpdateCalculator, globalResult.getFailed());
+            FlagsUpdateStageResult stageResult = retryUpdatesStage(mailboxId, flagUpdateCalculator, globalResult.getFailed()).block();
             globalResult = globalResult.keepSucceded().merge(stageResult);
         }
         return globalResult;
     }
 
-    private FlagsUpdateStageResult retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) {
-        Stream<ComposedMessageIdWithMetaData> idsFailed = FluentFutureStream.of(
-                failed.stream().map(uid -> messageIdDAO.retrieve(mailboxId, uid)),
-                FluentFutureStream::unboxOptional)
-            .join();
-
-        return runUpdateStage(mailboxId, idsFailed, flagsUpdateCalculator);
+    private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) {
+        if (!failed.isEmpty()) {
+            Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
+                .flatMap(uid ->
+                    Mono.fromCompletionStage(messageIdDAO.retrieve(mailboxId, uid))
+                        .flatMap(Mono::justOrEmpty)
+                );
+            return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
+        } else {
+            return Mono.empty();
+        }
     }
 
-    private FlagsUpdateStageResult runUpdateStage(CassandraId mailboxId, Stream<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
-        Long newModSeq = modSeqProvider.nextModSeq(mailboxId).join().orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid()));
-
-        return toBeUpdated.collect(JamesCollectors.chunker(cassandraConfiguration.getFlagsUpdateChunkSize()))
-            .map(uidChunk -> performUpdatesForChunk(mailboxId, flagsUpdateCalculator, newModSeq, uidChunk))
-            .map(CompletableFuture::join)
+    private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
+        Mono<Long> newModSeq = computeNewModSeq(mailboxId);
+        return toBeUpdated
+            .buffer(cassandraConfiguration.getFlagsUpdateChunkSize())
+            .flatMap(uidChunk -> newModSeq.flatMap(modSeq -> performUpdatesForChunk(mailboxId, flagsUpdateCalculator, modSeq, uidChunk)))
             .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge);
     }
 
-    private CompletableFuture<FlagsUpdateStageResult> performUpdatesForChunk(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, Long newModSeq, Collection<ComposedMessageIdWithMetaData> uidChunk) {
-        Stream<CompletableFuture<FlagsUpdateStageResult>> updateMetaDataFuture =
-            uidChunk.stream().map(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, oldMetadata));
+    private Mono<Long> computeNewModSeq(CassandraId mailboxId) {
+        return Mono.fromCompletionStage(modSeqProvider.nextModSeq(mailboxId))
+            .map(value -> value.orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid())));
+    }
 
-        return FluentFutureStream.of(updateMetaDataFuture)
+    private Mono<FlagsUpdateStageResult> performUpdatesForChunk(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, Long newModSeq, Collection<ComposedMessageIdWithMetaData> uidChunk) {
+        return Flux.fromIterable(uidChunk)
+            .flatMap(oldMetadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, oldMetadata))
             .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge)
-            .thenCompose(result -> updateIndexesForUpdatesResult(mailboxId, result));
+            .flatMap(result -> updateIndexesForUpdatesResult(mailboxId, result));
     }
 
-    private CompletableFuture<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
-        return FluentFutureStream.of(
-            result.getSucceeded().stream()
-                .map(Throwing
-                    .function((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags))
-                    .fallbackTo(failedindex -> {
-                        LOGGER.error("Could not update flag indexes for mailboxId {} UID {}. This will lead to inconsistencies across Cassandra tables", mailboxId, failedindex.getUid());
-                        return CompletableFuture.completedFuture(null);
-                    })))
-            .completableFuture()
-            .thenApply(any -> result);
+    private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
+        return Flux.fromIterable(result.getSucceeded())
+            .flatMap(Throwing
+                .function((UpdatedFlags updatedFlags) -> Mono.fromCompletionStage(indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)))
+                .fallbackTo(failedindex -> {
+                    LOGGER.error("Could not update flag indexes for mailboxId {} UID {}. This will lead to inconsistencies across Cassandra tables", mailboxId, failedindex.getUid());
+                    return Mono.just(null);
+                }))
+            .collectList()
+            .map(any -> result);
     }
 
     @Override
@@ -369,34 +375,36 @@ public class CassandraMessageMapper implements MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         insertIds(addUidAndModseq(message, mailboxId), mailboxId)
-                .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId))
-                .join();
+                .map(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId))
+                .block();
         return message.metaData();
     }
 
-    private CompletableFuture<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {
+    private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return messageDAO.save(message)
-            .thenCompose(aVoid -> insertIds(message, mailboxId));
+            .flatMap(aVoid -> insertIds(message, mailboxId));
     }
 
-    private CompletableFuture<Void> insertIds(MailboxMessage message, CassandraId mailboxId) {
+    private Mono<Void> insertIds(MailboxMessage message, CassandraId mailboxId) {
         ComposedMessageIdWithMetaData composedMessageIdWithMetaData = ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(new ComposedMessageId(mailboxId, message.getMessageId(), message.getUid()))
                 .flags(message.createFlags())
                 .modSeq(message.getModSeq())
                 .build();
-        return CompletableFuture.allOf(messageIdDAO.insert(composedMessageIdWithMetaData),
-                imapUidDAO.insert(composedMessageIdWithMetaData));
+        return Flux.merge(
+            Mono.fromCompletionStage(messageIdDAO.insert(composedMessageIdWithMetaData)),
+            Mono.fromCompletionStage(imapUidDAO.insert(composedMessageIdWithMetaData)))
+            .last();
     }
 
 
-    private CompletableFuture<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, long newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
+    private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, long newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
         Flags oldFlags = oldMetaData.getFlags();
         Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags);
 
         if (identicalFlags(oldFlags, newFlags)) {
-            return CompletableFuture.completedFuture(FlagsUpdateStageResult.success(UpdatedFlags.builder()
+            return Mono.just(FlagsUpdateStageResult.success(UpdatedFlags.builder()
                 .uid(oldMetaData.getComposedMessageId().getUid())
                 .modSeq(oldMetaData.getModSeq())
                 .oldFlags(oldFlags)
@@ -405,7 +413,7 @@ public class CassandraMessageMapper implements MessageMapper {
         }
 
         return updateFlags(oldMetaData, newFlags, newModSeq)
-            .thenApply(success -> {
+            .map(success -> {
                 if (success) {
                     return FlagsUpdateStageResult.success(UpdatedFlags.builder()
                         .uid(oldMetaData.getComposedMessageId().getUid())
@@ -423,17 +431,19 @@ public class CassandraMessageMapper implements MessageMapper {
         return oldFlags.equals(newFlags);
     }
 
-    private CompletableFuture<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, long newModSeq) {
+    private Mono<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, long newModSeq) {
         ComposedMessageIdWithMetaData newMetadata = ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(oldMetadata.getComposedMessageId())
                 .modSeq(newModSeq)
                 .flags(newFlags)
                 .build();
-        return imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq())
-            .thenCompose(success -> Optional.of(success)
-                .filter(b -> b)
-                .map((Boolean any) -> messageIdDAO.updateMetadata(newMetadata)
-                    .thenApply(v -> success))
-                .orElse(CompletableFuture.completedFuture(success)));
+        return Mono.fromCompletionStage(imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq()))
+            .flatMap(success -> {
+                if (success) {
+                    return Mono.fromCompletionStage(messageIdDAO.updateMetadata(newMetadata))
+                        .map(ignored -> true);
+                } else {
+                    return Mono.just(false);
+                }});
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org