You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/01/28 14:53:23 UTC

[08/12] james-project git commit: JAMES-2630 Migrate CassandraAsyncExecutor.executeReturnApplied consumers to Reactor

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
index f7d7b5d..0b87025 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
@@ -29,12 +29,13 @@ import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule;
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.store.mail.model.Username;
-import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.streams.JamesCollectors;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import reactor.core.publisher.Flux;
+
 class CassandraAttachmentOwnerDAOTest {
     private static final AttachmentId ATTACHMENT_ID = AttachmentId.from("id1");
     private static final Username OWNER_1 = Username.fromRawValue("owner1");
@@ -59,7 +60,7 @@ class CassandraAttachmentOwnerDAOTest {
 
     @Test
     void retrieveOwnersShouldReturnAddedOwner() {
-        testee.addOwner(ATTACHMENT_ID, OWNER_1).join();
+        testee.addOwner(ATTACHMENT_ID, OWNER_1).block();
 
         assertThat(testee.retrieveOwners(ATTACHMENT_ID).join())
             .containsOnly(OWNER_1);
@@ -67,8 +68,8 @@ class CassandraAttachmentOwnerDAOTest {
 
     @Test
     void retrieveOwnersShouldReturnAddedOwners() {
-        testee.addOwner(ATTACHMENT_ID, OWNER_1).join();
-        testee.addOwner(ATTACHMENT_ID, OWNER_2).join();
+        testee.addOwner(ATTACHMENT_ID, OWNER_1).block();
+        testee.addOwner(ATTACHMENT_ID, OWNER_2).block();
 
         assertThat(testee.retrieveOwners(ATTACHMENT_ID).join())
             .containsOnly(OWNER_1, OWNER_2);
@@ -81,10 +82,10 @@ class CassandraAttachmentOwnerDAOTest {
         IntStream.range(0, referenceCountExceedingPaging)
             .boxed()
             .collect(JamesCollectors.chunker(128))
-            .forEach(chunk -> FluentFutureStream.of(
-                chunk.stream()
-                    .map(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner" + i))))
-                .join());
+            .forEach(chunk -> Flux.fromIterable(chunk)
+                    .flatMap(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner" + i)))
+                    .then()
+                    .block());
 
         assertThat(testee.retrieveOwners(ATTACHMENT_ID).join())
             .hasSize(referenceCountExceedingPaging);

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
index 09dc1ee..4af1bdc 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
@@ -36,8 +36,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import com.github.steveash.guavate.Guavate;
-
 class CassandraMailboxDAOTest {
     private static final int UID_VALIDITY_1 = 145;
     private static final int UID_VALIDITY_2 = 147;
@@ -70,30 +68,30 @@ class CassandraMailboxDAOTest {
 
     @Test
     void retrieveMailboxShouldReturnEmptyWhenNone() {
-        assertThat(testee.retrieveMailbox(CASSANDRA_ID_1).join())
+        assertThat(testee.retrieveMailbox(CASSANDRA_ID_1).blockOptional())
             .isEmpty();
     }
 
     @Test
     void saveShouldAddAMailbox() {
-        testee.save(mailbox1).join();
+        testee.save(mailbox1).block();
 
         Optional<SimpleMailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1)
-            .join();
+            .blockOptional();
         assertThat(readMailbox.isPresent()).isTrue();
         assertThat(readMailbox.get()).isEqualToComparingFieldByField(mailbox1);
     }
 
     @Test
     void saveShouldOverride() {
-        testee.save(mailbox1).join();
+        testee.save(mailbox1).block();
 
         mailbox2.setMailboxId(CASSANDRA_ID_1);
-        testee.save(mailbox2).join();
+        testee.save(mailbox2).block();
 
 
         Optional<SimpleMailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1)
-            .join();
+            .blockOptional();
         assertThat(readMailbox.isPresent()).isTrue();
         assertThat(readMailbox.get()).isEqualToComparingFieldByField(mailbox2);
     }
@@ -101,47 +99,47 @@ class CassandraMailboxDAOTest {
     @Test
     void retrieveAllMailboxesShouldBeEmptyByDefault() {
         List<SimpleMailbox> mailboxes = testee.retrieveAllMailboxes()
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(mailboxes).isEmpty();
     }
 
     @Test
     void retrieveAllMailboxesShouldReturnSingleMailbox() {
-        testee.save(mailbox1).join();
+        testee.save(mailbox1).block();
 
         List<SimpleMailbox> mailboxes = testee.retrieveAllMailboxes()
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(mailboxes).containsOnly(mailbox1);
     }
 
     @Test
     void retrieveAllMailboxesShouldReturnMultiMailboxes() {
-        testee.save(mailbox1).join();
-        testee.save(mailbox2).join();
+        testee.save(mailbox1).block();
+        testee.save(mailbox2).block();
 
         List<SimpleMailbox> mailboxes = testee.retrieveAllMailboxes()
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(mailboxes).containsOnly(mailbox1, mailbox2);
     }
 
     @Test
     void deleteShouldNotFailWhenMailboxIsAbsent() {
-        testee.delete(CASSANDRA_ID_1).join();
+        testee.delete(CASSANDRA_ID_1).block();
     }
 
     @Test
     void deleteShouldRemoveExistingMailbox() {
-        testee.save(mailbox1).join();
+        testee.save(mailbox1).block();
 
-        testee.delete(CASSANDRA_ID_1).join();
+        testee.delete(CASSANDRA_ID_1).block();
 
-        assertThat(testee.retrieveMailbox(CASSANDRA_ID_1).join())
+        assertThat(testee.retrieveMailbox(CASSANDRA_ID_1).blockOptional())
             .isEmpty();
     }
 
@@ -152,14 +150,14 @@ class CassandraMailboxDAOTest {
 
     @Test
     void updateShouldChangeMailboxPath() {
-        testee.save(mailbox1).join();
+        testee.save(mailbox1).block();
 
         testee.updatePath(CASSANDRA_ID_1, NEW_MAILBOX_PATH).join();
 
         mailbox1.setNamespace(NEW_MAILBOX_PATH.getNamespace());
         mailbox1.setUser(NEW_MAILBOX_PATH.getUser());
         mailbox1.setName(NEW_MAILBOX_PATH.getName());
-        Optional<SimpleMailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1).join();
+        Optional<SimpleMailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1).blockOptional();
         assertThat(readMailbox.isPresent()).isTrue();
         assertThat(readMailbox.get()).isEqualToComparingFieldByField(mailbox1);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
index 1c3fb37..4ffa3dd 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
@@ -113,7 +113,7 @@ public class CassandraMailboxMapperTest {
             testee.save(newMailbox))
             .isInstanceOf(TooLongMailboxNameException.class);
 
-        assertThat(mailboxPathV2DAO.retrieveId(MAILBOX_PATH).join())
+        assertThat(mailboxPathV2DAO.retrieveId(MAILBOX_PATH).blockOptional())
             .isPresent();
     }
 
@@ -124,9 +124,9 @@ public class CassandraMailboxMapperTest {
     @Test
     public void deleteShouldDeleteMailboxAndMailboxPathFromV1Table() {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
 
         testee.delete(MAILBOX);
 
@@ -137,9 +137,9 @@ public class CassandraMailboxMapperTest {
     @Test
     public void deleteShouldDeleteMailboxAndMailboxPathFromV2Table() {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
 
         testee.delete(MAILBOX);
 
@@ -150,9 +150,9 @@ public class CassandraMailboxMapperTest {
     @Test
     public void findMailboxByPathShouldReturnMailboxWhenExistsInV1Table() throws Exception {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
 
         Mailbox mailbox = testee.findMailboxByPath(MAILBOX_PATH);
 
@@ -162,9 +162,9 @@ public class CassandraMailboxMapperTest {
     @Test
     public void findMailboxByPathShouldReturnMailboxWhenExistsInV2Table() throws Exception {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
 
         Mailbox mailbox = testee.findMailboxByPath(MAILBOX_PATH);
 
@@ -174,11 +174,11 @@ public class CassandraMailboxMapperTest {
     @Test
     public void findMailboxByPathShouldReturnMailboxWhenExistsInBothTables() throws Exception {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
         mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
 
         Mailbox mailbox = testee.findMailboxByPath(MAILBOX_PATH);
 
@@ -188,11 +188,11 @@ public class CassandraMailboxMapperTest {
     @Test
     public void deleteShouldRemoveMailboxWhenInBothTables() {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
         mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
 
         testee.delete(MAILBOX);
 
@@ -203,9 +203,9 @@ public class CassandraMailboxMapperTest {
     @Test
     public void deleteShouldRemoveMailboxWhenInV1Tables() {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
 
         testee.delete(MAILBOX);
 
@@ -216,9 +216,9 @@ public class CassandraMailboxMapperTest {
     @Test
     public void deleteShouldRemoveMailboxWhenInV2Table() {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
 
         testee.delete(MAILBOX);
 
@@ -229,7 +229,7 @@ public class CassandraMailboxMapperTest {
     @Test
     public void findMailboxByPathShouldThrowWhenDoesntExistInBothTables() {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
 
         assertThatThrownBy(() -> testee.findMailboxByPath(MAILBOX_PATH))
             .isInstanceOf(MailboxNotFoundException.class);
@@ -238,9 +238,9 @@ public class CassandraMailboxMapperTest {
     @Test
     public void findMailboxWithPathLikeShouldReturnMailboxesWhenExistsInV1Table() {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
     
         List<Mailbox> mailboxes = testee.findMailboxWithPathLike(new MailboxPath(MailboxConstants.USER_NAMESPACE, USER, WILDCARD));
 
@@ -250,11 +250,11 @@ public class CassandraMailboxMapperTest {
     @Test
     public void findMailboxWithPathLikeShouldReturnMailboxesWhenExistsInBothTables() {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
         mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
 
         List<Mailbox> mailboxes = testee.findMailboxWithPathLike(new MailboxPath(MailboxConstants.USER_NAMESPACE, USER, WILDCARD));
 
@@ -264,9 +264,9 @@ public class CassandraMailboxMapperTest {
     @Test
     public void findMailboxWithPathLikeShouldReturnMailboxesWhenExistsInV2Table() {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
     
         List<Mailbox> mailboxes = testee.findMailboxWithPathLike(new MailboxPath(MailboxConstants.USER_NAMESPACE, USER, WILDCARD));
 
@@ -276,16 +276,16 @@ public class CassandraMailboxMapperTest {
     @Test
     public void hasChildrenShouldReturnChildWhenExistsInV1Table() {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
         CassandraId childMailboxId = CassandraId.timeBased();
         MailboxPath childMailboxPath = MailboxPath.forUser(USER, "name.child");
         Mailbox childMailbox = new SimpleMailbox(childMailboxPath, UID_VALIDITY, childMailboxId);
         mailboxDAO.save(childMailbox)
-            .join();
+            .block();
         mailboxPathDAO.save(childMailboxPath, childMailboxId)
-            .join();
+            .block();
     
         boolean hasChildren = testee.hasChildren(MAILBOX, '.');
 
@@ -295,18 +295,18 @@ public class CassandraMailboxMapperTest {
     @Test
     public void hasChildrenShouldReturnChildWhenExistsInBothTables() {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
         mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
         CassandraId childMailboxId = CassandraId.timeBased();
         MailboxPath childMailboxPath = MailboxPath.forUser(USER, "name.child");
         Mailbox childMailbox = new SimpleMailbox(childMailboxPath, UID_VALIDITY, childMailboxId);
         mailboxDAO.save(childMailbox)
-            .join();
+            .block();
         mailboxPathDAO.save(childMailboxPath, childMailboxId)
-            .join();
+            .block();
 
         boolean hasChildren = testee.hasChildren(MAILBOX, '.');
 
@@ -316,16 +316,16 @@ public class CassandraMailboxMapperTest {
     @Test
     public void hasChildrenShouldReturnChildWhenExistsInV2Table() {
         mailboxDAO.save(MAILBOX)
-            .join();
+            .block();
         mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
-            .join();
+            .block();
         CassandraId childMailboxId = CassandraId.timeBased();
         MailboxPath childMailboxPath = MailboxPath.forUser(USER, "name.child");
         Mailbox childMailbox = new SimpleMailbox(childMailboxPath, UID_VALIDITY, childMailboxId);
         mailboxDAO.save(childMailbox)
-            .join();
+            .block();
         mailboxPathV2DAO.save(childMailboxPath, childMailboxId)
-            .join();
+            .block();
     
         boolean hasChildren = testee.hasChildren(MAILBOX, '.');
     
@@ -334,11 +334,11 @@ public class CassandraMailboxMapperTest {
 
     @Test
     public void findMailboxWithPathLikeShouldRemoveDuplicatesAndKeepV2() {
-        mailboxDAO.save(MAILBOX).join();
-        mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID).join();
+        mailboxDAO.save(MAILBOX).block();
+        mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID).block();
 
-        mailboxDAO.save(MAILBOX_BIS).join();
-        mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID_2).join();
+        mailboxDAO.save(MAILBOX_BIS).block();
+        mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID_2).block();
 
         assertThat(testee.findMailboxWithPathLike(
             new MailboxPath(MailboxConstants.USER_NAMESPACE, USER, WILDCARD)))

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java
index 429b8af..dbf64f2 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java
@@ -35,9 +35,9 @@ class CassandraMailboxPathDAOImplTest extends CassandraMailboxPathDAOTest {
 
     @Test
     void countAllShouldReturnEntryCount() {
-        testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join();
-        testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).join();
-        testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).join();
+        testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block();
+        testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).block();
+        testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).block();
 
         CassandraMailboxPathDAOImpl daoV1 = (CassandraMailboxPathDAOImpl) testee;
 
@@ -55,9 +55,9 @@ class CassandraMailboxPathDAOImplTest extends CassandraMailboxPathDAOTest {
 
     @Test
     void readAllShouldReturnAllStoredData() {
-        testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join();
-        testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).join();
-        testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).join();
+        testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block();
+        testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).block();
+        testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).block();
 
         CassandraMailboxPathDAOImpl daoV1 = (CassandraMailboxPathDAOImpl) testee;
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java
index 43592ae..acd6245 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java
@@ -34,8 +34,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import com.github.steveash.guavate.Guavate;
-
 import nl.jqno.equalsverifier.EqualsVerifier;
 
 public abstract class CassandraMailboxPathDAOTest {
@@ -70,43 +68,42 @@ public abstract class CassandraMailboxPathDAOTest {
 
     @Test
     void saveShouldInsertNewEntry() {
-        assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join()).isTrue();
+        assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block()).isTrue();
 
-        assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).join())
+        assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).blockOptional())
             .contains(INBOX_ID_AND_PATH);
     }
 
     @Test
     void saveOnSecondShouldBeFalse() {
-        assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join()).isTrue();
-        assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join()).isFalse();
+        assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block()).isTrue();
+        assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block()).isFalse();
     }
 
     @Test
     void retrieveIdShouldReturnEmptyWhenEmptyData() {
-        assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).join()
-            .isPresent())
-            .isFalse();
+        assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).blockOptional())
+            .isEmpty();
     }
 
     @Test
     void retrieveIdShouldReturnStoredData() {
-        testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join();
+        testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block();
 
-        assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).join())
+        assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).blockOptional())
             .contains(INBOX_ID_AND_PATH);
     }
 
     @Test
     void getUserMailboxesShouldReturnAllMailboxesOfUser() {
-        testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join();
-        testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).join();
-        testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).join();
+        testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block();
+        testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).block();
+        testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).block();
 
         List<CassandraIdAndPath> cassandraIds = testee
             .listUserMailboxes(USER_INBOX_MAILBOXPATH.getNamespace(), USER_INBOX_MAILBOXPATH.getUser())
-            .join()
-            .collect(Guavate.toImmutableList());
+            .collectList()
+            .block();
 
         assertThat(cassandraIds)
             .hasSize(2)
@@ -115,16 +112,16 @@ public abstract class CassandraMailboxPathDAOTest {
 
     @Test
     void deleteShouldNotThrowWhenEmpty() {
-        testee.delete(USER_INBOX_MAILBOXPATH).join();
+        testee.delete(USER_INBOX_MAILBOXPATH).block();
     }
 
     @Test
     void deleteShouldDeleteTheExistingMailboxId() {
-        testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join();
+        testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block();
 
-        testee.delete(USER_INBOX_MAILBOXPATH).join();
+        testee.delete(USER_INBOX_MAILBOXPATH).block();
 
-        assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).join())
+        assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).blockOptional())
             .isEmpty();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
index 95df7fb..914d1e9 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.cassandra.mail;
 import java.util.List;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MailboxSessionUtil;
 import org.apache.james.mailbox.MessageUid;
@@ -55,7 +56,7 @@ public class CassandraMapperProvider implements MapperProvider {
     public CassandraMapperProvider(CassandraCluster cassandra) {
         this.cassandra = cassandra;
         messageUidProvider = new MessageUidProvider();
-        cassandraModSeqProvider = new CassandraModSeqProvider(this.cassandra.getConf());
+        cassandraModSeqProvider = new CassandraModSeqProvider(this.cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION);
         mapperFactory = createMapperFactory();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
index da90b78..4eabd41 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 
 import javax.mail.Flags;
@@ -64,6 +63,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.primitives.Bytes;
 
 import nl.jqno.equalsverifier.EqualsVerifier;
+import reactor.core.publisher.Flux;
 
 class CassandraMessageDAOTest {
     private static final int BODY_START = 16;
@@ -186,8 +186,8 @@ class CassandraMessageDAOTest {
             .build();
     }
 
-    private MessageWithoutAttachment toMessage(CompletableFuture<Stream<CassandraMessageDAO.MessageResult>> readOptional) throws InterruptedException, java.util.concurrent.ExecutionException {
-        return readOptional.join()
+    private MessageWithoutAttachment toMessage(Flux<CassandraMessageDAO.MessageResult> read) throws InterruptedException, java.util.concurrent.ExecutionException {
+        return read.toStream()
             .map(CassandraMessageDAO.MessageResult::message)
             .map(Pair::getLeft)
             .findAny()

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
index 26947ab..67b9c90 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
@@ -57,7 +57,7 @@ class CassandraMessageIdDAOTest {
     @Test
     void deleteShouldNotThrowWhenRowDoesntExist() {
         testee.delete(CassandraId.timeBased(), MessageUid.of(1))
-            .join();
+            .block();
     }
 
     @Test
@@ -72,7 +72,7 @@ class CassandraMessageIdDAOTest {
                 .build())
             .join();
 
-        testee.delete(mailboxId, messageUid).join();
+        testee.delete(mailboxId, messageUid).block();
 
         Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
         assertThat(message.isPresent()).isFalse();
@@ -98,7 +98,7 @@ class CassandraMessageIdDAOTest {
                     .build()))
         .join();
 
-        testee.delete(mailboxId, messageUid).join();
+        testee.delete(mailboxId, messageUid).block();
 
         Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
         assertThat(message.isPresent()).isFalse();
@@ -142,7 +142,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags())
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId).join();
+        testee.updateMetadata(expectedComposedMessageId).block();
 
         Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
         assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -167,7 +167,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags(Flag.ANSWERED))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId).join();
+        testee.updateMetadata(expectedComposedMessageId).block();
 
         Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
         assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -192,7 +192,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags(Flag.DELETED))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId).join();
+        testee.updateMetadata(expectedComposedMessageId).block();
 
         Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
         assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -217,7 +217,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags(Flag.DRAFT))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId).join();
+        testee.updateMetadata(expectedComposedMessageId).block();
 
         Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
         assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -242,7 +242,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags(Flag.FLAGGED))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId).join();
+        testee.updateMetadata(expectedComposedMessageId).block();
 
         Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
         assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -267,7 +267,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags(Flag.RECENT))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId).join();
+        testee.updateMetadata(expectedComposedMessageId).block();
 
         Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
         assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -292,7 +292,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags(Flag.SEEN))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId).join();
+        testee.updateMetadata(expectedComposedMessageId).block();
 
         Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
         assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -317,7 +317,7 @@ class CassandraMessageIdDAOTest {
                 .flags(new Flags(Flag.USER))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId).join();
+        testee.updateMetadata(expectedComposedMessageId).block();
 
         Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
         assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -344,7 +344,7 @@ class CassandraMessageIdDAOTest {
                 .flags(flags)
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId).join();
+        testee.updateMetadata(expectedComposedMessageId).block();
 
         Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
         assertThat(message.get()).isEqualTo(expectedComposedMessageId);

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java
index a200680..f563e5f 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java
@@ -20,9 +20,9 @@ package org.apache.james.mailbox.cassandra.mail;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -42,12 +42,14 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.datastax.driver.core.utils.UUIDs;
-import com.github.steveash.guavate.Guavate;
 
 class CassandraMessageIdToImapUidDAOTest {
+    public static final CassandraModule MODULE = CassandraModule.aggregateModules(
+            CassandraSchemaVersionModule.MODULE,
+            CassandraMessageModule.MODULE);
+
     @RegisterExtension
-    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(
-        CassandraMessageModule.MODULE, CassandraSchemaVersionModule.MODULE));
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE);
 
     private CassandraMessageId.Factory messageIdFactory;
 
@@ -62,7 +64,7 @@ class CassandraMessageIdToImapUidDAOTest {
     @Test
     void deleteShouldNotThrowWhenRowDoesntExist() {
         testee.delete(messageIdFactory.of(UUIDs.timeBased()), CassandraId.timeBased())
-            .join();
+            .block();
     }
 
     @Test
@@ -77,10 +79,10 @@ class CassandraMessageIdToImapUidDAOTest {
                     .build())
                 .join();
 
-        testee.delete(messageId, mailboxId).join();
+        testee.delete(messageId, mailboxId).block();
 
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
-        assertThat(messages.collect(Guavate.toImmutableList())).isEmpty();
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+        assertThat(messages).isEmpty();
     }
 
     @Test
@@ -103,15 +105,15 @@ class CassandraMessageIdToImapUidDAOTest {
                     .build()))
         .join();
 
-        testee.delete(messageId, mailboxId).join();
+        testee.delete(messageId, mailboxId).block();
 
         ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(new ComposedMessageId(mailboxId2, messageId, messageUid2))
                 .flags(new Flags())
                 .modSeq(1)
                 .build();
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.empty()).join();
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.empty()).collectList().block();
+        assertThat(messages).containsOnly(expectedComposedMessageId);
     }
 
     @Test
