You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/01/28 14:53:24 UTC

[09/12] james-project git commit: JAMES-2630 Migrate CassandraAsyncExecutor.executeReturnApplied consumers to Reactor

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 23297ee..dab5496 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -46,7 +46,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -76,8 +75,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty;
-import org.apache.james.util.CompletableFutureUtil;
-import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.streams.JamesCollectors;
 import org.apache.james.util.streams.Limit;
 
@@ -95,6 +92,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.primitives.Bytes;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraMessageDAO {
     public static final long DEFAULT_LONG_VALUE = 0L;
@@ -234,42 +233,36 @@ public class CassandraMessageDAO {
             .collect(Guavate.toImmutableList());
     }
 
-    public CompletableFuture<Stream<MessageResult>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
-        return CompletableFutureUtil.chainAll(
-                limit.applyOnStream(messageIds.stream().distinct())
-                    .collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())),
-            ids -> rowToMessages(fetchType, ids))
-            .thenApply(stream -> stream.flatMap(Function.identity()));
+    public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
+        return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct())
+            .collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())))
+            .flatMap(ids -> rowToMessages(fetchType, ids));
     }
 
-    private CompletableFuture<Stream<MessageResult>> rowToMessages(FetchType fetchType, Collection<ComposedMessageIdWithMetaData> ids) {
-        return FluentFutureStream.of(
-            ids.stream()
-                .map(id -> retrieveRow(id, fetchType)
-                    .thenCompose((ResultSet resultSet) -> message(resultSet, id, fetchType))))
-            .completableFuture();
+    private Flux<MessageResult> rowToMessages(FetchType fetchType, Collection<ComposedMessageIdWithMetaData> ids) {
+        return Flux.fromIterable(ids)
+            .flatMap(id -> retrieveRow(id, fetchType)
+                .flatMap(resultSet -> message(resultSet, id, fetchType)));
     }
 
-    private CompletableFuture<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
+    private Mono<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
         CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId.getComposedMessageId().getMessageId();
 
-        return cassandraAsyncExecutor.execute(retrieveSelect(fetchType)
+        return cassandraAsyncExecutor.executeReactor(retrieveSelect(fetchType)
             .bind()
             .setUUID(MESSAGE_ID, cassandraMessageId.get()));
     }
 
-    private CompletableFuture<MessageResult>
+    private Mono<MessageResult>
     message(ResultSet rows,ComposedMessageIdWithMetaData messageIdWithMetaData, FetchType fetchType) {
         ComposedMessageId messageId = messageIdWithMetaData.getComposedMessageId();
 
         if (rows.isExhausted()) {
-            return CompletableFuture.completedFuture(notFound(messageIdWithMetaData));
+            return Mono.just(notFound(messageIdWithMetaData));
         }
 
         Row row = rows.one();
-        CompletableFuture<byte[]> contentFuture = buildContentRetriever(fetchType).apply(row);
-
-        return contentFuture.thenApply(content -> {
+        return buildContentRetriever(fetchType, row).map(content -> {
             MessageWithoutAttachment messageWithoutAttachment =
                 new MessageWithoutAttachment(
                     messageId.getMessageId(),
@@ -341,37 +334,37 @@ public class CassandraMessageDAO {
             .setUUID(MESSAGE_ID, messageId.get()));
     }
 
-    private Function<Row, CompletableFuture<byte[]>> buildContentRetriever(FetchType fetchType) {
+    private Mono<byte[]> buildContentRetriever(FetchType fetchType, Row row) {
         switch (fetchType) {
             case Full:
-                return this::getFullContent;
+                return getFullContent(row);
             case Headers:
-                return this::getHeaderContent;
+                return getHeaderContent(row);
             case Body:
-                return row -> getBodyContent(row)
-                    .thenApply(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data));
+                return getBodyContent(row)
+                    .map(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data));
             case Metadata:
-                return row -> CompletableFuture.completedFuture(EMPTY_BYTE_ARRAY);
+                return Mono.just(EMPTY_BYTE_ARRAY);
             default:
                 throw new RuntimeException("Unknown FetchType " + fetchType);
         }
     }
 
-    private CompletableFuture<byte[]> getFullContent(Row row) {
+    private Mono<byte[]> getFullContent(Row row) {
         return getHeaderContent(row)
-            .thenCombine(getBodyContent(row), Bytes::concat);
+            .zipWith(getBodyContent(row), Bytes::concat);
     }
 
-    private CompletableFuture<byte[]> getBodyContent(Row row) {
+    private Mono<byte[]> getBodyContent(Row row) {
         return getFieldContent(BODY_CONTENT, row);
     }
 
-    private CompletableFuture<byte[]> getHeaderContent(Row row) {
+    private Mono<byte[]> getHeaderContent(Row row) {
         return getFieldContent(HEADER_CONTENT, row);
     }
 
-    private CompletableFuture<byte[]> getFieldContent(String field, Row row) {
-        return blobStore.readBytes(blobIdFactory.from(row.getString(field)));
+    private Mono<byte[]> getFieldContent(String field, Row row) {
+        return Mono.fromFuture(blobStore.readBytes(blobIdFactory.from(row.getString(field))));
     }
 
     public static MessageResult notFound(ComposedMessageIdWithMetaData id) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index 51365e6..fe93143 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -67,6 +67,7 @@ import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Mono;
 
 public class CassandraMessageIdDAO {
 
@@ -169,8 +170,8 @@ public class CassandraMessageIdDAO {
                 .and(lte(IMAP_UID, bindMarker(IMAP_UID_LTE))));
     }
 
-    public CompletableFuture<Void> delete(CassandraId mailboxId, MessageUid uid) {
-        return cassandraAsyncExecutor.executeVoid(delete.bind()
+    public Mono<Void> delete(CassandraId mailboxId, MessageUid uid) {
+        return cassandraAsyncExecutor.executeVoidReactor(delete.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(IMAP_UID, uid.asLong()));
     }
@@ -193,10 +194,10 @@ public class CassandraMessageIdDAO {
                 .setSet(USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags())));
     }
 
-    public CompletableFuture<Void> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
+    public Mono<Void> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
         ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
         Flags flags = composedMessageIdWithMetaData.getFlags();
-        return cassandraAsyncExecutor.executeVoid(update.bind()
+        return cassandraAsyncExecutor.executeVoidReactor(update.bind()
                 .setLong(MOD_SEQ, composedMessageIdWithMetaData.getModSeq())
                 .setBool(ANSWERED, flags.contains(Flag.ANSWERED))
                 .setBool(DELETED, flags.contains(Flag.DELETED))

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 880ac16..0a7c0bc 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -24,14 +24,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
 import org.apache.commons.lang3.tuple.Pair;
+
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
-import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
-import org.apache.james.backends.cassandra.utils.LightweightTransactionException;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageManager;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -49,7 +49,8 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.FluentFutureStream;
+import org.apache.james.util.FunctionalUtils;
+import org.apache.james.util.ReactorUtils;
 import org.apache.james.util.streams.JamesCollectors;
 import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
@@ -57,6 +58,7 @@ import org.slf4j.LoggerFactory;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.Multimap;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CassandraMessageIdMapper implements MessageIdMapper {
@@ -99,47 +101,37 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     }
 
     private Stream<SimpleMailboxMessage> findAsStream(Collection<MessageId> messageIds, FetchType fetchType) {
-        return FluentFutureStream.of(
-                messageIds.stream()
-                    .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())),
-                FluentFutureStream::unboxStream)
-            .collect(Guavate.toImmutableList())
-            .thenCompose(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
-            .thenApply(stream -> stream
+        return Flux.fromStream(messageIds.stream())
+                .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()))
+                .collectList()
+                .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
                 .filter(CassandraMessageDAO.MessageResult::isFound)
-                .map(CassandraMessageDAO.MessageResult::message))
-            .thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType))
-            .thenCompose(this::filterMessagesWithExistingMailbox)
-            .join()
-            .sorted(Comparator.comparing(MailboxMessage::getUid));
-    }
-
-    private CompletableFuture<Stream<SimpleMailboxMessage>> filterMessagesWithExistingMailbox(Stream<SimpleMailboxMessage> stream) {
-        return FluentFutureStream.of(stream.map(this::keepMessageIfMailboxExists), FluentFutureStream::unboxOptional)
-            .completableFuture();
+                .map(CassandraMessageDAO.MessageResult::message)
+                .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType))
+                .flatMap(this::keepMessageIfMailboxExists)
+                .collectSortedList(Comparator.comparing(MailboxMessage::getUid))
+                .block()
+                .stream();
     }
 
-    private CompletableFuture<Optional<SimpleMailboxMessage>> keepMessageIfMailboxExists(SimpleMailboxMessage message) {
+    private Mono<SimpleMailboxMessage> keepMessageIfMailboxExists(SimpleMailboxMessage message) {
         CassandraId cassandraId = (CassandraId) message.getMailboxId();
         return mailboxDAO.retrieveMailbox(cassandraId)
-            .thenApply(optional -> {
-                if (!optional.isPresent()) {
+            .map(any -> message)
+            .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> {
                     LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.",
                         cassandraId,
                         message.getMailboxId());
-                    return Optional.empty();
-                }
-
-                return Optional.of(message);
-            });
+                }));
     }
 
     @Override
     public List<MailboxId> findMailboxes(MessageId messageId) {
-        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()).join()
+        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())
             .map(ComposedMessageIdWithMetaData::getComposedMessageId)
             .map(ComposedMessageId::getMailboxId)
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
     }
 
     @Override
