You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/01/28 14:53:25 UTC

[10/12] james-project git commit: JAMES-2630 Migrate CassandraAsyncExecutor.executeReturnApplied consumers to Reactor

JAMES-2630 Migrate CassandraAsyncExecutor.executeReturnApplied consumers to Reactor


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e4a737e5
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e4a737e5
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e4a737e5

Branch: refs/heads/master
Commit: e4a737e52c34c217503dee3e69cc9d0275937046
Parents: 06ee1e4
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Tue Jan 8 16:41:38 2019 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Mon Jan 28 15:49:06 2019 +0100

----------------------------------------------------------------------
 .../cassandra/utils/CassandraAsyncExecutor.java |  12 +-
 .../utils/FunctionRunnerWithRetry.java          |  78 --------
 .../utils/FunctionRunnerWithRetryTest.java      | 173 ------------------
 .../eventstore/cassandra/EventStoreDao.java     |   2 +-
 .../james/mailbox/MailboxManagerStressTest.java |   2 +-
 .../cassandra/mail/AttachmentLoader.java        |  51 ++----
 .../cassandra/mail/CassandraACLMapper.java      | 138 ++++++---------
 .../cassandra/mail/CassandraAttachmentDAO.java  |   8 +-
 .../mail/CassandraAttachmentDAOV2.java          |  13 +-
 .../mail/CassandraAttachmentMapper.java         |  87 ++++-----
 .../mail/CassandraAttachmentMessageIdDAO.java   |   5 +-
 .../mail/CassandraAttachmentOwnerDAO.java       |   5 +-
 .../cassandra/mail/CassandraMailboxDAO.java     |  32 ++--
 .../cassandra/mail/CassandraMailboxMapper.java  | 147 +++++++---------
 .../cassandra/mail/CassandraMailboxPathDAO.java |  19 +-
 .../mail/CassandraMailboxPathDAOImpl.java       |  47 ++---
 .../mail/CassandraMailboxPathV2DAO.java         |  47 +++--
 .../cassandra/mail/CassandraMessageDAO.java     |  61 +++----
 .../cassandra/mail/CassandraMessageIdDAO.java   |   9 +-
 .../mail/CassandraMessageIdMapper.java          | 176 +++++++++----------
 .../mail/CassandraMessageIdToImapUidDAO.java    |  21 +--
 .../cassandra/mail/CassandraMessageMapper.java  |  49 +++---
 .../cassandra/mail/CassandraModSeqProvider.java |  98 +++++------
 .../cassandra/mail/CassandraUidProvider.java    |  81 ++++-----
 .../mail/CassandraUserMailboxRightsDAO.java     |  42 +++--
 .../migration/AttachmentMessageIdCreation.java  |  12 +-
 .../mail/migration/AttachmentV2Migration.java   |   2 +-
 .../mail/migration/MailboxPathV2Migration.java  |   4 +-
 .../mail/task/MailboxMergingTaskRunner.java     |   8 +-
 .../cassandra/mail/AttachmentLoaderTest.java    |  82 ++-------
 .../cassandra/mail/CassandraACLMapperTest.java  |  24 +--
 .../mail/CassandraAttachmentDAOTest.java        |   6 +-
 .../mail/CassandraAttachmentDAOV2Test.java      |   6 +-
 .../mail/CassandraAttachmentFallbackTest.java   |  12 +-
 .../mail/CassandraAttachmentOwnerDAOTest.java   |  17 +-
 .../cassandra/mail/CassandraMailboxDAOTest.java |  44 +++--
 .../mail/CassandraMailboxMapperTest.java        |  88 +++++-----
 .../mail/CassandraMailboxPathDAOImplTest.java   |  12 +-
 .../mail/CassandraMailboxPathDAOTest.java       |  37 ++--
 .../cassandra/mail/CassandraMapperProvider.java |   3 +-
 .../cassandra/mail/CassandraMessageDAOTest.java |   6 +-
 .../mail/CassandraMessageIdDAOTest.java         |  24 +--
 .../CassandraMessageIdToImapUidDAOTest.java     |  95 +++++-----
 .../mail/CassandraModSeqProviderTest.java       |  27 ++-
 .../mail/CassandraUidProviderTest.java          |  26 ++-
 .../mail/CassandraUserMailboxRightsDAOTest.java |  12 +-
 .../migration/AttachmentV2MigrationTest.java    |  14 +-
 .../migration/MailboxPathV2MigrationTest.java   |  18 +-
 .../AbstractMailboxManagerAttachmentTest.java   |  23 ++-
 server/container/util/pom.xml                   |   6 +
 .../cassandra/CassandraDomainList.java          |   4 +-
 .../cassandra/CassandraActiveScriptDAO.java     |  19 +-
 .../sieve/cassandra/CassandraSieveDAO.java      |  20 +--
 .../sieve/cassandra/CassandraSieveQuotaDAO.java |   5 +-
 .../cassandra/CassandraSieveRepository.java     |  91 +++++-----
 .../cassandra/CassandraUsersRepository.java     |   6 +-
 .../cassandra/CassandraActiveScriptDAOTest.java |  22 +--
 .../sieve/cassandra/CassandraSieveDAOTest.java  |  34 ++--
 .../cassandra/CassandraSieveQuotaDAOTest.java   |  10 +-
 .../cassandra/CassandraMailRepository.java      |  47 ++---
 .../CassandraMailRepositoryCountDAO.java        |   9 +-
 .../CassandraMailRepositoryKeysDAO.java         |  17 +-
 .../CassandraMailRepositoryMailDAO.java         |   5 +-
 .../CassandraMailRepositoryMailDaoAPI.java      |   4 +-
 .../CassandraMailRepositoryMailDaoV2.java       |   6 +-
 .../MergingCassandraMailRepositoryMailDao.java  |   6 +-
 .../CassandraMailRepositoryCountDAOTest.java    |  18 +-
 .../CassandraMailRepositoryKeysDAOTest.java     |  54 +++---
 .../CassandraMailRepositoryMailDAOTest.java     |   4 +-
 ...ilRepositoryWithFakeImplementationsTest.java |  29 +--
 70 files changed, 1031 insertions(+), 1390 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 899635e..361bb93 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -46,11 +46,6 @@ public class CassandraAsyncExecutor {
         return executeReactor(statement).toFuture();
     }
 
-
-    public CompletableFuture<Boolean> executeReturnApplied(Statement statement) {
-        return executeReturnAppliedReactor(statement).toFuture();
-    }
-
     public CompletableFuture<Void> executeVoid(Statement statement) {
         return executeVoidReactor(statement).toFuture();
     }
@@ -67,9 +62,8 @@ public class CassandraAsyncExecutor {
     }
 
 
-    public Mono<Boolean> executeReturnAppliedReactor(Statement statement) {
-        return executeReactor(statement)
-                .map(ResultSet::one)
+    public Mono<Boolean> executeReturnApplied(Statement statement) {
+        return executeSingleRowReactor(statement)
                 .map(row -> row.getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED));
     }
 
@@ -83,7 +77,7 @@ public class CassandraAsyncExecutor {
                 .flatMap(Mono::justOrEmpty);
     }
 