@@ -132,8 +134,8 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build();
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+        assertThat(messages).containsOnly(expectedComposedMessageId);
     }
 
     @Test
@@ -149,7 +151,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .build();
         testee.insert(composedMessageIdWithFlags).join();
 
-        Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 1).join();
+        Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 1).block();
 
         assertThat(result).isTrue();
     }
@@ -167,7 +169,7 @@ class CassandraMessageIdToImapUidDAOTest {
                 .build();
         testee.insert(composedMessageIdWithFlags).join();
 
-        Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 3).join();
+        Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 3).block();
 
         assertThat(result).isFalse();
     }
@@ -191,10 +193,10 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId, 1).join();
+        testee.updateMetadata(expectedComposedMessageId, 1).block();
 
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+        assertThat(messages).containsOnly(expectedComposedMessageId);
     }
 
     @Test
@@ -216,10 +218,10 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags(Flag.ANSWERED))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId, 1).join();
+        testee.updateMetadata(expectedComposedMessageId, 1).block();
 
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+        assertThat(messages).containsOnly(expectedComposedMessageId);
     }
 
     @Test
@@ -241,10 +243,10 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags(Flag.DELETED))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId, 1).join();
+        testee.updateMetadata(expectedComposedMessageId, 1).block();
 
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+        assertThat(messages).containsOnly(expectedComposedMessageId);
     }
 
     @Test