@@ -182,114 +174,115 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
 
     @Override
     public void delete(MessageId messageId, Collection<MailboxId> mailboxIds) {
-        deleteAsFuture(messageId, mailboxIds).join();
+        deleteAsMono(messageId, mailboxIds).block();
     }
 
-    public CompletableFuture<Void> deleteAsFuture(MessageId messageId, Collection<MailboxId> mailboxIds) {
+    public Mono<Void> deleteAsMono(MessageId messageId, Collection<MailboxId> mailboxIds) {
         CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId;
-        return mailboxIds.stream()
-            .map(mailboxId -> retrieveAndDeleteIndices(cassandraMessageId, Optional.of((CassandraId) mailboxId)))
-            .reduce((f1, f2) -> CompletableFuture.allOf(f1, f2))
-            .orElse(CompletableFuture.completedFuture(null));
+        return Flux.fromStream(mailboxIds.stream())
+            .flatMap(mailboxId -> retrieveAndDeleteIndices(cassandraMessageId, Optional.of((CassandraId) mailboxId)))
+            .then();
     }
 
     @Override
     public void delete(Multimap<MessageId, MailboxId> ids) {
-        ids.asMap()
-            .entrySet()
-            .stream()
-            .collect(JamesCollectors.chunker(cassandraConfiguration.getExpungeChunkSize()))
-            .forEach(chunk ->
-                FluentFutureStream.of(chunk.stream()
-                    .map(entry -> deleteAsFuture(entry.getKey(), entry.getValue())))
-                    .join());
+        Flux.fromIterable(ids.asMap()
+            .entrySet())
+            .limitRate(cassandraConfiguration.getExpungeChunkSize())
+            .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue()))
+            .then()
+            .block();
     }
 
-    private CompletableFuture<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
+    private Mono<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
         return imapUidDAO.retrieve(messageId, mailboxId)
-            .thenCompose(composedMessageIds -> composedMessageIds
-                .map(this::deleteIds)
-                .reduce((f1, f2) -> CompletableFuture.allOf(f1, f2))
-                .orElse(CompletableFuture.completedFuture(null)));
+            .flatMap(this::deleteIds)
+            .then();
     }
 
     @Override
     public void delete(MessageId messageId) {
         CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId;
         retrieveAndDeleteIndices(cassandraMessageId, Optional.empty())
-            .join();
+            .block();
     }
 
-    private CompletableFuture<Void> deleteIds(ComposedMessageIdWithMetaData metaData) {
+    private Mono<Void> deleteIds(ComposedMessageIdWithMetaData metaData) {
         CassandraMessageId messageId = (CassandraMessageId) metaData.getComposedMessageId().getMessageId();
         CassandraId mailboxId = (CassandraId) metaData.getComposedMessageId().getMailboxId();
-        return CompletableFuture.allOf(
-            imapUidDAO.delete(messageId, mailboxId),
-            messageIdDAO.delete(mailboxId, metaData.getComposedMessageId().getUid()))
-            .thenCompose(voidValue -> indexTableHandler.updateIndexOnDelete(metaData, mailboxId).toFuture());
+        return Flux.merge(
+                imapUidDAO.delete(messageId, mailboxId),
+                messageIdDAO.delete(mailboxId, metaData.getComposedMessageId().getUid()))
+            .then(indexTableHandler.updateIndexOnDelete(metaData, mailboxId));
     }
 
     @Override
     public Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
-        return mailboxIds.stream()
+        return Flux.fromIterable(mailboxIds)
             .distinct()
             .map(mailboxId -> (CassandraId) mailboxId)
-            .filter(mailboxId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(mailboxId))
-                .join()
-                .findAny()
-                .isPresent())
+            .filter(mailboxId -> haveMetaData(messageId, mailboxId))
             .flatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId))
-            .map(this::updateCounts)
-            .map(Mono::block)
-            .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight));
+            .flatMap(this::updateCounts)
+            .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight))
+            .block();
     }
 
-    private Stream<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
+    private boolean haveMetaData(MessageId messageId, CassandraId mailboxId) {
+        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(mailboxId))
+            .hasElements()
+            .block();
+    }
+
+    private Mono<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
         try {
-            Pair<Flags, ComposedMessageIdWithMetaData> pair = new FunctionRunnerWithRetry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
-                .executeAndRetrieveObject(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId));
-            ComposedMessageIdWithMetaData composedMessageIdWithMetaData = pair.getRight();
-            Flags oldFlags = pair.getLeft();
-            return Stream.of(Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(),
-                    UpdatedFlags.builder()
-                        .uid(composedMessageIdWithMetaData.getComposedMessageId().getUid())
-                        .modSeq(composedMessageIdWithMetaData.getModSeq())
-                        .oldFlags(oldFlags)
-                        .newFlags(composedMessageIdWithMetaData.getFlags())
-                        .build()));
-        } catch (LightweightTransactionException e) {
-            throw new RuntimeException(e);
+            return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId))
+                .single()
+                .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
+                .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()));
         } catch (MailboxDeleteDuringUpdateException e) {
             LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
-            return Stream.of();
+            return Mono.empty();
         }
     }
 
+    private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) {
+        return Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(),
+                UpdatedFlags.builder()
+                    .uid(composedMessageIdWithMetaData.getComposedMessageId().getUid())
+                    .modSeq(composedMessageIdWithMetaData.getModSeq())
+                    .oldFlags(oldFlags)
+                    .newFlags(composedMessageIdWithMetaData.getFlags())
+                    .build());
+    }
+
     private Mono<Pair<MailboxId, UpdatedFlags>> updateCounts(Pair<MailboxId, UpdatedFlags> pair) {
         CassandraId cassandraId = (CassandraId) pair.getLeft();
         return indexTableHandler.updateIndexOnFlagsUpdate(cassandraId, pair.getRight())
-            .then(Mono.just(pair));
+            .thenReturn(pair);
     }
 