-    private Mono<Optional<Row>> executeSingleRowOptionalReactor(Statement statement) {
+    public Mono<Optional<Row>> executeSingleRowOptionalReactor(Statement statement) {
         return executeReactor(statement)
             .map(resultSet -> Optional.ofNullable(resultSet.one()));
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java
deleted file mode 100644
index 86e1aba..0000000
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.backends.cassandra.utils;
-
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.BooleanSupplier;
-import java.util.function.Supplier;
-import java.util.stream.IntStream;
-
-import com.google.common.base.Preconditions;
-
-public class FunctionRunnerWithRetry {
-
-    public static class RelayingException extends RuntimeException {}
-
-    @FunctionalInterface
-    public interface OptionalSupplier<T> {
-        Optional<T> getAsOptional();
-    }
-
-    private final int maxRetry;
-
-    public FunctionRunnerWithRetry(int maxRetry) {
-        Preconditions.checkArgument(maxRetry > 0);
-        this.maxRetry = maxRetry;
-    }
-
-    public void execute(BooleanSupplier functionNotifyingSuccess) throws LightweightTransactionException {
-        IntStream.range(0, maxRetry)
-            .filter((x) -> functionNotifyingSuccess.getAsBoolean())
-            .findFirst()
-            .orElseThrow(() -> new LightweightTransactionException(maxRetry));
-    }
-
-    public <T> T executeAndRetrieveObject(OptionalSupplier<T> functionNotifyingSuccess) throws LightweightTransactionException {
-        return IntStream.range(0, maxRetry)
-            .mapToObj((x) -> functionNotifyingSuccess.getAsOptional())
-            .filter(Optional::isPresent)
-            .findFirst()
-            .orElseThrow(() -> new LightweightTransactionException(maxRetry))
-            .get();
-    }
-
-    public <T> CompletableFuture<Optional<T>> executeAsyncAndRetrieveObject(Supplier<CompletableFuture<Optional<T>>> futureSupplier) {
-        return executeAsyncAndRetrieveObject(futureSupplier, 0);
-    }
-
-    public <T> CompletableFuture<Optional<T>> executeAsyncAndRetrieveObject(Supplier<CompletableFuture<Optional<T>>> futureSupplier, int tries) {
-        if (tries >= maxRetry) {
-            return CompletableFuture.completedFuture(Optional.empty());
-        }
-        return futureSupplier.get()
-            .thenCompose(optional -> {
-                if (optional.isPresent()) {
-                    return CompletableFuture.completedFuture(optional);
-                }
-                return executeAsyncAndRetrieveObject(futureSupplier, tries + 1);
-            });
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java
deleted file mode 100644
index 7fe5813..0000000
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.backends.cassandra.utils;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang.mutable.MutableInt;
-import org.junit.Test;
-
-public class FunctionRunnerWithRetryTest {
-    
-    private static final int MAX_RETRY = 10;
-
-    @Test(expected = IllegalArgumentException.class)
-    public void functionRunnerWithInvalidMaxRetryShouldFail() throws Exception {
-        new FunctionRunnerWithRetry(-1);
-    }
-
-    @Test(expected = LightweightTransactionException.class)
-    public void functionRunnerShouldFailIfTransactionCanNotBePerformed() throws Exception {
-        final MutableInt value = new MutableInt(0);
-        new FunctionRunnerWithRetry(MAX_RETRY).execute(
-            () -> {
-                value.increment();
-                return false;
-            }
-        );
-        assertThat(value.getValue()).isEqualTo(MAX_RETRY);
-    }
-    
-    @Test
-    public void functionRunnerShouldWorkOnFirstTry() throws Exception {
-        final MutableInt value = new MutableInt(0);
-        new FunctionRunnerWithRetry(MAX_RETRY).execute(
-            () -> {
-                value.increment();
-                return true;
-            }
-        );
-        assertThat(value.getValue()).isEqualTo(1);
-    }
-
-    @Test
-    public void functionRunnerShouldWorkIfNotSucceededOnFirstTry() throws Exception {
-        final MutableInt value = new MutableInt(0);
-        new FunctionRunnerWithRetry(MAX_RETRY).execute(
-            () -> {
-                value.increment();
-                return (Integer) value.getValue() == MAX_RETRY / 2;
-            }
-        );
-        assertThat(value.getValue()).isEqualTo(MAX_RETRY / 2);
-    }
-
-    @Test
-    public void functionRunnerShouldWorkIfNotSucceededOnMaxRetryReached() throws Exception {
-        final MutableInt value = new MutableInt(0);
-        new FunctionRunnerWithRetry(MAX_RETRY).execute(
-                () -> {
-                    value.increment();
-                    return (Integer) value.getValue() == MAX_RETRY;
-                }
-        );
-        assertThat(value.getValue()).isEqualTo(MAX_RETRY);
-    }
-
-    @Test
-    public void asyncFunctionRunnerShouldWorkIfSucceedFirstTry() throws Exception {
-        int value = 18;
-
-        Optional<Integer> result = new FunctionRunnerWithRetry(MAX_RETRY)
-            .executeAsyncAndRetrieveObject(
-                () -> CompletableFuture.completedFuture(Optional.of(value)))
-            .join();
-
-        assertThat(result).contains(value);
-    }
-
-    @Test
-    public void asyncFunctionRunnerShouldTryOnlyOnceIfSuccess() throws Exception {
-        int value = 18;
-        AtomicInteger times = new AtomicInteger(0);
-
-        new FunctionRunnerWithRetry(MAX_RETRY)
-            .executeAsyncAndRetrieveObject(
-                () -> {
-                    times.incrementAndGet();
-                    return CompletableFuture.completedFuture(Optional.of(value));
-                })
-            .join();
-
-        assertThat(times.get()).isEqualTo(1);
-    }
-
-    @Test
-    public void asyncFunctionRunnerShouldRetrieveValueOnRetry() throws Exception {
-        int value = 18;
-        AtomicInteger times = new AtomicInteger(0);
-
-        Optional<Integer> result = new FunctionRunnerWithRetry(MAX_RETRY)
-            .executeAsyncAndRetrieveObject(
-                () -> {
-                    int attemptCount = times.incrementAndGet();
-                    if (attemptCount == MAX_RETRY) {
-                        return CompletableFuture.completedFuture(Optional.of(value));
-                    } else {
-                        return CompletableFuture.completedFuture(Optional.empty());
-                    }
-                })
-            .join();
-
-        assertThat(result).contains(value);
-    }
-
-    @Test
-    public void asyncFunctionRunnerShouldMakeMaxRetryAttempts() throws Exception {
-        int value = 18;
-        AtomicInteger times = new AtomicInteger(0);
-
-        new FunctionRunnerWithRetry(MAX_RETRY)
-            .executeAsyncAndRetrieveObject(
-                () -> {
-                    int attemptCount = times.incrementAndGet();
-                    if (attemptCount == MAX_RETRY) {
-                        return CompletableFuture.completedFuture(Optional.of(value));
-                    } else {
-                        return CompletableFuture.completedFuture(Optional.empty());
-                    }
-                })
-            .join();
-
-        assertThat(times.get()).isEqualTo(MAX_RETRY);
-    }
-
-
-    @Test
-    public void asyncFunctionRunnerShouldReturnEmptyIfAllFailed() throws Exception {
-        AtomicInteger times = new AtomicInteger(0);
-
-        Optional<Integer> result = new FunctionRunnerWithRetry(MAX_RETRY)
-            .executeAsyncAndRetrieveObject(
-                () -> {
-                    times.incrementAndGet();
-                    return CompletableFuture.completedFuture(Optional.<Integer>empty());
-                })
-            .join();
-
-        assertThat(result).isEmpty();
-        assertThat(times.get()).isEqualTo(MAX_RETRY);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
----------------------------------------------------------------------
diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
index e1e55c7..8f6a9d1 100644
--- a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
+++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
@@ -82,7 +82,7 @@ public class EventStoreDao {
     public Mono<Boolean> appendAll(List<Event> events) {
         BatchStatement batch = new BatchStatement();
         events.forEach(event -> batch.add(insertEvent(event)));
-        return cassandraAsyncExecutor.executeReturnAppliedReactor(batch);
+        return cassandraAsyncExecutor.executeReturnApplied(batch);
     }
 
     private BoundStatement insertEvent(Event event) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
index 97e0103..544e1fe 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
@@ -81,7 +81,7 @@ public abstract class MailboxManagerStressTest<T extends MailboxManager> {
         final AtomicBoolean fail = new AtomicBoolean(false);
         final ConcurrentHashMap<MessageUid, Object> uids = new ConcurrentHashMap<>();
 
-        // fire of 1000 append operations
+        // fire of append operations
         for (int i = 0; i < APPEND_OPERATIONS; i++) {
             pool.execute(() -> {
                 if (fail.get()) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
index bf486ed..d57e99a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
@@ -19,24 +19,18 @@
 package org.apache.james.mailbox.cassandra.mail;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.mailbox.model.Attachment;
-import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.MessageAttachment;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.FluentFutureStream;
 
-import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class AttachmentLoader {
 
@@ -46,33 +40,23 @@ public class AttachmentLoader {
         this.attachmentMapper = attachmentMapper;
     }
 
-    public CompletableFuture<Stream<SimpleMailboxMessage>> addAttachmentToMessages(Stream<Pair<MessageWithoutAttachment,
-            Stream<MessageAttachmentRepresentation>>> messageRepresentations, MessageMapper.FetchType fetchType) {
+    public Mono<SimpleMailboxMessage> addAttachmentToMessage(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageRepresentation, MessageMapper.FetchType fetchType) {
 
         if (fetchType == MessageMapper.FetchType.Body || fetchType == MessageMapper.FetchType.Full) {
-            return FluentFutureStream.<SimpleMailboxMessage>of(
-                messageRepresentations
-                    .map(pair -> getAttachments(pair.getRight().collect(Guavate.toImmutableList()))
-                        .thenApply(attachments -> pair.getLeft().toMailboxMessage(attachments))))
-                .completableFuture();
+            return getAttachments(messageRepresentation.getRight().collect(ImmutableList.toImmutableList()))
+                        .map(attachments -> messageRepresentation.getLeft().toMailboxMessage(attachments));
         } else {
-            return CompletableFuture.completedFuture(messageRepresentations
-                .map(pair -> pair.getLeft()
-                    .toMailboxMessage(ImmutableList.of())));
+            return Mono.just(messageRepresentation.getLeft().toMailboxMessage(ImmutableList.of()));
         }
     }
 
     @VisibleForTesting
-    CompletableFuture<List<MessageAttachment>> getAttachments(List<MessageAttachmentRepresentation> attachmentRepresentations) {
-        CompletableFuture<Map<AttachmentId, Attachment>> attachmentsByIdFuture =
-            attachmentsById(attachmentRepresentations.stream()
-                .map(MessageAttachmentRepresentation::getAttachmentId)
-                .collect(Guavate.toImmutableSet()));
-
-        return attachmentsByIdFuture.thenApply(attachmentsById ->
-            attachmentRepresentations.stream()
-                .map(representation -> constructMessageAttachment(attachmentsById.get(representation.getAttachmentId()), representation))
-                .collect(Guavate.toImmutableList()));
+    Mono<List<MessageAttachment>> getAttachments(List<MessageAttachmentRepresentation> attachmentRepresentations) {
+        return Flux.fromIterable(attachmentRepresentations)
+                .flatMap(attachmentRepresentation ->
+                        attachmentMapper.getAttachmentsAsMono(attachmentRepresentation.getAttachmentId())
+                            .map(attachment -> constructMessageAttachment(attachment, attachmentRepresentation)))
+                .collectList();
     }
 
     private MessageAttachment constructMessageAttachment(Attachment attachment, MessageAttachmentRepresentation messageAttachmentRepresentation) {
@@ -84,15 +68,4 @@ public class AttachmentLoader {
                 .build();
     }
 
-    @VisibleForTesting
-    CompletableFuture<Map<AttachmentId, Attachment>> attachmentsById(Set<AttachmentId> attachmentIds) {
-        if (attachmentIds.isEmpty()) {
-            return CompletableFuture.completedFuture(ImmutableMap.of());
-        }
-        return attachmentMapper.getAttachmentsAsFuture(attachmentIds)
-            .thenApply(attachments -> attachments
-                .stream()
-                .collect(Guavate.toImmutableMap(Attachment::getAttachmentId, Function.identity())));
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
index 7f2bb6a..dd6f6cd 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
@@ -27,16 +27,12 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
 
 import java.io.IOException;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
-import org.apache.james.backends.cassandra.utils.LightweightTransactionException;
 import org.apache.james.mailbox.acl.ACLDiff;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.table.CassandraACLTable;
@@ -45,14 +41,15 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.exception.UnsupportedRightException;
 import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.store.json.MailboxACLJsonConverter;
+import org.apache.james.util.FunctionalUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import reactor.core.publisher.Mono;
 
 public class CassandraACLMapper {
     public static final int INITIAL_VALUE = 0;
@@ -65,7 +62,7 @@ public class CassandraACLMapper {
     }
 
     private final CassandraAsyncExecutor executor;
-    private final int maxRetry;
+    private final int maxAclRetry;
     private final CodeInjector codeInjector;
     private final CassandraUserMailboxRightsDAO userMailboxRightsDAO;
     private final PreparedStatement conditionalInsertStatement;
@@ -79,7 +76,7 @@ public class CassandraACLMapper {
 
     public CassandraACLMapper(Session session, CassandraUserMailboxRightsDAO userMailboxRightsDAO, CassandraConfiguration cassandraConfiguration, CodeInjector codeInjector) {
         this.executor = new CassandraAsyncExecutor(session);
-        this.maxRetry = cassandraConfiguration.getAclMaxRetry();
+        this.maxAclRetry = cassandraConfiguration.getAclMaxRetry();
         this.codeInjector = codeInjector;
         this.conditionalInsertStatement = prepareConditionalInsert(session);
         this.conditionalUpdateStatement = prepareConditionalUpdate(session);
@@ -112,103 +109,84 @@ public class CassandraACLMapper {
                 .where(eq(CassandraMailboxTable.ID, bindMarker(CassandraACLTable.ID))));
     }
 
-    public CompletableFuture<MailboxACL> getACL(CassandraId cassandraId) {
+    public Mono<MailboxACL> getACL(CassandraId cassandraId) {
         return getStoredACLRow(cassandraId)
-            .thenApply(resultSet -> getAcl(cassandraId, resultSet));
+            .map(row -> getAcl(cassandraId, row))
+            .switchIfEmpty(Mono.just(MailboxACL.EMPTY));
     }
 
-    private MailboxACL getAcl(CassandraId cassandraId, ResultSet resultSet) {
-        if (resultSet.isExhausted()) {
-            return MailboxACL.EMPTY;
-        }
-        String serializedACL = resultSet.one().getString(CassandraACLTable.ACL);
+    private MailboxACL getAcl(CassandraId cassandraId, Row row) {
+        String serializedACL = row.getString(CassandraACLTable.ACL);
         return deserializeACL(cassandraId, serializedACL);
     }
 
     public ACLDiff updateACL(CassandraId cassandraId, MailboxACL.ACLCommand command) throws MailboxException {
         MailboxACL replacement = MailboxACL.EMPTY.apply(command);
-
-        ACLDiff aclDiff = updateAcl(cassandraId, aclWithVersion -> aclWithVersion.apply(command), replacement);
-
-        return userMailboxRightsDAO.update(cassandraId, aclDiff)
-            .thenApply(any -> aclDiff)
-            .join();
+        return updateAcl(cassandraId, aclWithVersion -> aclWithVersion.apply(command), replacement)
+            .flatMap(aclDiff -> userMailboxRightsDAO.update(cassandraId, aclDiff)
+            .thenReturn(aclDiff))
+            .blockOptional()
+            .orElseThrow(() -> new MailboxException("Unable to update ACL"));
     }
 
     public ACLDiff setACL(CassandraId cassandraId, MailboxACL mailboxACL) throws MailboxException {
-        ACLDiff aclDiff = updateAcl(cassandraId,
-            acl -> new ACLWithVersion(acl.version, mailboxACL),
-            mailboxACL);
-
-        return userMailboxRightsDAO.update(cassandraId, aclDiff)
-            .thenApply(any -> aclDiff)
-            .join();
-    }
-
-    private ACLDiff updateAcl(CassandraId cassandraId, Function<ACLWithVersion, ACLWithVersion> aclTransformation, MailboxACL replacement) throws MailboxException {
-        try {
-            return new FunctionRunnerWithRetry(maxRetry)
-                .executeAndRetrieveObject(
-                    () -> {
-                        codeInjector.inject();
-                        return getAclWithVersion(cassandraId)
-                            .map(aclWithVersion ->
-                                updateStoredACL(cassandraId, aclTransformation.apply(aclWithVersion))
-                                    .map(newACL -> ACLDiff.computeDiff(aclWithVersion.mailboxACL, newACL)))
-                            .orElseGet(() -> insertACL(cassandraId, replacement)
-                                .map(newACL -> ACLDiff.computeDiff(MailboxACL.EMPTY, newACL)));
-                    });
-        } catch (LightweightTransactionException e) {
-            throw new MailboxException("Exception during lightweight transaction", e);
-        }
-    }
-
-    private CompletableFuture<ResultSet> getStoredACLRow(CassandraId cassandraId) {
-        return executor.execute(
+        return updateAcl(cassandraId, acl -> new ACLWithVersion(acl.version, mailboxACL), mailboxACL)
+            .flatMap(aclDiff -> userMailboxRightsDAO.update(cassandraId, aclDiff)
+            .thenReturn(aclDiff))
+            .blockOptional()
+            .orElseThrow(() -> new MailboxException("Unable to update ACL"));
+    }
+
+    private Mono<ACLDiff> updateAcl(CassandraId cassandraId, Function<ACLWithVersion, ACLWithVersion> aclTransformation, MailboxACL replacement) throws MailboxException {
+        return Mono.fromRunnable(() -> codeInjector.inject())
+            .then(Mono.defer(() -> getAclWithVersion(cassandraId)))
+            .flatMap(aclWithVersion ->
+                    updateStoredACL(cassandraId, aclTransformation.apply(aclWithVersion))
+                            .map(newACL -> ACLDiff.computeDiff(aclWithVersion.mailboxACL, newACL)))
+            .switchIfEmpty(insertACL(cassandraId, replacement)
+                    .map(newACL -> ACLDiff.computeDiff(MailboxACL.EMPTY, newACL)))
+            .single()
+            .retry(maxAclRetry);
+    }
+
+    private Mono<Row> getStoredACLRow(CassandraId cassandraId) {
+        return executor.executeSingleRowReactor(
             readStatement.bind()
                 .setUUID(CassandraACLTable.ID, cassandraId.asUuid()));
     }
 
-    private Optional<MailboxACL> updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
-        try {
-            return executor.executeReturnApplied(
-                conditionalUpdateStatement.bind()
+    private Mono<MailboxACL> updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
+        return executor.executeReturnApplied(
+            conditionalUpdateStatement.bind()
+                .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+                .setString(CassandraACLTable.ACL, convertAclToJson(aclWithVersion.mailboxACL))
+                .setLong(CassandraACLTable.VERSION, aclWithVersion.version + 1)
+                .setLong(OLD_VERSION, aclWithVersion.version))
+            .filter(FunctionalUtils.toPredicate(Function.identity()))
+            .map(any -> aclWithVersion.mailboxACL);
+    }
+
+    private Mono<MailboxACL> insertACL(CassandraId cassandraId, MailboxACL acl) {
+        return Mono.defer(() -> executor.executeReturnApplied(
+            conditionalInsertStatement.bind()
                     .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
-                    .setString(CassandraACLTable.ACL,  MailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL))
-                    .setLong(CassandraACLTable.VERSION, aclWithVersion.version + 1)
-                    .setLong(OLD_VERSION, aclWithVersion.version))
-                .thenApply(Optional::of)
-                .thenApply(optional -> optional.filter(b -> b).map(any -> aclWithVersion.mailboxACL))
-                .join();
-        } catch (JsonProcessingException exception) {
-            throw new RuntimeException(exception);
-        }
+                    .setString(CassandraACLTable.ACL, convertAclToJson(acl))))
+            .filter(FunctionalUtils.toPredicate(Function.identity()))
+            .map(any -> acl);
     }
 
-    private Optional<MailboxACL> insertACL(CassandraId cassandraId, MailboxACL acl) {
+    private String convertAclToJson(MailboxACL acl) {
         try {
-            return executor.executeReturnApplied(
-                conditionalInsertStatement.bind()
-                    .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
-                    .setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(acl)))
-                .thenApply(Optional::of)
-                .thenApply(optional -> optional.filter(b -> b).map(any -> acl))
-                .join();
+            return MailboxACLJsonConverter.toJson(acl);
         } catch (JsonProcessingException exception) {
             throw new RuntimeException(exception);
         }
     }
 
-    private Optional<ACLWithVersion> getAclWithVersion(CassandraId cassandraId) {
-        ResultSet resultSet = getStoredACLRow(cassandraId).join();
-        if (resultSet.isExhausted()) {
-            return Optional.empty();
-        }
-        Row row = resultSet.one();
-        return Optional.of(
-            new ACLWithVersion(
-                row.getLong(CassandraACLTable.VERSION),
-                deserializeACL(cassandraId, row.getString(CassandraACLTable.ACL))));
+    private Mono<ACLWithVersion> getAclWithVersion(CassandraId cassandraId) {
+        return getStoredACLRow(cassandraId)
+            .map(acl -> new ACLWithVersion(acl.getLong(CassandraACLTable.VERSION),
+                                deserializeACL(cassandraId, acl.getString(CassandraACLTable.ACL))));
     }
 
     private MailboxACL deserializeACL(CassandraId cassandraId, String serializedACL) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java
index 2e5c767..fb00ec0 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java
@@ -33,7 +33,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable.
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 
@@ -49,6 +48,7 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.google.common.base.Preconditions;
+import reactor.core.publisher.Mono;
 
 public class CassandraAttachmentDAO {
 
@@ -99,12 +99,12 @@ public class CassandraAttachmentDAO {
             .from(TABLE_NAME));
     }
 
-    public CompletableFuture<Optional<Attachment>> getAttachment(AttachmentId attachmentId) {
+    public Mono<Attachment> getAttachment(AttachmentId attachmentId) {
         Preconditions.checkArgument(attachmentId != null);
-        return cassandraAsyncExecutor.executeSingleRow(
+        return cassandraAsyncExecutor.executeSingleRowReactor(
             selectStatement.bind()
                 .setString(ID, attachmentId.getId()))
-            .thenApply(optional -> optional.map(this::attachment));
+            .map(this::attachment);
     }
 
     public Stream<Attachment> retrieveAll() {

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java
index f88723f..38b6e21 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java
@@ -32,8 +32,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentV2Tabl
 import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentV2Table.TYPE;
 
 import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 
@@ -46,6 +44,7 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.google.common.base.Preconditions;
+import reactor.core.publisher.Mono;
 
 public class CassandraAttachmentDAOV2 {
     public static class DAOAttachment {
@@ -150,16 +149,16 @@ public class CassandraAttachmentDAOV2 {
             .where(eq(ID_AS_UUID, bindMarker(ID_AS_UUID))));
     }
 
-    public CompletableFuture<Optional<DAOAttachment>> getAttachment(AttachmentId attachmentId) {
+    public Mono<DAOAttachment> getAttachment(AttachmentId attachmentId) {
         Preconditions.checkArgument(attachmentId != null);
-        return cassandraAsyncExecutor.executeSingleRow(
+        return cassandraAsyncExecutor.executeSingleRowReactor(
             selectStatement.bind()
                 .setUUID(ID_AS_UUID, attachmentId.asUUID()))
-            .thenApply(rowOptional -> rowOptional.map(row -> CassandraAttachmentDAOV2.fromRow(row, blobIdFactory)));
+            .map(row -> CassandraAttachmentDAOV2.fromRow(row, blobIdFactory));
     }
 
-    public CompletableFuture<Void> storeAttachment(DAOAttachment attachment) {
-        return cassandraAsyncExecutor.executeVoid(
+    public Mono<Void> storeAttachment(DAOAttachment attachment) {
+        return cassandraAsyncExecutor.executeVoidReactor(
             insertStatement.bind()
                 .setUUID(ID_AS_UUID, attachment.getAttachmentId().asUUID())
                 .setString(ID, attachment.getAttachmentId().getId())

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index ecb43de..a7d9dfc 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -21,10 +21,6 @@ package org.apache.james.mailbox.cassandra.mail;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -37,13 +33,14 @@ import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.mail.AttachmentMapper;
 import org.apache.james.mailbox.store.mail.model.Username;
-import org.apache.james.util.FluentFutureStream;
+import org.apache.james.util.ReactorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraAttachmentMapper implements AttachmentMapper {
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraAttachmentMapper.class);
@@ -76,65 +73,54 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
     public Attachment getAttachment(AttachmentId attachmentId) throws AttachmentNotFoundException {
         Preconditions.checkArgument(attachmentId != null);
         return getAttachmentInternal(attachmentId)
-            .join()
+            .blockOptional()
             .orElseThrow(() -> new AttachmentNotFoundException(attachmentId.getId()));
     }
 
-    private CompletionStage<Optional<Attachment>> retrievePayload(Optional<DAOAttachment> daoAttachmentOptional) {
-        if (!daoAttachmentOptional.isPresent()) {
-            return CompletableFuture.completedFuture(Optional.empty());
-        }
-        DAOAttachment daoAttachment = daoAttachmentOptional.get();
-        return blobStore.readBytes(daoAttachment.getBlobId())
-            .thenApply(bytes -> Optional.of(daoAttachment.toAttachment(bytes)));
+    private Mono<Attachment> retrievePayload(DAOAttachment daoAttachment) {
+        return Mono.fromCompletionStage(blobStore.readBytes(daoAttachment.getBlobId())
+            .thenApply(daoAttachment::toAttachment));
     }
 
     @Override
     public List<Attachment> getAttachments(Collection<AttachmentId> attachmentIds) {
-        return getAttachmentsAsFuture(attachmentIds).join();
-    }
-
-    public CompletableFuture<ImmutableList<Attachment>> getAttachmentsAsFuture(Collection<AttachmentId> attachmentIds) {
         Preconditions.checkArgument(attachmentIds != null);
+        return Flux.fromIterable(attachmentIds)
+            .flatMap(this::getAttachmentsAsMono)
+            .collectList()
+            .block();
+    }
 
-        Stream<CompletableFuture<Optional<Attachment>>> attachments = attachmentIds
-                .stream()
-                .distinct()
-                .map(id -> getAttachmentInternal(id)
-                    .thenApply(finalValue -> logNotFound(id, finalValue)));
-
-        return FluentFutureStream.of(attachments, FluentFutureStream::unboxOptional)
-            .collect(Guavate.toImmutableList());
+    public Mono<Attachment> getAttachmentsAsMono(AttachmentId attachmentId) {
+        return getAttachmentInternal(attachmentId)
+            .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> logNotFound((attachmentId))));
     }
 
-    private CompletableFuture<Optional<Attachment>> getAttachmentInternal(AttachmentId id) {
+    private Mono<Attachment> getAttachmentInternal(AttachmentId id) {
         return attachmentDAOV2.getAttachment(id)
-            .thenCompose(this::retrievePayload)
-            .thenCompose(v2Value -> fallbackToV1(id, v2Value));
+            .flatMap(this::retrievePayload)
+            .switchIfEmpty(fallbackToV1(id));
     }
 
-    private CompletionStage<Optional<Attachment>> fallbackToV1(AttachmentId attachmentId, Optional<Attachment> v2Value) {
-        if (v2Value.isPresent()) {
-            return CompletableFuture.completedFuture(v2Value);
-        }
+    private Mono<Attachment> fallbackToV1(AttachmentId attachmentId) {
         return attachmentDAO.getAttachment(attachmentId);
     }
 
     @Override
     public void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException {
         ownerDAO.addOwner(attachment.getAttachmentId(), owner)
-            .thenCompose(any -> blobStore.save(attachment.getBytes()))
-            .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
-            .thenCompose(attachmentDAOV2::storeAttachment)
-            .join();
+            .then(Mono.fromFuture(blobStore.save(attachment.getBytes())))
+            .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
+            .flatMap(attachmentDAOV2::storeAttachment)
+            .block();
     }
 
     @Override
     public void storeAttachmentsForMessage(Collection<Attachment> attachments, MessageId ownerMessageId) throws MailboxException {
-        FluentFutureStream.of(
-            attachments.stream()
-                .map(attachment -> storeAttachmentAsync(attachment, ownerMessageId)))
-            .join();
+        Flux.fromIterable(attachments)
+            .flatMap(attachment -> storeAttachmentAsync(attachment, ownerMessageId))
+            .then()
+            .block();
     }
 
     @Override
@@ -148,21 +134,18 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
         return ownerDAO.retrieveOwners(attachmentId).join().collect(Guavate.toImmutableList());
     }
 
-    public CompletableFuture<Void> storeAttachmentAsync(Attachment attachment, MessageId ownerMessageId) {
-        return blobStore.save(attachment.getBytes())
-            .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
-            .thenCompose(daoAttachment -> storeAttachmentWithIndex(daoAttachment, ownerMessageId));
+    public Mono<Void> storeAttachmentAsync(Attachment attachment, MessageId ownerMessageId) {
+        return Mono.fromFuture(blobStore.save(attachment.getBytes())
+            .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)))
+            .flatMap(daoAttachment -> storeAttachmentWithIndex(daoAttachment, ownerMessageId));
     }
 
-    private CompletableFuture<Void> storeAttachmentWithIndex(DAOAttachment daoAttachment, MessageId ownerMessageId) {
+    private Mono<Void> storeAttachmentWithIndex(DAOAttachment daoAttachment, MessageId ownerMessageId) {
         return attachmentDAOV2.storeAttachment(daoAttachment)
-                .thenCompose(any -> attachmentMessageIdDAO.storeAttachmentForMessageId(daoAttachment.getAttachmentId(), ownerMessageId));
+                .then(attachmentMessageIdDAO.storeAttachmentForMessageId(daoAttachment.getAttachmentId(), ownerMessageId));
     }
 
-    private Optional<Attachment> logNotFound(AttachmentId attachmentId, Optional<Attachment> optionalAttachment) {
-        if (!optionalAttachment.isPresent()) {
-            LOGGER.warn("Failed retrieving attachment {}", attachmentId);
-        }
-        return optionalAttachment;
+    private void logNotFound(AttachmentId attachmentId) {
+        LOGGER.warn("Failed retrieving attachment {}", attachmentId);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java
index 88b3d4d..3f02c1a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java
@@ -44,6 +44,7 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
+import reactor.core.publisher.Mono;
 
 public class CassandraAttachmentMessageIdDAO {
 
@@ -91,8 +92,8 @@ public class CassandraAttachmentMessageIdDAO {
         return messageIdFactory.fromString(row.getString(MESSAGE_ID));
     }
 
-    public CompletableFuture<Void> storeAttachmentForMessageId(AttachmentId attachmentId, MessageId ownerMessageId) {
-        return cassandraAsyncExecutor.executeVoid(
+    public Mono<Void> storeAttachmentForMessageId(AttachmentId attachmentId, MessageId ownerMessageId) {
+        return cassandraAsyncExecutor.executeVoidReactor(
             insertStatement.bind()
                 .setUUID(ATTACHMENT_ID_AS_UUID, attachmentId.asUUID())
                 .setString(ATTACHMENT_ID, attachmentId.getId())

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java
index e92757c..fce332d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java
@@ -41,6 +41,7 @@ import org.apache.james.mailbox.store.mail.model.Username;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
 
 public class CassandraAttachmentOwnerDAO {
 
@@ -72,8 +73,8 @@ public class CassandraAttachmentOwnerDAO {
                 .where(eq(ID, bindMarker(ID))));
     }
 
-    public CompletableFuture<Void> addOwner(AttachmentId attachmentId, Username owner) {
-        return executor.executeVoid(
+    public Mono<Void> addOwner(AttachmentId attachmentId, Username owner) {
+        return executor.executeVoidReactor(
             addStatement.bind()
                 .setUUID(ID, attachmentId.asUUID())
                 .setString(OWNER, owner.getValue()));

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
index 96075a4..7e75099 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
@@ -32,7 +32,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMailboxTable.NAM
 import static org.apache.james.mailbox.cassandra.table.CassandraMailboxTable.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraMailboxTable.UIDVALIDITY;
 
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
@@ -46,13 +45,14 @@ import org.apache.james.mailbox.cassandra.table.CassandraMailboxTable;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
-import org.apache.james.util.FluentFutureStream;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraMailboxDAO {
 
@@ -112,9 +112,9 @@ public class CassandraMailboxDAO {
             .where(eq(ID, bindMarker(ID))));
     }
 
-    public CompletableFuture<Void> save(Mailbox mailbox) {
+    public Mono<Void> save(Mailbox mailbox) {
         CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
-        return executor.executeVoid(insertStatement.bind()
+        return executor.executeVoidReactor(insertStatement.bind()
             .setUUID(ID, cassandraId.asUuid())
             .setString(NAME, mailbox.getName())
             .setLong(UIDVALIDITY, mailbox.getUidValidity())
@@ -128,21 +128,21 @@ public class CassandraMailboxDAO {
             .setUDTValue(MAILBOX_BASE, mailboxBaseTupleUtil.createMailboxBaseUDT(mailboxPath.getNamespace(), mailboxPath.getUser())));
     }
 
-    public CompletableFuture<Void> delete(CassandraId mailboxId) {
-        return executor.executeVoid(deleteStatement.bind()
+    public Mono<Void> delete(CassandraId mailboxId) {
+        return executor.executeVoidReactor(deleteStatement.bind()
             .setUUID(ID, mailboxId.asUuid()));
     }
 
-    public CompletableFuture<Optional<SimpleMailbox>> retrieveMailbox(CassandraId mailboxId) {
-        return executor.executeSingleRow(readStatement.bind()
+    public Mono<SimpleMailbox> retrieveMailbox(CassandraId mailboxId) {
+        return executor.executeSingleRowReactor(readStatement.bind()
             .setUUID(ID, mailboxId.asUuid()))
-            .thenApply(rowOptional -> rowOptional.map(this::mailboxFromRow))
-            .thenApply(mailbox -> addMailboxId(mailboxId, mailbox));
+            .map(this::mailboxFromRow)
+            .map(mailbox -> addMailboxId(mailboxId, mailbox));
     }
 
-    private Optional<SimpleMailbox> addMailboxId(CassandraId cassandraId, Optional<SimpleMailbox> mailboxOptional) {
-        mailboxOptional.ifPresent(mailbox -> mailbox.setMailboxId(cassandraId));
-        return mailboxOptional;
+    private SimpleMailbox addMailboxId(CassandraId cassandraId, SimpleMailbox mailbox) {
+        mailbox.setMailboxId(cassandraId);
+        return mailbox;
     }
 
     private SimpleMailbox mailboxFromRow(Row row) {
@@ -154,9 +154,9 @@ public class CassandraMailboxDAO {
             row.getLong(UIDVALIDITY));
     }
 
-    public FluentFutureStream<SimpleMailbox> retrieveAllMailboxes() {
-        return FluentFutureStream.of(executor.execute(listStatement.bind())
-            .thenApply(cassandraUtils::convertToStream))
+    public Flux<SimpleMailbox> retrieveAllMailboxes() {
+        return executor.executeReactor(listStatement.bind())
+            .flatMapMany(cassandraUtils::convertToFlux)
             .map(this::toMailboxWithId);
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index 4d3b7fd..058d6fe 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -21,18 +21,14 @@ package org.apache.james.mailbox.cassandra.mail;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.StringTokenizer;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.mailbox.acl.ACLDiff;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.exception.MailboxException;
@@ -45,20 +41,19 @@ import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
-import org.apache.james.util.CompletableFutureUtil;
-import org.apache.james.util.FluentFutureStream;
-import org.apache.james.util.OptionalUtils;
+import org.apache.james.util.ReactorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraMailboxMapper implements MailboxMapper {
 
-    public static final String WILDCARD = "%";
+    private static final String WILDCARD = "%";
     public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMailboxMapper.class);
 
     private final CassandraMailboxDAO mailboxDAO;
@@ -79,77 +74,70 @@ public class CassandraMailboxMapper implements MailboxMapper {
     @Override
     public void delete(Mailbox mailbox) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        FluentFutureStream.ofFutures(
+        Flux.merge(
                 mailboxPathDAO.delete(mailbox.generateAssociatedPath()),
                 mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()))
-            .map(any -> mailboxDAO.delete(mailboxId), FluentFutureStream::unboxFuture)
-            .join();
+            .thenEmpty(mailboxDAO.delete(mailboxId))
+            .block();
     }
 
     @Override
     public Mailbox findMailboxByPath(MailboxPath path) throws MailboxException {
         return mailboxPathV2DAO.retrieveId(path)
-            .thenCompose(cassandraIdOptional ->
-                cassandraIdOptional
-                    .map(CassandraIdAndPath::getCassandraId)
-                    .map(this::retrieveMailbox)
-                    .orElseGet(Throwing.supplier(() -> fromPreviousTable(path))))
-            .join()
+            .map(CassandraIdAndPath::getCassandraId)
+            .flatMap(this::retrieveMailbox)
+            .switchIfEmpty(fromPreviousTable(path))
+            .blockOptional()
             .orElseThrow(() -> new MailboxNotFoundException(path));
     }
 
-    private CompletableFuture<Optional<SimpleMailbox>> fromPreviousTable(MailboxPath path) {
+    private Mono<SimpleMailbox> fromPreviousTable(MailboxPath path) {
         return mailboxPathDAO.retrieveId(path)
-            .thenCompose(cassandraIdOptional ->
-                cassandraIdOptional
-                    .map(CassandraIdAndPath::getCassandraId)
-                    .map(this::retrieveMailbox)
-                    .orElse(CompletableFuture.completedFuture(Optional.empty())))
-            .thenCompose(maybeMailbox -> maybeMailbox.map(this::migrate)
-                .orElse(CompletableFuture.completedFuture(maybeMailbox)));
+            .map(CassandraIdAndPath::getCassandraId)
+            .flatMap(this::retrieveMailbox)
+            .flatMap(this::migrate);
     }
 
-    private CompletableFuture<Optional<SimpleMailbox>> migrate(SimpleMailbox mailbox) {
+    private Mono<SimpleMailbox> migrate(SimpleMailbox mailbox) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return mailboxPathV2DAO.save(mailbox.generateAssociatedPath(), mailboxId)
-            .thenCompose(success -> deleteIfSuccess(mailbox, success))
-            .thenApply(any -> Optional.of(mailbox));
+            .flatMap(success -> deleteIfSuccess(mailbox, success))
+            .thenReturn(mailbox);
     }
 
-    private CompletionStage<Void> deleteIfSuccess(SimpleMailbox mailbox, boolean success) {
+    private Mono<Void> deleteIfSuccess(SimpleMailbox mailbox, boolean success) {
         if (success) {
             return mailboxPathDAO.delete(mailbox.generateAssociatedPath());
         }
         LOGGER.info("Concurrent execution lead to data race while migrating {} to 'mailboxPathV2DAO'.",
             mailbox.generateAssociatedPath());
-        return CompletableFuture.completedFuture(null);
+        return Mono.empty();
     }
 
     @Override
     public Mailbox findMailboxById(MailboxId id) throws MailboxException {
         CassandraId mailboxId = (CassandraId) id;
         return retrieveMailbox(mailboxId)
-            .join()
+            .blockOptional()
             .orElseThrow(() -> new MailboxNotFoundException(id));
     }
 
-    private CompletableFuture<Optional<SimpleMailbox>> retrieveMailbox(CassandraId mailboxId) {
-        CompletableFuture<MailboxACL> aclCompletableFuture = cassandraACLMapper.getACL(mailboxId);
+    private Mono<SimpleMailbox> retrieveMailbox(CassandraId mailboxId) {
+        Mono<MailboxACL> aclCompletableFuture = cassandraACLMapper.getACL(mailboxId);
+        Mono<SimpleMailbox> simpleMailboxFuture = mailboxDAO.retrieveMailbox(mailboxId);
 
-        CompletableFuture<Optional<SimpleMailbox>> simpleMailboxFuture = mailboxDAO.retrieveMailbox(mailboxId);
-
-        return aclCompletableFuture.thenCombine(simpleMailboxFuture, this::addAcl);
+        return aclCompletableFuture.zipWith(simpleMailboxFuture, this::addAcl);
     }
 
-    private Optional<SimpleMailbox> addAcl(MailboxACL acl, Optional<SimpleMailbox> mailboxOptional) {
-        mailboxOptional.ifPresent(mailbox -> mailbox.setACL(acl));
-        return mailboxOptional;
+    private SimpleMailbox addAcl(MailboxACL acl, SimpleMailbox mailbox) {
+        mailbox.setACL(acl);
+        return mailbox;
     }
 
     @Override
     public List<Mailbox> findMailboxWithPathLike(MailboxPath path) {
-        List<Mailbox> mailboxesV2 = toMailboxes(path, mailboxPathV2DAO.listUserMailboxes(path.getNamespace(), path.getUser()));
-        List<Mailbox> mailboxesV1 = toMailboxes(path, mailboxPathDAO.listUserMailboxes(path.getNamespace(), path.getUser()));
+        List<SimpleMailbox> mailboxesV2 = toMailboxes(path, mailboxPathV2DAO.listUserMailboxes(path.getNamespace(), path.getUser()));
+        List<SimpleMailbox> mailboxesV1 = toMailboxes(path, mailboxPathDAO.listUserMailboxes(path.getNamespace(), path.getUser()));
 
         List<Mailbox> mailboxesV1NotInV2 = mailboxesV1.stream()
             .filter(mailboxV1 -> mailboxesV2.stream()
@@ -163,19 +151,19 @@ public class CassandraMailboxMapper implements MailboxMapper {
             .build();
     }
 
-    private List<Mailbox> toMailboxes(MailboxPath path, CompletableFuture<Stream<CassandraIdAndPath>> listUserMailboxes) {
+    private List<SimpleMailbox> toMailboxes(MailboxPath path, Flux<CassandraIdAndPath> listUserMailboxes) {
         Pattern regex = Pattern.compile(constructEscapedRegexForMailboxNameMatching(path));
 
-        return FluentFutureStream.of(listUserMailboxes)
+        return listUserMailboxes
                 .filter(idAndPath -> regex.matcher(idAndPath.getMailboxPath().getName()).matches())
-                .map(this::retrieveMailbox, FluentFutureStream::unboxFutureOptional)
-                .join()
-                .collect(Guavate.toImmutableList());
+                .flatMap(this::retrieveMailbox)
+                .collectList()
+                .block();
     }
 
-    private CompletableFuture<Optional<SimpleMailbox>> retrieveMailbox(CassandraIdAndPath idAndPath) {
+    private Mono<SimpleMailbox> retrieveMailbox(CassandraIdAndPath idAndPath) {
         return retrieveMailbox(idAndPath.getCassandraId())
-            .thenApply(optional -> OptionalUtils.executeIfEmpty(optional,
+            .switchIfEmpty(ReactorUtils.executeAndEmpty(
                 () -> LOGGER.warn("Could not retrieve mailbox {} with path {} in mailbox table.", idAndPath.getCassandraId(), idAndPath.getMailboxPath())));
     }
 
@@ -186,22 +174,20 @@ public class CassandraMailboxMapper implements MailboxMapper {
 
         CassandraId cassandraId = retrieveId(cassandraMailbox);
         cassandraMailbox.setMailboxId(cassandraId);
-        boolean applied = trySave(cassandraMailbox, cassandraId).join();
-        if (!applied) {
+        if (!trySave(cassandraMailbox, cassandraId)) {
             throw new MailboxExistsException(mailbox.generateAssociatedPath().asString());
         }
         return cassandraId;
     }
 
-    private CompletableFuture<Boolean> trySave(SimpleMailbox cassandraMailbox, CassandraId cassandraId) {
-        return mailboxPathV2DAO.save(cassandraMailbox.generateAssociatedPath(), cassandraId)
-            .thenCompose(CompletableFutureUtil.composeIfTrue(
-                () -> retrieveMailbox(cassandraId)
-                    .thenCompose(optional -> CompletableFuture
-                        .allOf(optional
-                                .map(storedMailbox -> mailboxPathV2DAO.delete(storedMailbox.generateAssociatedPath()))
-                                .orElse(CompletableFuture.completedFuture(null)),
-                            mailboxDAO.save(cassandraMailbox)))));
+    private boolean trySave(SimpleMailbox cassandraMailbox, CassandraId cassandraId) {
+        boolean isCreated = mailboxPathV2DAO.save(cassandraMailbox.generateAssociatedPath(), cassandraId).block();
+        if (isCreated) {
+            Optional<SimpleMailbox> simpleMailbox = retrieveMailbox(cassandraId).blockOptional();
+            simpleMailbox.ifPresent(mbx -> mailboxPathV2DAO.delete(mbx.generateAssociatedPath()).block());
+            mailboxDAO.save(cassandraMailbox).block();
+        }
+        return isCreated;
     }
 
     private CassandraId retrieveId(SimpleMailbox cassandraMailbox) {
@@ -214,21 +200,21 @@ public class CassandraMailboxMapper implements MailboxMapper {
 
     @Override
     public boolean hasChildren(Mailbox mailbox, char delimiter) {
-        return ImmutableList.of(
+        return Flux.merge(
                 mailboxPathDAO.listUserMailboxes(mailbox.getNamespace(), mailbox.getUser()),
                 mailboxPathV2DAO.listUserMailboxes(mailbox.getNamespace(), mailbox.getUser()))
-            .stream()
-            .map(CompletableFuture::join)
-            .flatMap(Function.identity())
-            .anyMatch(idAndPath -> idAndPath.getMailboxPath().getName().startsWith(mailbox.getName() + String.valueOf(delimiter)));
+            .filter(idAndPath -> idAndPath.getMailboxPath().getName().startsWith(mailbox.getName() + String.valueOf(delimiter)))
+            .hasElements()
+            .block();
     }
 
     @Override
     public List<Mailbox> list() {
         return mailboxDAO.retrieveAllMailboxes()
-            .map(this::toMailboxWithAclFuture, FluentFutureStream::unboxFuture)
-            .join()
-            .collect(Guavate.toImmutableList());
+            .flatMap(this::toMailboxWithAcl)
+            .map(simpleMailboxes -> (Mailbox) simpleMailboxes)
+            .collectList()
+            .block();
     }
 
     @Override
@@ -269,10 +255,10 @@ public class CassandraMailboxMapper implements MailboxMapper {
         }
     }
 
-    private CompletableFuture<SimpleMailbox> toMailboxWithAclFuture(SimpleMailbox mailbox) {
+    private Mono<SimpleMailbox> toMailboxWithAcl(SimpleMailbox mailbox) {
         CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
         return cassandraACLMapper.getACL(cassandraId)
-            .thenApply(acl -> {
+            .map(acl -> {
                 mailbox.setACL(acl);
                 return mailbox;
             });
@@ -280,18 +266,17 @@ public class CassandraMailboxMapper implements MailboxMapper {
 
     @Override
     public List<Mailbox> findNonPersonalMailboxes(String userName, Right right) {
-        return FluentFutureStream.of(userMailboxRightsDAO.listRightsForUser(userName)
-            .thenApply(map -> toAuthorizedMailboxIds(map, right)))
-            .map(this::retrieveMailbox, FluentFutureStream::unboxFutureOptional)
-            .join()
-            .collect(Guavate.toImmutableList());
+        return userMailboxRightsDAO.listRightsForUser(userName)
+            .filter(mailboxId -> authorizedMailbox(mailboxId.getRight(), right))
+            .map(Pair::getLeft)
+            .flatMap(this::retrieveMailbox)
+            .map(simpleMailboxes -> (Mailbox) simpleMailboxes)
+            .collectList()
+            .block();
     }
 
-    private Stream<CassandraId> toAuthorizedMailboxIds(Map<CassandraId, MailboxACL.Rfc4314Rights> map, Right right) {
-        return map.entrySet()
-            .stream()
-            .filter(Throwing.predicate(entry -> entry.getValue().contains(right)))
-            .map(Map.Entry::getKey);
+    private boolean authorizedMailbox(MailboxACL.Rfc4314Rights rights, Right right) {
+        return rights.contains(right);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
index 33fac3c..1eee863 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
@@ -19,23 +19,24 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.model.MailboxPath;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public interface CassandraMailboxPathDAO {
 
-    CompletableFuture<Optional<CassandraIdAndPath>> retrieveId(MailboxPath mailboxPath);
+    Mono<CassandraIdAndPath> retrieveId(MailboxPath mailboxPath);
+
+    Flux<CassandraIdAndPath> listUserMailboxes(String namespace, String user);
 
-    CompletableFuture<Stream<CassandraIdAndPath>> listUserMailboxes(String namespace, String user);
+    void logGhostMailboxSuccess(CassandraIdAndPath value);
 
-    Optional<CassandraIdAndPath> logGhostMailbox(MailboxPath mailboxPath, Optional<CassandraIdAndPath> value);
+    void logGhostMailboxFailure(MailboxPath mailboxPath);
 
-    CompletableFuture<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId);
+    Mono<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId);
 
-    CompletableFuture<Void> delete(MailboxPath mailboxPath);
+    Mono<Void> delete(MailboxPath mailboxPath);
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
index 41f5dd0..46975a0 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
@@ -31,7 +31,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathTable
 import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathTable.NAMESPACE_AND_USER;
 import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathTable.TABLE_NAME;
 
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 
@@ -45,12 +44,16 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.mail.utils.MailboxBaseTupleUtil;
 import org.apache.james.mailbox.cassandra.table.CassandraMailboxTable;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.util.FunctionalUtils;
+import org.apache.james.util.ReactorUtils;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {
 
@@ -80,7 +83,7 @@ public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {
     }
 
     @VisibleForTesting
-    public CassandraMailboxPathDAOImpl(Session session, CassandraTypesProvider typesProvider) {
+     CassandraMailboxPathDAOImpl(Session session, CassandraTypesProvider typesProvider) {
         this(session, typesProvider, CassandraUtils.WITH_DEFAULT_CONFIGURATION);
     }
 
@@ -122,24 +125,24 @@ public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {
             .from(TABLE_NAME));
     }
 
-    public CompletableFuture<Optional<CassandraIdAndPath>> retrieveId(MailboxPath mailboxPath) {
-        return cassandraAsyncExecutor.executeSingleRow(
+    public Mono<CassandraIdAndPath> retrieveId(MailboxPath mailboxPath) {
+        return cassandraAsyncExecutor.executeSingleRowReactor(
             select.bind()
                 .setUDTValue(NAMESPACE_AND_USER, mailboxBaseTupleUtil.createMailboxBaseUDT(mailboxPath.getNamespace(), mailboxPath.getUser()))
                 .setString(MAILBOX_NAME, mailboxPath.getName()))
-            .thenApply(rowOptional ->
-                rowOptional.map(this::fromRowToCassandraIdAndPath))
-            .thenApply(value -> logGhostMailbox(mailboxPath, value));
+            .map(this::fromRowToCassandraIdAndPath)
+            .map(FunctionalUtils.toFunction(this::logGhostMailboxSuccess))
+            .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> logGhostMailboxFailure(mailboxPath)));
     }
 
     @Override
-    public CompletableFuture<Stream<CassandraIdAndPath>> listUserMailboxes(String namespace, String user) {
-        return cassandraAsyncExecutor.execute(
+    public Flux<CassandraIdAndPath> listUserMailboxes(String namespace, String user) {
+        return cassandraAsyncExecutor.executeReactor(
             selectAllForUser.bind()
                 .setUDTValue(NAMESPACE_AND_USER, mailboxBaseTupleUtil.createMailboxBaseUDT(namespace, user)))
-            .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
+            .flatMapMany(resultSet -> cassandraUtils.convertToFlux(resultSet)
                 .map(this::fromRowToCassandraIdAndPath)
-                .peek(this::logReadSuccess));
+                .map(FunctionalUtils.toFunction(this::logReadSuccess)));
     }
 
     /**
@@ -149,19 +152,19 @@ public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {
      * reads and write operations are also added in order to allow audit in order to know if the mailbox existed.
      */
     @Override
-    public Optional<CassandraIdAndPath> logGhostMailbox(MailboxPath mailboxPath, Optional<CassandraIdAndPath> value) {
-        if (value.isPresent()) {
-            CassandraIdAndPath cassandraIdAndPath = value.get();
-            logReadSuccess(cassandraIdAndPath);
-        } else {
-            GhostMailbox.logger()
+    public void logGhostMailboxSuccess(CassandraIdAndPath value) {
+        logReadSuccess(value);
+    }
+
+    @Override
+    public void logGhostMailboxFailure(MailboxPath mailboxPath) {
+        GhostMailbox.logger()
                 .addField(GhostMailbox.MAILBOX_NAME, mailboxPath)
                 .addField(TYPE, "readMiss")
                 .log(logger -> logger.info("Read mailbox missed"));
-        }
-        return value;
     }
 
+
     /**
      * See https://issues.apache.org/jira/browse/MAILBOX-322 to read about the Ghost mailbox bug.
      *
@@ -185,7 +188,7 @@ public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {
     }
 
     @Override
-    public CompletableFuture<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId) {
+    public Mono<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId) {
         return cassandraAsyncExecutor.executeReturnApplied(insert.bind()
             .setUDTValue(NAMESPACE_AND_USER, mailboxBaseTupleUtil.createMailboxBaseUDT(mailboxPath.getNamespace(), mailboxPath.getUser()))
             .setString(MAILBOX_NAME, mailboxPath.getName())
@@ -193,8 +196,8 @@ public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {
     }
 
     @Override
-    public CompletableFuture<Void> delete(MailboxPath mailboxPath) {
-        return cassandraAsyncExecutor.executeVoid(delete.bind()
+    public Mono<Void> delete(MailboxPath mailboxPath) {
+        return cassandraAsyncExecutor.executeVoidReactor(delete.bind()
             .setUDTValue(NAMESPACE_AND_USER, mailboxBaseTupleUtil.createMailboxBaseUDT(mailboxPath.getNamespace(), mailboxPath.getUser()))
             .setString(MAILBOX_NAME, mailboxPath.getName()));
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java
index 1f58898..2247c66 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java
@@ -31,10 +31,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Tab
 import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Table.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Table.USER;
 
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -42,11 +38,15 @@ import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.mailbox.cassandra.GhostMailbox;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.util.FunctionalUtils;
+import org.apache.james.util.ReactorUtils;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraMailboxPathV2DAO implements CassandraMailboxPathDAO {
 
@@ -101,26 +101,26 @@ public class CassandraMailboxPathV2DAO implements CassandraMailboxPathDAO {
     }
 
     @Override
-    public CompletableFuture<Optional<CassandraIdAndPath>> retrieveId(MailboxPath mailboxPath) {
-        return cassandraAsyncExecutor.executeSingleRow(
+    public Mono<CassandraIdAndPath> retrieveId(MailboxPath mailboxPath) {
+        return cassandraAsyncExecutor.executeSingleRowReactor(
             select.bind()
                 .setString(NAMESPACE, mailboxPath.getNamespace())
                 .setString(USER, sanitizeUser(mailboxPath.getUser()))
                 .setString(MAILBOX_NAME, mailboxPath.getName()))
-            .thenApply(rowOptional ->
-                rowOptional.map(this::fromRowToCassandraIdAndPath))
-            .thenApply(value -> logGhostMailbox(mailboxPath, value));
+            .map(this::fromRowToCassandraIdAndPath)
+            .map(FunctionalUtils.toFunction(this::logGhostMailboxSuccess))
+            .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> logGhostMailboxFailure(mailboxPath)));
     }
 
     @Override
-    public CompletableFuture<Stream<CassandraIdAndPath>> listUserMailboxes(String namespace, String user) {
-        return cassandraAsyncExecutor.execute(
+    public Flux<CassandraIdAndPath> listUserMailboxes(String namespace, String user) {
+        return cassandraAsyncExecutor.executeReactor(
             selectAll.bind()
                 .setString(NAMESPACE, namespace)
                 .setString(USER, sanitizeUser(user)))
-            .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
+            .flatMapMany(resultSet -> cassandraUtils.convertToFlux(resultSet)
                 .map(this::fromRowToCassandraIdAndPath)
-                .peek(this::logReadSuccess));
+                .map(FunctionalUtils.toFunction(this::logReadSuccess)));
     }
 
     /**
@@ -130,17 +130,16 @@ public class CassandraMailboxPathV2DAO implements CassandraMailboxPathDAO {
      * reads and write operations are also added in order to allow audit in order to know if the mailbox existed.
      */
     @Override
-    public Optional<CassandraIdAndPath> logGhostMailbox(MailboxPath mailboxPath, Optional<CassandraIdAndPath> value) {
-        if (value.isPresent()) {
-            CassandraIdAndPath cassandraIdAndPath = value.get();
-            logReadSuccess(cassandraIdAndPath);
-        } else {
-            GhostMailbox.logger()
+    public void logGhostMailboxSuccess(CassandraIdAndPath value) {
+        logReadSuccess(value);
+    }
+
+    @Override
+    public void logGhostMailboxFailure(MailboxPath mailboxPath) {
+        GhostMailbox.logger()
                 .addField(GhostMailbox.MAILBOX_NAME, mailboxPath)
                 .addField(TYPE, "readMiss")
                 .log(logger -> logger.info("Read mailbox missed"));
-        }
-        return value;
     }
 
     /**
@@ -166,7 +165,7 @@ public class CassandraMailboxPathV2DAO implements CassandraMailboxPathDAO {
     }
 
     @Override
-    public CompletableFuture<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId) {
+    public Mono<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId) {
         return cassandraAsyncExecutor.executeReturnApplied(insert.bind()
             .setString(NAMESPACE, mailboxPath.getNamespace())
             .setString(USER, sanitizeUser(mailboxPath.getUser()))
@@ -175,8 +174,8 @@ public class CassandraMailboxPathV2DAO implements CassandraMailboxPathDAO {
     }
 
     @Override
-    public CompletableFuture<Void> delete(MailboxPath mailboxPath) {
-        return cassandraAsyncExecutor.executeVoid(delete.bind()
+    public Mono<Void> delete(MailboxPath mailboxPath) {
+        return cassandraAsyncExecutor.executeVoidReactor(delete.bind()
             .setString(NAMESPACE, mailboxPath.getNamespace())
             .setString(USER, sanitizeUser(mailboxPath.getUser()))
             .setString(MAILBOX_NAME, mailboxPath.getName()));


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org