@@ -266,10 +268,10 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags(Flag.DRAFT))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId, 1).join();
+        testee.updateMetadata(expectedComposedMessageId, 1).block();
 
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+        assertThat(messages).containsOnly(expectedComposedMessageId);
     }
 
     @Test
@@ -291,10 +293,10 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags(Flag.FLAGGED))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId, 1).join();
+        testee.updateMetadata(expectedComposedMessageId, 1).block();
 
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+        assertThat(messages).containsOnly(expectedComposedMessageId);
     }
 
     @Test
@@ -316,10 +318,10 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags(Flag.RECENT))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId, 1).join();
+        testee.updateMetadata(expectedComposedMessageId, 1).block();
 
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+        assertThat(messages).containsOnly(expectedComposedMessageId);
     }
 
     @Test
@@ -341,10 +343,11 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags(Flag.SEEN))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId, 1).join();
+        assertThat(testee.updateMetadata(expectedComposedMessageId, 1).block())
+            .isTrue();
 
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+        assertThat(messages).containsOnly(expectedComposedMessageId);
     }
 
     @Test
@@ -366,10 +369,10 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags(Flag.USER))
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId, 1).join();
+        testee.updateMetadata(expectedComposedMessageId, 1).block();
 
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+        assertThat(messages).containsOnly(expectedComposedMessageId);
     }
 
     @Test