-    private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
+    private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
         try {
             return updateFlags(mailboxId, messageId, newState, updateMode);
         } catch (MailboxException e) {
             LOGGER.error("Error while updating flags on mailbox: {}", mailboxId);
-            return Optional.empty();
+            return Mono.empty();
         }
     }
 
-    private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
+    private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
         CassandraId cassandraId = (CassandraId) mailboxId;
         ComposedMessageIdWithMetaData oldComposedId = imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
-            .join()
-            .findFirst()
+            .next()
+            .blockOptional()
             .orElseThrow(MailboxDeleteDuringUpdateException::new);
+
         Flags newFlags = new FlagsUpdateCalculator(newState, updateMode).buildNewFlags(oldComposedId.getFlags());
         if (identicalFlags(oldComposedId, newFlags)) {
-            return Optional.of(Pair.of(oldComposedId.getFlags(), oldComposedId));
+            return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId));
         }
+
         ComposedMessageIdWithMetaData newComposedId = new ComposedMessageIdWithMetaData(
             oldComposedId.getComposedMessageId(),
             newFlags,
@@ -302,15 +295,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         return oldComposedId.getFlags().equals(newFlags);
     }
 
-    private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(ComposedMessageIdWithMetaData oldComposedId, ComposedMessageIdWithMetaData newComposedId) {
+    private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(ComposedMessageIdWithMetaData oldComposedId, ComposedMessageIdWithMetaData newComposedId) {
         return imapUidDAO.updateMetadata(newComposedId, oldComposedId.getModSeq())
-            .thenCompose(updateSuccess -> Optional.of(updateSuccess)
-                .filter(b -> b)
-                .map((Boolean any) -> messageIdDAO.updateMetadata(newComposedId).thenApply(v -> updateSuccess))
-                .orElse(CompletableFuture.completedFuture(updateSuccess)))
-            .thenApply(success -> Optional.of(success)
-                .filter(b -> b)
-                .map(any -> Pair.of(oldComposedId.getFlags(), newComposedId)))
-            .join();
+            .filter(FunctionalUtils.toPredicate(Function.identity()))
+            .flatMap(any -> messageIdDAO.updateMetadata(newComposedId)
+                .thenReturn(Pair.of(oldComposedId.getFlags(), newComposedId)));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
index 169e32d..be5686a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
@@ -42,7 +42,6 @@ import static org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.TABLE_
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 import javax.mail.Flags;
@@ -64,6 +63,8 @@ import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraMessageIdToImapUidDAO {
 
@@ -148,8 +149,8 @@ public class CassandraMessageIdToImapUidDAO {
                 .and(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
     }
 
-    public CompletableFuture<Void> delete(CassandraMessageId messageId, CassandraId mailboxId) {
-        return cassandraAsyncExecutor.executeVoid(delete.bind()
+    public Mono<Void> delete(CassandraMessageId messageId, CassandraId mailboxId) {
+        return cassandraAsyncExecutor.executeVoidReactor(delete.bind()
                 .setUUID(MESSAGE_ID, messageId.get())
                 .setUUID(MAILBOX_ID, mailboxId.asUuid()));
     }
@@ -172,7 +173,7 @@ public class CassandraMessageIdToImapUidDAO {
                 .setSet(USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags())));
     }
 
-    public CompletableFuture<Boolean> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, long oldModSeq) {
+    public Mono<Boolean> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, long oldModSeq) {
         ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
         Flags flags = composedMessageIdWithMetaData.getFlags();
         return cassandraAsyncExecutor.executeReturnApplied(update.bind()
@@ -191,10 +192,10 @@ public class CassandraMessageIdToImapUidDAO {
                 .setLong(MOD_SEQ_CONDITION, oldModSeq));
     }
 
-    public CompletableFuture<Stream<ComposedMessageIdWithMetaData>> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
+    public Flux<ComposedMessageIdWithMetaData> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
         return selectStatement(messageId, mailboxId)
-                .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
-                        .map(this::toComposedMessageIdWithMetadata));
+                .flatMapMany(cassandraUtils::convertToFlux)
+                .map(this::toComposedMessageIdWithMetadata);
     }
 
     private ComposedMessageIdWithMetaData toComposedMessageIdWithMetadata(Row row) {
@@ -208,12 +209,12 @@ public class CassandraMessageIdToImapUidDAO {
                 .build();
     }
 
-    private CompletableFuture<ResultSet> selectStatement(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
+    private Mono<ResultSet> selectStatement(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
         return mailboxId
-            .map(cassandraId -> cassandraAsyncExecutor.execute(select.bind()
+            .map(cassandraId -> cassandraAsyncExecutor.executeReactor(select.bind()
                 .setUUID(MESSAGE_ID, messageId.get())
                 .setUUID(MAILBOX_ID, cassandraId.asUuid())))
-            .orElseGet(() -> cassandraAsyncExecutor.execute(selectAll.bind()
+            .orElseGet(() -> cassandraAsyncExecutor.executeReactor(selectAll.bind()
                 .setUUID(MESSAGE_ID, messageId.get())));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index f2b468f..d2c4006 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -50,6 +50,7 @@ import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.util.OptionalUtils;
+import org.apache.james.util.ReactorUtils;
 import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,8 +154,8 @@ public class CassandraMessageMapper implements MessageMapper {
         CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
         MessageUid uid = composedMessageId.getUid();
         return Flux.merge(
-                Mono.fromCompletionStage(imapUidDAO.delete(messageId, mailboxId)),
-                Mono.fromCompletionStage(messageIdDAO.delete(mailboxId, uid)))
+                imapUidDAO.delete(messageId, mailboxId),
+                messageIdDAO.delete(mailboxId, uid))
             .then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
     }
 
@@ -162,9 +163,9 @@ public class CassandraMessageMapper implements MessageMapper {
     public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return retrieveMessages(retrieveMessageIds(mailboxId, messageRange), ftype, Limit.from(max))
-            .map(SimpleMailboxMessage -> (MailboxMessage) SimpleMailboxMessage)
+            .map(simpleMailboxMessage -> (MailboxMessage) simpleMailboxMessage)
             .collectSortedList(Comparator.comparing(MailboxMessage::getUid))
-            .flatMapMany(Flux::fromIterable)
+            .flatMapIterable(Function.identity())
             .toIterable()
             .iterator();
     }
@@ -176,12 +177,10 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private Flux<SimpleMailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
-        return Mono.fromCompletionStage(messageDAO.retrieveMessages(messageIds, fetchType, limit))
-            .map(stream -> stream
-                .filter(CassandraMessageDAO.MessageResult::isFound)
-                .map(CassandraMessageDAO.MessageResult::message))
-            .flatMap(stream -> Mono.fromCompletionStage(attachmentLoader.addAttachmentToMessages(stream, fetchType)))
-            .flatMapMany(Flux::fromStream);
+        return messageDAO.retrieveMessages(messageIds, fetchType, limit)
+            .filter(CassandraMessageDAO.MessageResult::isFound)
+            .map(CassandraMessageDAO.MessageResult::message)
+            .flatMap(message -> attachmentLoader.addAttachmentToMessage(message, fetchType));
     }
 
     @Override
@@ -196,9 +195,7 @@ public class CassandraMessageMapper implements MessageMapper {
     public MessageUid findFirstUnseenMessageUid(Mailbox mailbox) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return firstUnseenDAO.retrieveFirstUnread(mailboxId)
-                .map(Optional::of)
-                .defaultIfEmpty(Optional.empty())
-                .block()
+                .blockOptional()
                 .orElse(null);
     }
 
@@ -216,9 +213,8 @@ public class CassandraMessageMapper implements MessageMapper {
     private Flux<SimpleMailboxMessage> expungeOne(CassandraId mailboxId, MessageUid messageUid) {
         return retrieveComposedId(mailboxId, messageUid)
             .flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata))
-            .flatMap(idWithMetadata ->
-                Mono.fromCompletionStage(messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited())))
-            .flatMapMany(Flux::fromStream)
+            .flatMapMany(idWithMetadata ->
+                messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))
             .filter(CassandraMessageDAO.MessageResult::isFound)
             .map(CassandraMessageDAO.MessageResult::message)
             .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()));
@@ -262,13 +258,13 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private MailboxMessage addUidAndModseq(MailboxMessage message, CassandraId mailboxId) throws MailboxException {
-        CompletableFuture<Optional<MessageUid>> uidFuture = uidProvider.nextUid(mailboxId);
-        CompletableFuture<Optional<Long>> modseqFuture = modSeqProvider.nextModSeq(mailboxId);
-        CompletableFuture.allOf(uidFuture, modseqFuture).join();
+        final Mono<MessageUid> messageUidMono = uidProvider.nextUid(mailboxId).cache();
+        final Mono<Long> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId).cache();
+        Flux.merge(messageUidMono, nextModSeqMono).then().block(); // actually subscribe so both run concurrently (results are cached)
 
-        message.setUid(uidFuture.join()
+        message.setUid(messageUidMono.blockOptional()
             .orElseThrow(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId)));
-        message.setModSeq(modseqFuture.join()
+        message.setModSeq(nextModSeqMono.blockOptional()
             .orElseThrow(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId)));
 
         return message;
@@ -323,8 +319,8 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private Mono<Long> computeNewModSeq(CassandraId mailboxId) {
-        return Mono.fromCompletionStage(modSeqProvider.nextModSeq(mailboxId))
-            .map(value -> value.orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid())));
+        return modSeqProvider.nextModSeq(mailboxId)
+            .switchIfEmpty(Mono.defer(() -> Mono.error(new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid()))));
     }
 
     private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