@@ -393,10 +396,10 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(flags)
                 .modSeq(2)
                 .build();
-        testee.updateMetadata(expectedComposedMessageId, 1).join();
+        testee.updateMetadata(expectedComposedMessageId, 1).block();
 
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+        assertThat(messages).containsOnly(expectedComposedMessageId);
     }
 
     @Test
@@ -416,9 +419,9 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build();
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
 
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+        assertThat(messages).containsOnly(expectedComposedMessageId);
     }
 
     @Test
@@ -451,8 +454,8 @@ class CassandraMessageIdToImapUidDAOTest {
                 .flags(new Flags())
                 .modSeq(1)
                 .build();
-        Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.empty()).join();
+        List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.empty()).collectList().block();
 
-        assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId, expectedComposedMessageId2);
+        assertThat(messages).containsOnly(expectedComposedMessageId, expectedComposedMessageId2);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
index a0b691d..a8a2722 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
@@ -20,14 +20,19 @@ package org.apache.james.mailbox.cassandra.mail;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Duration;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.LongStream;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.modules.CassandraModSeqModule;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -46,7 +51,7 @@ class CassandraModSeqProviderTest {
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
-        modSeqProvider = new CassandraModSeqProvider(cassandra.getConf());
+        modSeqProvider = new CassandraModSeqProvider(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION);
         MailboxPath path = new MailboxPath("gsoc", "ieugen", "Trash");
         mailbox = new SimpleMailbox(path, 1234);
         mailbox.setMailboxId(CASSANDRA_ID);
@@ -77,13 +82,17 @@ class CassandraModSeqProviderTest {
     }
 
     @Test
-    void nextModSeqShouldGenerateUniqueValuesWhenParallelCalls() {
-        int nbEntries = 100;
-        long nbValues = LongStream.range(0, nbEntries)
-            .parallel()
-            .map(Throwing.longUnaryOperator(x -> modSeqProvider.nextModSeq(null, mailbox)))
-            .distinct()
-            .count();
-        assertThat(nbValues).isEqualTo(nbEntries);
+    void nextModSeqShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException {
+        int nbEntries = 10;
+
+        ConcurrentSkipListSet<Long> modSeqs = new ConcurrentSkipListSet<>();
+        ConcurrentTestRunner.builder()
+            .operation(
+                (threadNumber, step) -> modSeqs.add(modSeqProvider.nextModSeq(null, mailbox)))
+            .threadCount(10)
+            .operationCount(nbEntries)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(modSeqs).hasSize(100);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java
index 619ced7..5a3fe69 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java
@@ -20,16 +20,21 @@ package org.apache.james.mailbox.cassandra.mail;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Duration;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.LongStream;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.modules.CassandraUidModule;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -47,7 +52,7 @@ class CassandraUidProviderTest {
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
-        uidProvider = new CassandraUidProvider(cassandra.getConf());
+        uidProvider = new CassandraUidProvider(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION);
         MailboxPath path = new MailboxPath("gsoc", "ieugen", "Trash");
         mailbox = new SimpleMailbox(path, 1234);
         mailbox.setMailboxId(CASSANDRA_ID);
@@ -77,14 +82,17 @@ class CassandraUidProviderTest {
     }
 
     @Test
-    void nextUidShouldGenerateUniqueValuesWhenParallelCalls() {
+    void nextUidShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException {
+        int threadCount = 10;
         int nbEntries = 100;
-        long nbValues = LongStream.range(0, nbEntries)
-            .parallel()
-            .boxed()
-            .map(Throwing.function(x -> uidProvider.nextUid(null, mailbox)))
-            .distinct()
-            .count();
-        assertThat(nbValues).isEqualTo(nbEntries);
+
+        ConcurrentSkipListSet<MessageUid> messageUids = new ConcurrentSkipListSet<>();
+        ConcurrentTestRunner.builder()
+                .operation((threadNumber, step) -> messageUids.add(uidProvider.nextUid(null, mailbox)))
+            .threadCount(threadCount)
+            .operationCount(nbEntries / threadCount)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(messageUids).hasSize(nbEntries);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java
index 9730bda..1596e1c 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java
@@ -57,7 +57,7 @@ class CassandraUserMailboxRightsDAOTest {
         testee.update(MAILBOX_ID, ACLDiff.computeDiff(
             MailboxACL.EMPTY,
             new MailboxACL(new Entry(ENTRY_KEY, RIGHTS))))
-            .join();
+            .block();
 
         assertThat(testee.retrieve(USER_NAME, MAILBOX_ID).join())
             .contains(RIGHTS);
@@ -68,12 +68,12 @@ class CassandraUserMailboxRightsDAOTest {
         testee.update(MAILBOX_ID, ACLDiff.computeDiff(
             MailboxACL.EMPTY,
             new MailboxACL(new Entry(ENTRY_KEY, RIGHTS))))
-            .join();
+            .block();
 
         testee.update(MAILBOX_ID, ACLDiff.computeDiff(
             new MailboxACL(new Entry(ENTRY_KEY, RIGHTS)),
             new MailboxACL(new Entry(ENTRY_KEY, OTHER_RIGHTS))))
-            .join();
+            .block();
 
         assertThat(testee.retrieve(USER_NAME, MAILBOX_ID).join())
             .contains(OTHER_RIGHTS);
@@ -81,7 +81,7 @@ class CassandraUserMailboxRightsDAOTest {
 
     @Test
     void listRightsForUserShouldReturnEmptyWhenEmptyData() {
-        assertThat(testee.listRightsForUser(USER_NAME).join())
+        assertThat(testee.listRightsForUser(USER_NAME).collectList().block())
             .isEmpty();
     }
 
@@ -90,13 +90,13 @@ class CassandraUserMailboxRightsDAOTest {
         testee.update(MAILBOX_ID, ACLDiff.computeDiff(
             MailboxACL.EMPTY,
             new MailboxACL(new Entry(ENTRY_KEY, RIGHTS))))
-            .join();
+            .block();
 
 
         testee.update(MAILBOX_ID, ACLDiff.computeDiff(
             new MailboxACL(new Entry(ENTRY_KEY, RIGHTS)),
             MailboxACL.EMPTY))
-            .join();
+            .block();
 
         assertThat(testee.retrieve(USER_NAME, MAILBOX_ID).join())
             .isEmpty();

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
index a298106..85616b1 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
@@ -46,6 +46,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import reactor.core.publisher.Mono;
+
 class AttachmentV2MigrationTest {
     private static final AttachmentId ATTACHMENT_ID = AttachmentId.from("id1");
     private static final AttachmentId ATTACHMENT_ID_2 = AttachmentId.from("id2");
@@ -107,9 +109,9 @@ class AttachmentV2MigrationTest {
 
         migration.run();
 
-        assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID).join())
+        assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID).blockOptional())
             .contains(CassandraAttachmentDAOV2.from(attachment1, BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
-        assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID_2).join())
+        assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID_2).blockOptional())
             .contains(CassandraAttachmentDAOV2.from(attachment2, BLOB_ID_FACTORY.forPayload(attachment2.getBytes())));
         assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).join())
             .isEqualTo(attachment1.getBytes());
@@ -124,9 +126,9 @@ class AttachmentV2MigrationTest {
 
         migration.run();
 
-        assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID).join())
+        assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID).blockOptional())
             .isEmpty();
-        assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID_2).join())
+        assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID_2).blockOptional())
             .isEmpty();
     }
 
@@ -190,7 +192,7 @@ class AttachmentV2MigrationTest {
             .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
         when(blobsDAO.save(attachment2.getBytes()))
             .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())));
-        when(attachmentDAOV2.storeAttachment(any())).thenReturn(CompletableFuture.completedFuture(null));
+        when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty());
         when(attachmentDAO.deleteAttachment(any())).thenThrow(new RuntimeException());
 
         assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
@@ -210,7 +212,7 @@ class AttachmentV2MigrationTest {
             .thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
         when(blobsDAO.save(attachment2.getBytes()))
             .thenThrow(new RuntimeException());
-        when(attachmentDAOV2.storeAttachment(any())).thenReturn(CompletableFuture.completedFuture(null));
+        when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty());
         when(attachmentDAO.deleteAttachment(any())).thenReturn(CompletableFuture.completedFuture(null));
 
         assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
index cf19f73..f17ab80 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
@@ -91,7 +91,7 @@ class MailboxPathV2MigrationTest {
     void newValuesShouldBeSavedInMostRecentDAO() throws Exception {
         mailboxMapper.save(MAILBOX_1);
 
-        assertThat(daoV2.retrieveId(MAILBOX_PATH_1).join())
+        assertThat(daoV2.retrieveId(MAILBOX_PATH_1).blockOptional())
             .contains(new CassandraIdAndPath(MAILBOX_ID_1, MAILBOX_PATH_1));
     }
 
@@ -99,33 +99,33 @@ class MailboxPathV2MigrationTest {
     void newValuesShouldNotBeSavedInOldDAO() throws Exception {
         mailboxMapper.save(MAILBOX_1);
 
-        assertThat(daoV1.retrieveId(MAILBOX_PATH_1).join())
+        assertThat(daoV1.retrieveId(MAILBOX_PATH_1).blockOptional())
             .isEmpty();
     }
 
     @Test
     void readingOldValuesShouldMigrateThem() throws Exception {
-        daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).join();
-        mailboxDAO.save(MAILBOX_1).join();
+        daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).block();
+        mailboxDAO.save(MAILBOX_1).block();
 
         mailboxMapper.findMailboxByPath(MAILBOX_PATH_1);
 
         SoftAssertions softly = new SoftAssertions();
-        softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).join()).isEmpty();
-        softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).join())
+        softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).blockOptional()).isEmpty();
+        softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).blockOptional())
             .contains(new CassandraIdAndPath(MAILBOX_ID_1, MAILBOX_PATH_1));
         softly.assertAll();
     }
 
     @Test
     void migrationTaskShouldMoveDataToMostRecentDao() {
-        daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).join();
+        daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).block();
 
         new MailboxPathV2Migration(daoV1, daoV2).run();
 
         SoftAssertions softly = new SoftAssertions();
-        softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).join()).isEmpty();
-        softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).join())
+        softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).blockOptional()).isEmpty();
+        softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).blockOptional())
             .contains(new CassandraIdAndPath(MAILBOX_ID_1, MAILBOX_PATH_1));
         softly.assertAll();
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java
index 637cc69..9c43818 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java
@@ -26,11 +26,16 @@ import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Stream;
 