@@ -429,11 +425,10 @@ public class CassandraMessageMapper implements MessageMapper {
                 .modSeq(newModSeq)
                 .flags(newFlags)
                 .build();
-        return Mono.fromCompletionStage(imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq()))
+        return imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq())
             .flatMap(success -> {
                 if (success) {
-                    return Mono.fromCompletionStage(messageIdDAO.updateMetadata(newMetadata))
-                        .then(Mono.just(true));
+                    return messageIdDAO.updateMetadata(newMetadata).thenReturn(true);
                 } else {
                     return Mono.just(false);
                 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index 63e045b..6f798c5 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -29,30 +29,33 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTab
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.NEXT_MODSEQ;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME;
 
+import java.time.Duration;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
+import org.apache.james.util.FunctionalUtils;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import reactor.core.publisher.Mono;
 
 public class CassandraModSeqProvider implements ModSeqProvider {
 
     public static final String MOD_SEQ_CONDITION = "modSeqCondition";
+    private final long maxModSeqRetries;
 
     public static class ExceptionRelay extends RuntimeException {
         private final MailboxException underlying;
@@ -81,7 +84,6 @@ public class CassandraModSeqProvider implements ModSeqProvider {
     private static final ModSeq FIRST_MODSEQ = new ModSeq(0);
 
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
-    private final FunctionRunnerWithRetry runner;
     private final PreparedStatement select;
     private final PreparedStatement update;
     private final PreparedStatement insert;
@@ -89,17 +91,12 @@ public class CassandraModSeqProvider implements ModSeqProvider {
     @Inject
     public CassandraModSeqProvider(Session session, CassandraConfiguration cassandraConfiguration) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
-        this.runner = new FunctionRunnerWithRetry(cassandraConfiguration.getModSeqMaxRetry());
+        this.maxModSeqRetries = cassandraConfiguration.getModSeqMaxRetry();
         this.insert = prepareInsert(session);
         this.update = prepareUpdate(session);
         this.select = prepareSelect(session);
     }
 
-    @VisibleForTesting
-    public CassandraModSeqProvider(Session session) {
-        this(session, CassandraConfiguration.DEFAULT_CONFIGURATION);
-    }
-
     private PreparedStatement prepareInsert(Session session) {
         return session.prepare(insertInto(TABLE_NAME)
             .value(NEXT_MODSEQ, bindMarker(NEXT_MODSEQ))
@@ -125,81 +122,79 @@ public class CassandraModSeqProvider implements ModSeqProvider {
     @Override
     public long nextModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        return nextModSeq(mailboxId).join()
+        return nextModSeq(mailboxId)
+            .blockOptional()
             .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId));
     }
 
     @Override
     public long nextModSeq(MailboxSession session, MailboxId mailboxId) throws MailboxException {
         return nextModSeq((CassandraId) mailboxId)
-            .join()
+            .blockOptional()
             .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId));
     }
 
     @Override
     public long highestModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {
-        return unbox(() -> findHighestModSeq((CassandraId) mailbox.getMailboxId()).join().getValue());
+        return highestModSeq(mailboxSession, mailbox.getMailboxId());
     }
 
     @Override
     public long highestModSeq(MailboxSession mailboxSession, MailboxId mailboxId) throws MailboxException {
-        return unbox(() -> findHighestModSeq((CassandraId) mailboxId).join().getValue());
+        return unbox(() -> findHighestModSeq((CassandraId) mailboxId).block().orElse(FIRST_MODSEQ).getValue());
     }
 
-    private CompletableFuture<ModSeq> findHighestModSeq(CassandraId mailboxId) {
-        return cassandraAsyncExecutor.executeSingleRow(
+    private Mono<Optional<ModSeq>> findHighestModSeq(CassandraId mailboxId) {
+        return cassandraAsyncExecutor.executeSingleRowOptionalReactor(
             select.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid()))
-            .thenApply(optional -> optional.map(row -> new ModSeq(row.getLong(NEXT_MODSEQ)))
-                .orElse(FIRST_MODSEQ));
+            .map(maybeRow -> maybeRow.map(row -> new ModSeq(row.getLong(NEXT_MODSEQ))));
     }
 
-    private CompletableFuture<Optional<ModSeq>> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) {
+    private Mono<ModSeq> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) {
         ModSeq nextModSeq = modSeq.next();
         return cassandraAsyncExecutor.executeReturnApplied(
             insert.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(NEXT_MODSEQ, nextModSeq.getValue()))
-            .thenApply(success -> successToModSeq(nextModSeq, success));
+            .flatMap(success -> successToModSeq(nextModSeq, success));
     }
 
-    private CompletableFuture<Optional<ModSeq>> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
+    private Mono<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
         ModSeq nextModSeq = modSeq.next();
         return cassandraAsyncExecutor.executeReturnApplied(
             update.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(NEXT_MODSEQ, nextModSeq.getValue())
                 .setLong(MOD_SEQ_CONDITION, modSeq.getValue()))
-            .thenApply(success -> successToModSeq(nextModSeq, success));
+            .flatMap(success -> successToModSeq(nextModSeq, success));
     }
 
-    private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
-        if (success) {
-            return Optional.of(modSeq);
-        }
-        return Optional.empty();
+    private Mono<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
+        return Mono.just(success)
+            .filter(FunctionalUtils.toPredicate(Function.identity()))
+            .map(any -> modSeq);
     }
-    
-    public CompletableFuture<Optional<Long>> nextModSeq(CassandraId mailboxId) {
+
+    public Mono<Long> nextModSeq(CassandraId mailboxId) {
         return findHighestModSeq(mailboxId)
-            .thenCompose(modSeq -> {
-                if (modSeq.isFirst()) {
-                    return tryInsertModSeq(mailboxId, FIRST_MODSEQ);
-                }
-                return tryUpdateModSeq(mailboxId, modSeq);
-            }).thenCompose(firstInsert -> {
-                    if (firstInsert.isPresent()) {
-                        return CompletableFuture.completedFuture(firstInsert);
-                    }
-                    return handleRetries(mailboxId);
-                })
-            .thenApply(optional -> optional.map(ModSeq::getValue));
-    }
-
-    private CompletableFuture<Optional<ModSeq>> handleRetries(CassandraId mailboxId) {
-        return runner.executeAsyncAndRetrieveObject(
-            () -> findHighestModSeq(mailboxId)
-                .thenCompose(newModSeq -> tryUpdateModSeq(mailboxId, newModSeq)));
+            .flatMap(maybeHighestModSeq -> maybeHighestModSeq
+                        .map(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq))
+                        .orElseGet(() -> tryInsertModSeq(mailboxId, FIRST_MODSEQ)))
+            .switchIfEmpty(handleRetries(mailboxId))
+            .map(ModSeq::getValue);
+    }
+
+    private Mono<ModSeq> handleRetries(CassandraId mailboxId) {
+        return tryFindThenUpdateOnce(mailboxId)
+            .single()
+            .retryBackoff(maxModSeqRetries, Duration.ofMillis(2));
+    }
+
+    private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) {
+        return Mono.defer(() -> findHighestModSeq(mailboxId)
+            .flatMap(Mono::justOrEmpty)
+            .flatMap(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq)));
     }
 
     private static class ModSeq {
@@ -216,9 +211,12 @@ public class CassandraModSeqProvider implements ModSeqProvider {
         public long getValue() {
             return value;
         }
-        
-        public boolean isFirst() {
-            return value == FIRST_MODSEQ.value;
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                    .add("value", value)
+                    .toString();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index b402bf8..bd3baac 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -30,13 +30,11 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.TABLE_NAME;
 
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -47,13 +45,13 @@ import org.apache.james.mailbox.store.mail.model.Mailbox;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
-import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Mono;
 
 public class CassandraUidProvider implements UidProvider {
     private static final String CONDITION = "Condition";
 
     private final CassandraAsyncExecutor executor;
-    private final FunctionRunnerWithRetry runner;
+    private final long maxUidRetries;
     private final PreparedStatement insertStatement;
     private final PreparedStatement updateStatement;
     private final PreparedStatement selectStatement;
@@ -61,17 +59,12 @@ public class CassandraUidProvider implements UidProvider {
     @Inject
     public CassandraUidProvider(Session session, CassandraConfiguration cassandraConfiguration) {
         this.executor = new CassandraAsyncExecutor(session);
-        this.runner = new FunctionRunnerWithRetry(cassandraConfiguration.getUidMaxRetry());
+        this.maxUidRetries = cassandraConfiguration.getUidMaxRetry();
         this.selectStatement = prepareSelect(session);
         this.updateStatement = prepareUpdate(session);
         this.insertStatement = prepareInsert(session);
     }
 
-    @VisibleForTesting
-    public CassandraUidProvider(Session session) {
-        this(session, CassandraConfiguration.DEFAULT_CONFIGURATION);
-    }
-
     private PreparedStatement prepareSelect(Session session) {
         return session.prepare(select(NEXT_UID)
             .from(TABLE_NAME)
@@ -101,66 +94,56 @@ public class CassandraUidProvider implements UidProvider {
     public MessageUid nextUid(MailboxSession session, MailboxId mailboxId) throws MailboxException {
         CassandraId cassandraId = (CassandraId) mailboxId;
         return nextUid(cassandraId)
-        .join()
-        .orElseThrow(() -> new MailboxException("Error during Uid update"));
+            .blockOptional()
+            .orElseThrow(() -> new MailboxException("Error during Uid update"));
     }
 
-    public CompletableFuture<Optional<MessageUid>> nextUid(CassandraId cassandraId) {
-        return findHighestUid(cassandraId)
-            .thenCompose(optional -> {
-                if (optional.isPresent()) {
-                    return tryUpdateUid(cassandraId, optional);
-                }
-                return tryInsert(cassandraId);
-            })
-            .thenCompose(optional -> {
-                if (optional.isPresent()) {
-                    return CompletableFuture.completedFuture(optional);
-                }
-                return runner.executeAsyncAndRetrieveObject(
-                    () -> findHighestUid(cassandraId)
-                        .thenCompose(readUid -> tryUpdateUid(cassandraId, readUid)));
-            });
+    public Mono<MessageUid> nextUid(CassandraId cassandraId) {
+        Mono<MessageUid> updateUid = findHighestUid(cassandraId)
+            .flatMap(messageUid -> tryUpdateUid(cassandraId, messageUid));
+
+        return updateUid
+            .switchIfEmpty(tryInsert(cassandraId))
+            .switchIfEmpty(updateUid)
+            .single()
+            .retry(maxUidRetries);
     }
 
     @Override
     public Optional<MessageUid> lastUid(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {
-        return findHighestUid((CassandraId) mailbox.getMailboxId()).join();
+        return findHighestUid((CassandraId) mailbox.getMailboxId())
+                .blockOptional();
     }
 
-    private CompletableFuture<Optional<MessageUid>> findHighestUid(CassandraId mailboxId) {
-        return executor.executeSingleRow(
+    private Mono<MessageUid> findHighestUid(CassandraId mailboxId) {
+        return Mono.defer(() -> executor.executeSingleRowReactor(
             selectStatement.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid()))
-            .thenApply(optional -> optional.map(row -> MessageUid.of(row.getLong(NEXT_UID))));
+            .map(row -> MessageUid.of(row.getLong(NEXT_UID))));
     }
 
-    private CompletableFuture<Optional<MessageUid>> tryUpdateUid(CassandraId mailboxId, Optional<MessageUid> uid) {
-        if (uid.isPresent()) {
-            MessageUid nextUid = uid.get().next();
-            return executor.executeReturnApplied(
+    private Mono<MessageUid> tryUpdateUid(CassandraId mailboxId, MessageUid uid) {
+        MessageUid nextUid = uid.next();
+        return Mono.defer(() -> executor.executeReturnApplied(
                 updateStatement.bind()
-                    .setUUID(MAILBOX_ID, mailboxId.asUuid())
-                    .setLong(CONDITION, uid.get().asLong())
-                    .setLong(NEXT_UID, nextUid.asLong()))
-                .thenApply(success -> successToUid(nextUid, success));
-        } else {
-            return tryInsert(mailboxId);
-        }
+                        .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                        .setLong(CONDITION, uid.asLong())
+                        .setLong(NEXT_UID, nextUid.asLong()))
+                .flatMap(success -> successToUid(nextUid, success)));
     }
 
-    private CompletableFuture<Optional<MessageUid>> tryInsert(CassandraId mailboxId) {
-        return executor.executeReturnApplied(
+    private Mono<MessageUid> tryInsert(CassandraId mailboxId) {
+        return Mono.defer(() -> executor.executeReturnApplied(
             insertStatement.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid()))
-            .thenApply(success -> successToUid(MessageUid.MIN_VALUE, success));
+            .flatMap(success -> successToUid(MessageUid.MIN_VALUE, success)));
     }
 
-    private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) {
+    private Mono<MessageUid> successToUid(MessageUid uid, Boolean success) {
         if (success) {
-            return Optional.of(uid);
+            return Mono.just(uid);
         }
-        return Optional.empty();
+        return Mono.empty();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
index 7f93150..4bfeafb 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
@@ -27,10 +27,8 @@ import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRight
 import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRightsTable.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRightsTable.USER_NAME;
 
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
@@ -44,13 +42,14 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.exception.UnsupportedRightException;
 import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.MailboxACL.Rfc4314Rights;
-import org.apache.james.util.FluentFutureStream;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.github.fge.lambdas.Throwing;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraUserMailboxRightsDAO {
 
@@ -98,31 +97,32 @@ public class CassandraUserMailboxRightsDAO {
             .where(eq(USER_NAME, bindMarker(USER_NAME))));
     }
 
-    public CompletableFuture<Void> update(CassandraId cassandraId, ACLDiff aclDiff) {
+    public Mono<Void> update(CassandraId cassandraId, ACLDiff aclDiff) {
         PositiveUserACLDiff userACLDiff = new PositiveUserACLDiff(aclDiff);
-        return CompletableFuture.allOf(
+        return Flux.merge(
             addAll(cassandraId, userACLDiff.addedEntries()),
             removeAll(cassandraId, userACLDiff.removedEntries()),
-            addAll(cassandraId, userACLDiff.changedEntries()));
+            addAll(cassandraId, userACLDiff.changedEntries()))
+            .then();
     }
 
-    private CompletableFuture<Stream<Void>> removeAll(CassandraId cassandraId, Stream<MailboxACL.Entry> removedEntries) {
-        return FluentFutureStream.of(removedEntries
-            .map(entry -> cassandraAsyncExecutor.executeVoid(
+    private Mono<Void> removeAll(CassandraId cassandraId, Stream<MailboxACL.Entry> removedEntries) {
+        return Flux.fromStream(removedEntries)
+            .flatMap(entry -> cassandraAsyncExecutor.executeVoidReactor(
                 delete.bind()
                     .setString(USER_NAME, entry.getKey().getName())
-                    .setUUID(MAILBOX_ID, cassandraId.asUuid()))))
-        .completableFuture();
+                    .setUUID(MAILBOX_ID, cassandraId.asUuid())))
+            .then();
     }
 
-    private CompletableFuture<Stream<Void>> addAll(CassandraId cassandraId, Stream<MailboxACL.Entry> addedEntries) {
-        return FluentFutureStream.of(addedEntries
-            .map(entry -> cassandraAsyncExecutor.executeVoid(
+    private Mono<Void> addAll(CassandraId cassandraId, Stream<MailboxACL.Entry> addedEntries) {
+        return Flux.fromStream(addedEntries)
+            .flatMap(entry -> cassandraAsyncExecutor.executeVoidReactor(
                 insert.bind()
                     .setString(USER_NAME, entry.getKey().getName())
                     .setUUID(MAILBOX_ID, cassandraId.asUuid())
-                    .setString(RIGHTS, entry.getValue().serialize()))))
-        .completableFuture();
+                    .setString(RIGHTS, entry.getValue().serialize())))
+            .then();
     }
 
     public CompletableFuture<Optional<Rfc4314Rights>> retrieve(String userName, CassandraId mailboxId) {
@@ -134,14 +134,12 @@ public class CassandraUserMailboxRightsDAO {
                 rowOptional.map(Throwing.function(row -> Rfc4314Rights.fromSerializedRfc4314Rights(row.getString(RIGHTS)))));
     }
 
-    public CompletableFuture<Map<CassandraId, Rfc4314Rights>> listRightsForUser(String userName) {
-        return cassandraAsyncExecutor.execute(
+    public Flux<Pair<CassandraId, Rfc4314Rights>> listRightsForUser(String userName) {
+        return cassandraAsyncExecutor.executeReactor(
             selectUser.bind()
                 .setString(USER_NAME, userName))
-            .thenApply(cassandraUtils::convertToStream)
-            .thenApply(row ->
-                row.map(Throwing.function(this::toPair))
-                    .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)));
+            .flatMapMany(cassandraUtils::convertToFlux)
+            .map(Throwing.function(this::toPair));
     }
 
     private Pair<CassandraId, Rfc4314Rights> toPair(Row row) throws UnsupportedRightException {

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
index d43c8a6..2ab5e56 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
@@ -25,10 +25,13 @@ import org.apache.james.backends.cassandra.migration.Migration;
 import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO.MessageIdAttachmentIds;
+import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.task.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Flux;
+
 public class AttachmentMessageIdCreation implements Migration {
     private static final Logger LOGGER = LoggerFactory.getLogger(AttachmentMessageIdCreation.class);
     private final CassandraMessageDAO cassandraMessageDAO;
@@ -56,10 +59,11 @@ public class AttachmentMessageIdCreation implements Migration {
 
     private Result createIndex(MessageIdAttachmentIds message) {
         try {
-            message.getAttachmentId()
-                .forEach(attachmentId -> attachmentMessageIdDAO
-                    .storeAttachmentForMessageId(attachmentId, message.getMessageId())
-                    .join());
+            MessageId messageId = message.getMessageId();
+            Flux.fromIterable(message.getAttachmentId())
+                .flatMap(attachmentId -> attachmentMessageIdDAO.storeAttachmentForMessageId(attachmentId, messageId))
+                .then()
+                .block();
             return Result.COMPLETED;
         } catch (Exception e) {
             LOGGER.error("Error while creation attachmentId -> messageIds index", e);

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
index 148d395..bebf83d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
@@ -61,7 +61,7 @@ public class AttachmentV2Migration implements Migration {
         try {
             blobStore.save(attachment.getBytes())
                 .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
-                .thenCompose(attachmentDAOV2::storeAttachment)
+                .thenCompose(daoAttachement -> attachmentDAOV2.storeAttachment(daoAttachement).toFuture())
                 .thenCompose(any -> attachmentDAOV1.deleteAttachment(attachment.getAttachmentId()))
                 .join();
             return Result.COMPLETED;

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
index 70f4a8e..c4f3fd2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
@@ -80,9 +80,9 @@ public class MailboxPathV2Migration implements Migration {
 
     public Result migrate(CassandraIdAndPath idAndPath) {
         try {
-            daoV2.save(idAndPath.getMailboxPath(), idAndPath.getCassandraId()).join();
+            daoV2.save(idAndPath.getMailboxPath(), idAndPath.getCassandraId()).block();
 
-            daoV1.delete(idAndPath.getMailboxPath()).join();
+            daoV1.delete(idAndPath.getMailboxPath()).block();
             return Result.COMPLETED;
         } catch (Exception e) {
             LOGGER.error("Error while performing migration for path {}", idAndPath.getMailboxPath(), e);

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
index 1a4a254..0032ef6 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
@@ -63,7 +63,7 @@ public class MailboxMergingTaskRunner {
         return moveMessages(oldMailboxId, newMailboxId, mailboxSession, context)
             .onComplete(
                 () -> mergeRights(oldMailboxId, newMailboxId),
-                () -> mailboxDAO.delete(oldMailboxId).join());
+                () -> mailboxDAO.delete(oldMailboxId).block());
     }
 
     private Task.Result moveMessages(CassandraId oldMailboxId, CassandraId newMailboxId, MailboxSession session, MailboxMergingTask.Context context) {
@@ -88,12 +88,12 @@ public class MailboxMergingTaskRunner {
 
     private void mergeRights(CassandraId oldMailboxId, CassandraId newMailboxId) {
         try {
-            MailboxACL oldAcl = cassandraACLMapper.getACL(oldMailboxId).join();
-            MailboxACL newAcl = cassandraACLMapper.getACL(newMailboxId).join();
+            MailboxACL oldAcl = cassandraACLMapper.getACL(oldMailboxId).block();
+            MailboxACL newAcl = cassandraACLMapper.getACL(newMailboxId).block();
             MailboxACL finalAcl = newAcl.union(oldAcl);
 
             cassandraACLMapper.setACL(newMailboxId, finalAcl);
-            rightsDAO.update(oldMailboxId, ACLDiff.computeDiff(oldAcl, MailboxACL.EMPTY)).join();
+            rightsDAO.update(oldMailboxId, ACLDiff.computeDiff(oldAcl, MailboxACL.EMPTY)).block();
         } catch (MailboxException e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
index d4b073d..4161e8d 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
@@ -23,21 +23,17 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.Collection;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 
 import org.apache.james.mailbox.model.Attachment;
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.Cid;
 import org.apache.james.mailbox.model.MessageAttachment;
-import org.assertj.core.data.MapEntry;
 import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Mono;
 
 public class AttachmentLoaderTest {
 
@@ -53,15 +49,15 @@ public class AttachmentLoaderTest {
     @Test
     public void getAttachmentsShouldWorkWithDuplicatedAttachments() {
         AttachmentId attachmentId = AttachmentId.from("1");
-        Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId);
 
         Attachment attachment = Attachment.builder()
             .attachmentId(attachmentId)
             .bytes("attachment".getBytes())
             .type("type")
             .build();
-        when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
-            .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment)));
+
+        when(attachmentMapper.getAttachmentsAsMono(attachmentId))
+            .thenReturn(Mono.just(attachment));
 
         Optional<String> name = Optional.of("name1");
         Optional<Cid> cid = Optional.empty();
@@ -69,7 +65,7 @@ public class AttachmentLoaderTest {
         MessageAttachmentRepresentation attachmentRepresentation = new MessageAttachmentRepresentation(attachmentId, name, cid, isInlined);
 
         Collection<MessageAttachment> attachments = testee.getAttachments(ImmutableList.of(attachmentRepresentation, attachmentRepresentation))
-            .join();
+            .block();
 
         MessageAttachment expectedAttachment = new MessageAttachment(attachment, name, cid, isInlined);
         assertThat(attachments).hasSize(2)
@@ -79,15 +75,15 @@ public class AttachmentLoaderTest {
     @Test
     public void getAttachmentsShouldWorkWithDuplicatedIds() {
         AttachmentId attachmentId = AttachmentId.from("1");
-        Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId);
 
         Attachment attachment = Attachment.builder()
             .attachmentId(attachmentId)
             .bytes("attachment".getBytes())
             .type("type")
             .build();
-        when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
-            .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment)));
+
+        when(attachmentMapper.getAttachmentsAsMono(attachmentId))
+                .thenReturn(Mono.just(attachment));
 
         Optional<String> name1 = Optional.of("name1");
         Optional<String> name2 = Optional.of("name2");
@@ -97,7 +93,7 @@ public class AttachmentLoaderTest {
         MessageAttachmentRepresentation attachmentRepresentation2 = new MessageAttachmentRepresentation(attachmentId, name2, cid, isInlined);
 
         Collection<MessageAttachment> attachments = testee.getAttachments(ImmutableList.of(attachmentRepresentation1, attachmentRepresentation2))
-            .join();
+            .block();
 
         assertThat(attachments).hasSize(2)
             .containsOnly(new MessageAttachment(attachment, name1, cid, isInlined),
@@ -108,7 +104,6 @@ public class AttachmentLoaderTest {
     public void getAttachmentsShouldReturnMultipleAttachmentWhenSeveralAttachmentsRepresentation() {
         AttachmentId attachmentId1 = AttachmentId.from("1");
         AttachmentId attachmentId2 = AttachmentId.from("2");
-        Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId1, attachmentId2);
 
         Attachment attachment1 = Attachment.builder()
             .attachmentId(attachmentId1)
@@ -120,8 +115,11 @@ public class AttachmentLoaderTest {
             .bytes("attachment2".getBytes())
             .type("type")
             .build();
-        when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
-            .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment1, attachment2)));
+
+        when(attachmentMapper.getAttachmentsAsMono(attachmentId1))
+                .thenReturn(Mono.just(attachment1));
+        when(attachmentMapper.getAttachmentsAsMono(attachmentId2))
+                .thenReturn(Mono.just(attachment2));
 
         Optional<String> name1 = Optional.of("name1");
         Optional<String> name2 = Optional.of("name2");
@@ -131,7 +129,7 @@ public class AttachmentLoaderTest {
         MessageAttachmentRepresentation attachmentRepresentation2 = new MessageAttachmentRepresentation(attachmentId2, name2, cid, isInlined);
 
         Collection<MessageAttachment> attachments = testee.getAttachments(ImmutableList.of(attachmentRepresentation1, attachmentRepresentation2))
-            .join();
+            .block();
 
         assertThat(attachments).hasSize(2)
             .containsOnly(new MessageAttachment(attachment1, name1, cid, isInlined),
@@ -141,61 +139,19 @@ public class AttachmentLoaderTest {
     @Test
     public void getAttachmentsShouldReturnEmptyByDefault() {
         AttachmentId attachmentId = AttachmentId.from("1");
-        Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId);
 
         Attachment attachment = Attachment.builder()
             .attachmentId(attachmentId)
             .bytes("attachment".getBytes())
             .type("type")
             .build();
-        when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
-            .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment)));
+
+        when(attachmentMapper.getAttachmentsAsMono(attachmentId))
+                .thenReturn(Mono.just(attachment));
 
         Collection<MessageAttachment> attachments = testee.getAttachments(ImmutableList.of())
-            .join();
+            .block();
 
         assertThat(attachments).isEmpty();
     }
-
-    @Test
-    public void attachmentsByIdShouldReturnMapWhenExist() {
-        AttachmentId attachmentId = AttachmentId.from("1");
-        AttachmentId attachmentId2 = AttachmentId.from("2");
-        Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId, attachmentId2);
-
-        Attachment attachment = Attachment.builder()
-                .attachmentId(attachmentId)
-                .bytes("attachment".getBytes())
-                .type("type")
-                .build();
-        Attachment attachment2 = Attachment.builder()
-                .attachmentId(attachmentId2)
-                .bytes("attachment2".getBytes())
-                .type("type")
-                .build();
-        when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
-            .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment, attachment2)));
-
-        Map<AttachmentId, Attachment> attachmentsById = testee.attachmentsById(attachmentIds)
-            .join();
-
-        assertThat(attachmentsById).hasSize(2)
-                .containsOnly(MapEntry.entry(attachmentId, attachment), MapEntry.entry(attachmentId2, attachment2));
-    }
-
-    @Test
-    public void attachmentsByIdShouldReturnEmptyMapWhenAttachmentsDontExists() {
-        AttachmentId attachmentId = AttachmentId.from("1");
-        AttachmentId attachmentId2 = AttachmentId.from("2");
-        Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId, attachmentId2);
-
-        when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
-                .thenReturn(CompletableFuture.completedFuture(ImmutableList.of()));
-
-        Map<AttachmentId, Attachment> attachmentsById = testee.attachmentsById(attachmentIds)
-            .join();
-
-        assertThat(attachmentsById).hasSize(0);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
index d9f7c89..8b94cdb 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
@@ -78,12 +78,12 @@ class CassandraACLMapperTest {
                 .value(CassandraACLTable.ACL, "{\"entries\":{\"bob\":invalid}}")
                 .value(CassandraACLTable.VERSION, 1));
 
-        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(MailboxACL.EMPTY);
+        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(MailboxACL.EMPTY);
     }
 
     @Test
     void retrieveACLWhenNoACLStoredShouldReturnEmptyACL() {
-        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(MailboxACL.EMPTY);
+        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(MailboxACL.EMPTY);
     }
 
     @Test
@@ -94,7 +94,7 @@ class CassandraACLMapperTest {
         cassandraACLMapper.updateACL(MAILBOX_ID,
             MailboxACL.command().key(key).rights(rights).asAddition());
 
-        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join())
+        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block())
             .isEqualTo(new MailboxACL().union(key, rights));
     }
 
@@ -107,7 +107,7 @@ class CassandraACLMapperTest {
         MailboxACL.EntryKey keyAlice = new MailboxACL.EntryKey("alice", MailboxACL.NameType.user, false);
         cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(keyAlice).rights(rights).asAddition());
 
-        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join())
+        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block())
             .isEqualTo(new MailboxACL().union(keyBob, rights).union(keyAlice, rights));
     }
 
@@ -119,7 +119,7 @@ class CassandraACLMapperTest {
         cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asAddition());
         cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asRemoval());
 
-        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(MailboxACL.EMPTY);
+        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(MailboxACL.EMPTY);
     }
 
     @Test
@@ -130,7 +130,7 @@ class CassandraACLMapperTest {
         cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asAddition());
         cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).noRights().asReplacement());
 
-        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(MailboxACL.EMPTY);
+        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(MailboxACL.EMPTY);
     }
 
     @Test
@@ -140,7 +140,7 @@ class CassandraACLMapperTest {
 
         cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asReplacement());
 
-        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(new MailboxACL().union(key, rights));
+        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(new MailboxACL().union(key, rights));
     }
 
     @Test
@@ -155,11 +155,11 @@ class CassandraACLMapperTest {
 
         cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asAddition());
 
-        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(new MailboxACL().union(key, rights));
+        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(new MailboxACL().union(key, rights));
     }
 
     @Test
-    void twoConcurrentUpdatesWhenNoACEStoredShouldReturnACEWithTwoEntries(CassandraCluster cassandra) throws Exception {
+    void twoConcurrentUpdatesWhenNoACLStoredShouldReturnACLWithTwoEntries(CassandraCluster cassandra) throws Exception {
         CountDownLatch countDownLatch = new CountDownLatch(2);
         MailboxACL.EntryKey keyBob = new MailboxACL.EntryKey("bob", MailboxACL.NameType.user, false);
         MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Read);
@@ -168,12 +168,12 @@ class CassandraACLMapperTest {
         Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights, countDownLatch::countDown);
         awaitAll(future1, future2);
 
-        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join())
+        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block())
             .isEqualTo(new MailboxACL().union(keyBob, rights).union(keyAlice, rights));
     }
 
     @Test
-    void twoConcurrentUpdatesWhenStoredShouldReturnACEWithTwoEntries(CassandraCluster cassandra) throws Exception {
+    void twoConcurrentUpdatesWhenStoredShouldReturnACLWithTwoEntries(CassandraCluster cassandra) throws Exception {
         CountDownLatch countDownLatch = new CountDownLatch(2);
         MailboxACL.EntryKey keyBenwa = new MailboxACL.EntryKey("benwa", MailboxACL.NameType.user, false);
         MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Read);
@@ -185,7 +185,7 @@ class CassandraACLMapperTest {
         Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights, countDownLatch::countDown);
         awaitAll(future1, future2);
 
-        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join())
+        assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block())
             .isEqualTo(new MailboxACL().union(keyBob, rights).union(keyAlice, rights).union(keyBenwa, rights));
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java
index f82375e..6cd0b2f 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java
@@ -55,7 +55,7 @@ class CassandraAttachmentDAOTest {
 
     @Test
     void getAttachmentShouldReturnEmptyWhenAbsent() {
-        Optional<Attachment> attachment = testee.getAttachment(ATTACHMENT_ID).join();
+        Optional<Attachment> attachment = testee.getAttachment(ATTACHMENT_ID).blockOptional();
 
         assertThat(attachment).isEmpty();
     }
@@ -98,7 +98,7 @@ class CassandraAttachmentDAOTest {
             .build();
         testee.storeAttachment(attachment).join();
 
-        Optional<Attachment> actual = testee.getAttachment(ATTACHMENT_ID).join();
+        Optional<Attachment> actual = testee.getAttachment(ATTACHMENT_ID).blockOptional();
 
         assertThat(actual).contains(attachment);
     }
@@ -114,7 +114,7 @@ class CassandraAttachmentDAOTest {
 
         testee.deleteAttachment(attachment.getAttachmentId()).join();
 
-        assertThat(testee.getAttachment(attachment.getAttachmentId()).join())
+        assertThat(testee.getAttachment(attachment.getAttachmentId()).blockOptional())
             .isEmpty();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java
index 6e3b705..2bf90c7 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java
@@ -52,7 +52,7 @@ class CassandraAttachmentDAOV2Test {
 
     @Test
     void getAttachmentShouldReturnEmptyWhenAbsent() {
-        Optional<DAOAttachment> attachment = testee.getAttachment(ATTACHMENT_ID).join();
+        Optional<DAOAttachment> attachment = testee.getAttachment(ATTACHMENT_ID).blockOptional();
 
         assertThat(attachment).isEmpty();
     }
@@ -66,9 +66,9 @@ class CassandraAttachmentDAOV2Test {
             .build();
         BlobId blobId = BLOB_ID_FACTORY.from("blobId");
         DAOAttachment daoAttachment = CassandraAttachmentDAOV2.from(attachment, blobId);
-        testee.storeAttachment(daoAttachment).join();
+        testee.storeAttachment(daoAttachment).block();
 
-        Optional<DAOAttachment> actual = testee.getAttachment(ATTACHMENT_ID).join();
+        Optional<DAOAttachment> actual = testee.getAttachment(ATTACHMENT_ID).blockOptional();
 
         assertThat(actual).contains(daoAttachment);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
index 862ea95..10492c6 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -100,7 +101,7 @@ class CassandraAttachmentFallbackTest {
             .build();
 
         BlobId blobId = blobsDAO.save(attachment.getBytes()).join();
-        attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).join();
+        attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block();
         attachmentDAO.storeAttachment(otherAttachment).join();
 
         assertThat(attachmentMapper.getAttachment(ATTACHMENT_ID_1))
@@ -135,7 +136,7 @@ class CassandraAttachmentFallbackTest {
             .build();
 
         BlobId blobId = blobsDAO.save(attachment.getBytes()).join();
-        attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).join();
+        attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block();
         attachmentDAO.storeAttachment(otherAttachment).join();
 
         assertThat(attachmentMapper.getAttachments(ImmutableList.of(ATTACHMENT_ID_1)))
@@ -170,10 +171,11 @@ class CassandraAttachmentFallbackTest {
             .build();
 
         BlobId blobId = blobsDAO.save(attachment.getBytes()).join();
-        attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).join();
+        attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block();
         attachmentDAO.storeAttachment(otherAttachment).join();
 
-        assertThat(attachmentMapper.getAttachments(ImmutableList.of(ATTACHMENT_ID_1, ATTACHMENT_ID_2)))
-            .containsExactly(attachment, otherAttachment);
+        List<Attachment> attachments = attachmentMapper.getAttachments(ImmutableList.of(ATTACHMENT_ID_1, ATTACHMENT_ID_2));
+        assertThat(attachments)
+            .containsExactlyInAnyOrder(attachment, otherAttachment);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org