+import com.github.fge.lambdas.Throwing;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MailboxSessionUtil;
 import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.model.Attachment;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.model.MessageAttachment;
 import org.apache.james.mailbox.model.MessageRange;
@@ -149,10 +154,20 @@ public abstract class AbstractMailboxManagerAttachmentTest {
         assertThat(messages.hasNext()).isTrue();
         List<MessageAttachment> attachments = messages.next().getAttachments();
         assertThat(attachments).hasSize(2);
-        assertThat(attachmentMapper.getAttachment(attachments.get(0).getAttachmentId()).getStream())
-            .hasSameContentAs(ClassLoader.getSystemResourceAsStream("eml/4037_014.jpg"));
-        assertThat(attachmentMapper.getAttachment(attachments.get(1).getAttachmentId()).getStream())
-            .hasSameContentAs(ClassLoader.getSystemResourceAsStream("eml/4037_015.jpg"));
+        ImmutableList<byte[]> attachmentContents = attachments
+            .stream()
+            .map(MessageAttachment::getAttachmentId)
+            .map(Throwing.function(attachmentMapper::getAttachment))
+            .map(Attachment::getBytes)
+            .collect(ImmutableList.toImmutableList());
+
+        ImmutableList<byte[]> files = Stream.of("eml/4037_014.jpg", "eml/4037_015.jpg")
+                .map(ClassLoader::getSystemResourceAsStream)
+                .map(Throwing.function(IOUtils::toByteArray))
+                .collect(ImmutableList.toImmutableList());
+
+        assertThat(attachmentContents)
+            .containsExactlyInAnyOrder(files.get(0), files.get(1));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/container/util/pom.xml
----------------------------------------------------------------------
diff --git a/server/container/util/pom.xml b/server/container/util/pom.xml
index fa76097..85c3a6e 100644
--- a/server/container/util/pom.xml
+++ b/server/container/util/pom.xml
@@ -116,6 +116,12 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>backport-util-concurrent</groupId>
+            <artifactId>backport-util-concurrent</artifactId>
+            <version>3.1</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java b/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
index bd1d177..830b01c 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
@@ -112,7 +112,7 @@ public class CassandraDomainList extends AbstractDomainList {
     public void addDomain(Domain domain) throws DomainListException {
         boolean executed = executor.executeReturnApplied(insertStatement.bind()
             .setString(DOMAIN, domain.asString()))
-            .join();
+            .block();
         if (!executed) {
             throw new DomainListException(domain.name() + " already exists.");
         }
@@ -122,7 +122,7 @@ public class CassandraDomainList extends AbstractDomainList {
     public void removeDomain(Domain domain) throws DomainListException {
         boolean executed = executor.executeReturnApplied(removeStatement.bind()
             .setString(DOMAIN, domain.asString()))
-            .join();
+            .block();
         if (!executed) {
             throw new DomainListException(domain.name() + " was not found");
         }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java
index 47c598d..10049e1 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java
@@ -32,8 +32,6 @@ import static org.apache.james.sieve.cassandra.tables.CassandraSieveActiveTable.
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.Date;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 
@@ -44,6 +42,7 @@ import org.apache.james.sieverepository.api.ScriptName;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
 
 public class CassandraActiveScriptDAO {
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
@@ -66,23 +65,23 @@ public class CassandraActiveScriptDAO {
             .where(eq(USER_NAME, bindMarker(USER_NAME))));
     }
 
-    public CompletableFuture<Optional<ActiveScriptInfo>> getActiveSctiptInfo(User user) {
-        return cassandraAsyncExecutor.executeSingleRow(
+    public Mono<ActiveScriptInfo> getActiveSctiptInfo(User user) {
+        return cassandraAsyncExecutor.executeSingleRowReactor(
             selectActiveName.bind()
                 .setString(USER_NAME, user.asString()))
-            .thenApply(rowOptional -> rowOptional.map(row -> new ActiveScriptInfo(
+            .map(row -> new ActiveScriptInfo(
                 new ScriptName(row.getString(SCRIPT_NAME)),
-                ZonedDateTime.ofInstant(row.getTimestamp(DATE).toInstant(), ZoneOffset.UTC))));
+                ZonedDateTime.ofInstant(row.getTimestamp(DATE).toInstant(), ZoneOffset.UTC)));
     }
 
-    public CompletableFuture<Void> unactivate(User user) {
-        return cassandraAsyncExecutor.executeVoid(
+    public Mono<Void> unactivate(User user) {
+        return cassandraAsyncExecutor.executeVoidReactor(
             deleteActive.bind()
                 .setString(USER_NAME, user.asString()));
     }
 
-    public CompletableFuture<Void> activate(User user, ScriptName scriptName) {
-        return cassandraAsyncExecutor.executeVoid(
+    public Mono<Void> activate(User user, ScriptName scriptName) {
+        return cassandraAsyncExecutor.executeVoidReactor(
             insertActive.bind()
                 .setString(USER_NAME, user.asString())
                 .setString(SCRIPT_NAME, scriptName.getValue())

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java
index 429ca4b..d3cf741 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java
@@ -34,7 +34,6 @@ import static org.apache.james.sieve.cassandra.tables.CassandraSieveTable.TABLE_
 import static org.apache.james.sieve.cassandra.tables.CassandraSieveTable.USER_NAME;
 
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
@@ -50,6 +49,7 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.Select;
 import com.github.steveash.guavate.Guavate;
+import reactor.core.publisher.Mono;
 
 public class CassandraSieveDAO {
 
@@ -98,8 +98,8 @@ public class CassandraSieveDAO {
             .where(eq(USER_NAME, bindMarker(USER_NAME)));
     }
 
-    public CompletableFuture<Void> insertScript(User user, Script script) {
-        return cassandraAsyncExecutor.executeVoid(
+    public Mono<Void> insertScript(User user, Script script) {
+        return cassandraAsyncExecutor.executeVoidReactor(
             insertScriptStatement.bind()
                 .setString(USER_NAME, user.asString())
                 .setString(SCRIPT_NAME, script.getName().getValue())
@@ -120,7 +120,7 @@ public class CassandraSieveDAO {
                 .collect(Guavate.toImmutableList()));
     }
 
-    public CompletableFuture<Boolean> updateScriptActivation(User user, ScriptName scriptName, boolean active) {
+    public Mono<Boolean> updateScriptActivation(User user, ScriptName scriptName, boolean active) {
         return cassandraAsyncExecutor.executeReturnApplied(
             updateScriptActivationStatement.bind()
                 .setString(USER_NAME, user.asString())
@@ -128,24 +128,24 @@ public class CassandraSieveDAO {
                 .setBool(IS_ACTIVE, active));
     }
 
-    public CompletableFuture<Optional<Script>> getScript(User user, ScriptName name) {
-        return getScriptRow(user, name).thenApply(opt -> opt.map(row -> Script.builder()
+    public Mono<Script> getScript(User user, ScriptName name) {
+        return getScriptRow(user, name).map(row -> Script.builder()
                 .content(row.getString(SCRIPT_CONTENT))
                 .isActive(row.getBool(IS_ACTIVE))
                 .name(name)
                 .size(row.getLong(SIZE))
-                .build()));
+                .build());
     }
 
-    public CompletableFuture<Boolean> deleteScriptInCassandra(User user, ScriptName name) {
+    public Mono<Boolean> deleteScriptInCassandra(User user, ScriptName name) {
         return cassandraAsyncExecutor.executeReturnApplied(
             deleteScriptStatement.bind()
                 .setString(USER_NAME, user.asString())
                 .setString(SCRIPT_NAME, name.getValue()));
     }
 
-    private CompletableFuture<Optional<Row>> getScriptRow(User user, ScriptName name) {
-        return cassandraAsyncExecutor.executeSingleRow(
+    private Mono<Row> getScriptRow(User user, ScriptName name) {
+        return cassandraAsyncExecutor.executeSingleRowReactor(
             selectScriptStatement.bind()
                 .setString(USER_NAME, user.asString())
                 .setString(SCRIPT_NAME, name.getValue()));

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java
index b20ea30..1e99f1a 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java
@@ -41,6 +41,7 @@ import org.apache.james.sieve.cassandra.tables.CassandraSieveSpaceTable;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
 
 public class CassandraSieveQuotaDAO {
 
@@ -107,8 +108,8 @@ public class CassandraSieveQuotaDAO {
                 .orElse(0L));
     }
 
-    public CompletableFuture<Void> updateSpaceUsed(User user, long spaceUsed) {
-        return cassandraAsyncExecutor.executeVoid(
+    public Mono<Void> updateSpaceUsed(User user, long spaceUsed) {
+        return cassandraAsyncExecutor.executeVoidReactor(
             updateSpaceUsedStatement.bind()
                 .setLong(CassandraSieveSpaceTable.SPACE_USED, spaceUsed)
                 .setString(CassandraSieveSpaceTable.USER_NAME, user.asString()));

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java
index fa52b4a..7f4b559 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java
@@ -25,6 +25,7 @@ import java.time.ZonedDateTime;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 import javax.inject.Inject;
 
@@ -43,7 +44,11 @@ import org.apache.james.sieverepository.api.exception.IsActiveException;
 import org.apache.james.sieverepository.api.exception.QuotaExceededException;
 import org.apache.james.sieverepository.api.exception.QuotaNotFoundException;
 import org.apache.james.sieverepository.api.exception.ScriptNotFoundException;
-import org.apache.james.util.CompletableFutureUtil;
+
+import org.apache.james.util.FunctionalUtils;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraSieveRepository implements SieveRepository {
 
@@ -60,7 +65,8 @@ public class CassandraSieveRepository implements SieveRepository {
 
     @Override
     public ZonedDateTime getActivationDateForActiveScript(User user) throws ScriptNotFoundException {
-        return cassandraActiveScriptDAO.getActiveSctiptInfo(user).join()
+        return cassandraActiveScriptDAO.getActiveSctiptInfo(user)
+            .blockOptional()
             .orElseThrow(ScriptNotFoundException::new)
             .getActivationDate();
     }
@@ -70,19 +76,20 @@ public class CassandraSieveRepository implements SieveRepository {
         throwOnOverQuota(user, spaceThatWillBeUsedByNewScript(user, name, newSize));
     }
 
-    private void throwOnOverQuota(User user, CompletableFuture<Long> sizeDifference) throws QuotaExceededException {
+    private void throwOnOverQuota(User user, Mono<Long> sizeDifference) throws QuotaExceededException {
         CompletableFuture<Optional<QuotaSize>> userQuotaFuture = cassandraSieveQuotaDAO.getQuota(user);
         CompletableFuture<Optional<QuotaSize>> globalQuotaFuture = cassandraSieveQuotaDAO.getQuota();
         CompletableFuture<Long> spaceUsedFuture = cassandraSieveQuotaDAO.spaceUsedBy(user);
 
         new SieveQuota(spaceUsedFuture.join(), limitToUse(userQuotaFuture, globalQuotaFuture))
-            .checkOverQuotaUponModification(sizeDifference.join());
+            .checkOverQuotaUponModification(sizeDifference.block());
     }
 
-    private CompletableFuture<Long> spaceThatWillBeUsedByNewScript(User user, ScriptName name, long scriptSize) {
+    private Mono<Long> spaceThatWillBeUsedByNewScript(User user, ScriptName name, long scriptSize) {
         return cassandraSieveDAO.getScript(user, name)
-            .thenApply(optional -> optional.map(Script::getSize).orElse(0L))
-            .thenApply(sizeOfStoredScript -> scriptSize - sizeOfStoredScript);
+            .map(Script::getSize)
+            .switchIfEmpty(Mono.just(0L))
+            .map(sizeOfStoredScript -> scriptSize - sizeOfStoredScript);
     }
 
     private Optional<QuotaSize> limitToUse(CompletableFuture<Optional<QuotaSize>> userQuota, CompletableFuture<Optional<QuotaSize>> globalQuota) {
@@ -94,23 +101,24 @@ public class CassandraSieveRepository implements SieveRepository {
 
     @Override
     public void putScript(User user, ScriptName name, ScriptContent content) throws QuotaExceededException {
-        CompletableFuture<Long> spaceUsed = spaceThatWillBeUsedByNewScript(user, name, content.length());
+        Mono<Long> spaceUsed = spaceThatWillBeUsedByNewScript(user, name, content.length());
         throwOnOverQuota(user, spaceUsed);
 
-        CompletableFuture.allOf(
-            updateSpaceUsed(user, spaceUsed.join()),
+        Flux.merge(
+            updateSpaceUsed(user, spaceUsed.block()),
             cassandraSieveDAO.insertScript(user,
                 Script.builder()
                     .name(name)
                     .content(content)
                     .isActive(false)
                     .build()))
-            .join();
+            .then()
+            .block();
     }
 
-    private CompletableFuture<Void> updateSpaceUsed(User user, long spaceUsed) {
+    private Mono<Void> updateSpaceUsed(User user, long spaceUsed) {
         if (spaceUsed == 0) {
-            return CompletableFuture.completedFuture(null);
+            return Mono.empty();
         }
         return cassandraSieveQuotaDAO.updateSpaceUsed(user, spaceUsed);
     }
@@ -124,10 +132,8 @@ public class CassandraSieveRepository implements SieveRepository {
     public InputStream getActive(User user) throws ScriptNotFoundException {
         return IOUtils.toInputStream(
             cassandraActiveScriptDAO.getActiveSctiptInfo(user)
-                .thenCompose(optionalActiveName -> optionalActiveName
-                    .map(activeScriptInfo -> cassandraSieveDAO.getScript(user, activeScriptInfo.getName()))
-                    .orElse(CompletableFuture.completedFuture(Optional.empty())))
-                .join()
+                .flatMap(activeScriptInfo -> cassandraSieveDAO.getScript(user, activeScriptInfo.getName()))
+                .blockOptional()
                 .orElseThrow(ScriptNotFoundException::new)
                 .getContent()
                 .getValue(), StandardCharsets.UTF_8);
@@ -135,36 +141,33 @@ public class CassandraSieveRepository implements SieveRepository {
 
     @Override
     public void setActive(User user, ScriptName name) throws ScriptNotFoundException {
-        CompletableFuture<Boolean> activateNewScript =
+        Mono<Boolean> activateNewScript =
             unactivateOldScript(user)
-                .thenCompose(any -> updateScriptActivation(user, name, true)
-                    .thenCompose(CompletableFutureUtil.composeIfTrue(
-                        () -> cassandraActiveScriptDAO.activate(user, name))));
+                .then(updateScriptActivation(user, name, true))
+                .filter(FunctionalUtils.toPredicate(Function.identity()))
+                .flatMap(any -> cassandraActiveScriptDAO.activate(user, name).thenReturn(any));
 
-        if (!activateNewScript.join()) {
+        if (!activateNewScript.blockOptional().isPresent()) {
             throw new ScriptNotFoundException();
         }
     }
 
-    private CompletableFuture<Void> unactivateOldScript(User user) {
+    private Mono<Void> unactivateOldScript(User user) {
         return cassandraActiveScriptDAO.getActiveSctiptInfo(user)
-            .thenCompose(scriptNameOptional -> scriptNameOptional
-                .map(activeScriptInfo -> updateScriptActivation(user, activeScriptInfo.getName(), false)
-                    .<Void>thenApply(any -> null))
-                .orElse(CompletableFuture.completedFuture(null)));
+            .flatMap(activeScriptInfo -> updateScriptActivation(user, activeScriptInfo.getName(), false).then());
     }
 
-    private CompletableFuture<Boolean> updateScriptActivation(User user, ScriptName scriptName, boolean active) {
+    private Mono<Boolean> updateScriptActivation(User user, ScriptName scriptName, boolean active) {
         if (!scriptName.equals(SieveRepository.NO_SCRIPT_NAME)) {
             return cassandraSieveDAO.updateScriptActivation(user, scriptName, active);
         }
-        return cassandraActiveScriptDAO.unactivate(user).thenApply(any -> true);
+        return cassandraActiveScriptDAO.unactivate(user).thenReturn(true);
     }
 
     @Override
     public InputStream getScript(User user, ScriptName name) throws ScriptNotFoundException {
         return  cassandraSieveDAO.getScript(user, name)
-            .join()
+            .blockOptional()
             .map(script -> IOUtils.toInputStream(script.getContent().getValue(), StandardCharsets.UTF_8))
             .orElseThrow(ScriptNotFoundException::new);
     }
@@ -172,13 +175,13 @@ public class CassandraSieveRepository implements SieveRepository {
     @Override
     public void deleteScript(User user, ScriptName name) throws ScriptNotFoundException, IsActiveException {
         ensureIsNotActive(user, name);
-        if (!cassandraSieveDAO.deleteScriptInCassandra(user, name).join()) {
+        if (!cassandraSieveDAO.deleteScriptInCassandra(user, name).switchIfEmpty(Mono.just(false)).block()) {
             throw new ScriptNotFoundException();
         }
     }
 
     private void ensureIsNotActive(User user, ScriptName name) throws IsActiveException {
-        Optional<ScriptName> activeName = cassandraActiveScriptDAO.getActiveSctiptInfo(user).join().map(ActiveScriptInfo::getName);
+        Optional<ScriptName> activeName = cassandraActiveScriptDAO.getActiveSctiptInfo(user).blockOptional().map(ActiveScriptInfo::getName);
         if (activeName.isPresent() && name.equals(activeName.get())) {
             throw new IsActiveException();
         }
@@ -186,22 +189,21 @@ public class CassandraSieveRepository implements SieveRepository {
 
     @Override
     public void renameScript(User user, ScriptName oldName, ScriptName newName) throws ScriptNotFoundException, DuplicateException {
-        CompletableFuture<Boolean> scriptExistsFuture = cassandraSieveDAO.getScript(user, newName)
-            .thenApply(Optional::isPresent);
-        CompletableFuture<Optional<Script>> oldScriptFuture = cassandraSieveDAO.getScript(user, oldName);
+        Mono<Script> oldScript = cassandraSieveDAO.getScript(user, oldName).cache();
+        Mono<Boolean> newScriptExists = cassandraSieveDAO.getScript(user, newName).hasElement();
 
-        oldScriptFuture.join();
-        if (scriptExistsFuture.join()) {
+        oldScript.block();
+        if (newScriptExists.block()) {
             throw new DuplicateException();
         }
 
         performScriptRename(user,
             newName,
-            oldScriptFuture.join().orElseThrow(ScriptNotFoundException::new));
+            oldScript.blockOptional().orElseThrow(ScriptNotFoundException::new));
     }
 
     private void performScriptRename(User user, ScriptName newName, Script oldScript) {
-        CompletableFuture.allOf(
+        Flux.merge(
             cassandraSieveDAO.insertScript(user,
                 Script.builder()
                     .copyOf(oldScript)
@@ -209,15 +211,14 @@ public class CassandraSieveRepository implements SieveRepository {
                     .build()),
             cassandraSieveDAO.deleteScriptInCassandra(user, oldScript.getName()),
             performActiveScriptRename(user, oldScript.getName(), newName))
-            .join();
+            .then()
+            .block();
     }
 
-    private CompletableFuture<Void> performActiveScriptRename(User user, ScriptName oldName, ScriptName newName) {
+    private Mono<Void> performActiveScriptRename(User user, ScriptName oldName, ScriptName newName) {
         return cassandraActiveScriptDAO.getActiveSctiptInfo(user)
-            .thenCompose(optionalActivationInfo -> optionalActivationInfo
-                .filter(activeScriptInfo -> activeScriptInfo.getName().equals(oldName))
-                .map(name -> cassandraActiveScriptDAO.activate(user, newName))
-                .orElse(CompletableFuture.completedFuture(null)));
+            .filter(activeScriptInfo -> activeScriptInfo.getName().equals(oldName))
+            .flatMap(name -> cassandraActiveScriptDAO.activate(user, newName));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java
index 45504ab..0d6eee8 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java
@@ -137,7 +137,7 @@ public class CassandraUsersRepository extends AbstractUsersRepository {
                     .setString(PASSWORD, defaultUser.getHashedPassword())
                     .setString(ALGORITHM, defaultUser.getHashAlgorithm())
                     .setString(NAME, defaultUser.getUserName().toLowerCase(Locale.US)))
-            .join();
+            .block();
 
         if (!executed) {
             throw new UsersRepositoryException("Unable to update user");
@@ -149,7 +149,7 @@ public class CassandraUsersRepository extends AbstractUsersRepository {
         boolean executed = executor.executeReturnApplied(
             removeUserStatement.bind()
                 .setString(NAME, name))
-            .join();
+            .block();
 
         if (!executed) {
             throw new UsersRepositoryException("unable to remove unknown user " + name);
@@ -202,7 +202,7 @@ public class CassandraUsersRepository extends AbstractUsersRepository {
                 .setString(REALNAME, user.getUserName())
                 .setString(PASSWORD, user.getHashedPassword())
                 .setString(ALGORITHM, user.getHashAlgorithm()))
-            .join();
+            .block();
 
         if (!executed) {
             throw new AlreadyExistInUsersRepositoryException("User with username " + username + " already exist!");


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org