You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/01/28 14:53:22 UTC

[07/12] james-project git commit: JAMES-2630 Migrate CassandraAsyncExecutor.executeReturnApplied consumers to Reactor

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java
index 2fd0426..df5dccc 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java
@@ -49,15 +49,15 @@ class CassandraActiveScriptDAOTest {
 
     @Test
     void getActiveSctiptInfoShouldReturnEmptyByDefault() {
-        assertThat(activeScriptDAO.getActiveSctiptInfo(USER).join().isPresent())
+        assertThat(activeScriptDAO.getActiveSctiptInfo(USER).blockOptional().isPresent())
             .isFalse();
     }
 
     @Test
     void getActiveSctiptInfoShouldReturnStoredName() {
-        activeScriptDAO.activate(USER, SCRIPT_NAME).join();
+        activeScriptDAO.activate(USER, SCRIPT_NAME).block();
 
-        Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).join();
+        Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).blockOptional();
 
         assertThat(actual.isPresent()).isTrue();
         assertThat(actual.get().getName()).isEqualTo(SCRIPT_NAME);
@@ -65,30 +65,30 @@ class CassandraActiveScriptDAOTest {
 
     @Test
     void activateShouldAllowRename() {
-        activeScriptDAO.activate(USER, SCRIPT_NAME).join();
+        activeScriptDAO.activate(USER, SCRIPT_NAME).block();
 
-        activeScriptDAO.activate(USER, NEW_SCRIPT_NAME).join();
+        activeScriptDAO.activate(USER, NEW_SCRIPT_NAME).block();
 
-        Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).join();
+        Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).blockOptional();
         assertThat(actual.isPresent()).isTrue();
         assertThat(actual.get().getName()).isEqualTo(NEW_SCRIPT_NAME);
     }
 
     @Test
     void unactivateShouldAllowRemovingActiveScript() {
-        activeScriptDAO.activate(USER, SCRIPT_NAME).join();
+        activeScriptDAO.activate(USER, SCRIPT_NAME).block();
 
-        activeScriptDAO.unactivate(USER).join();
+        activeScriptDAO.unactivate(USER).block();
 
-        Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).join();
+        Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).blockOptional();
         assertThat(actual.isPresent()).isFalse();
     }
 
     @Test
     void unactivateShouldWorkWhenNoneStore() {
-        activeScriptDAO.unactivate(USER).join();
+        activeScriptDAO.unactivate(USER).block();
 
-        Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).join();
+        Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).blockOptional();
         assertThat(actual.isPresent()).isFalse();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java
index 5d38f61..5e3150d 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java
@@ -70,54 +70,54 @@ class CassandraSieveDAOTest {
     
      @Test
     void getScriptShouldReturnEmptyByDefault() {
-        assertThat(sieveDAO.getScript(USER, SCRIPT_NAME).join().isPresent())
-            .isFalse();
+        assertThat(sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional())
+            .isEmpty();
     }
 
     @Test
     void getScriptShouldReturnStoredScript() {
-        sieveDAO.insertScript(USER, SCRIPT).join();
+        sieveDAO.insertScript(USER, SCRIPT).block();
 
-        Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join();
+        Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional();
 
         assertThat(actual).contains(SCRIPT);
     }
 
     @Test
     void insertScriptShouldUpdateContent() {
-        sieveDAO.insertScript(USER, SCRIPT).join();
+        sieveDAO.insertScript(USER, SCRIPT).block();
 
-        sieveDAO.insertScript(USER, SCRIPT_NEW_CONTENT).join();
+        sieveDAO.insertScript(USER, SCRIPT_NEW_CONTENT).block();
 
-        Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join();
+        Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional();
         assertThat(actual).contains(SCRIPT_NEW_CONTENT);
     }
 
     @Test
     void insertScriptShouldUpdateActivate() {
-        sieveDAO.insertScript(USER, SCRIPT).join();
+        sieveDAO.insertScript(USER, SCRIPT).block();
 
-        sieveDAO.insertScript(USER, ACTIVE_SCRIPT).join();
+        sieveDAO.insertScript(USER, ACTIVE_SCRIPT).block();
 
-        Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join();
+        Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional();
         assertThat(actual).contains(ACTIVE_SCRIPT);
     }
 
     @Test
     void deleteScriptInCassandraShouldWork() {
-        sieveDAO.insertScript(USER, SCRIPT).join();
+        sieveDAO.insertScript(USER, SCRIPT).block();
 
-        sieveDAO.deleteScriptInCassandra(USER, SCRIPT_NAME).join();
+        sieveDAO.deleteScriptInCassandra(USER, SCRIPT_NAME).block();
 
-        Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join();
+        Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional();
         assertThat(actual).isEmpty();
     }
 
     @Test
     void deleteScriptInCassandraShouldWorkWhenNoneStore() {
-        sieveDAO.deleteScriptInCassandra(USER, SCRIPT_NAME).join();
+        sieveDAO.deleteScriptInCassandra(USER, SCRIPT_NAME).block();
 
-        Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join();
+        Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional();
         assertThat(actual).isEmpty();
     }
 
@@ -130,8 +130,8 @@ class CassandraSieveDAOTest {
 
     @Test
     void listScriptsShouldReturnSingleStoredValue() {
-        sieveDAO.insertScript(USER, SCRIPT).join();
-        sieveDAO.insertScript(USER, SCRIPT2).join();
+        sieveDAO.insertScript(USER, SCRIPT).block();
+        sieveDAO.insertScript(USER, SCRIPT2).block();
 
         List<ScriptSummary> scriptSummaryList = sieveDAO.listScripts(USER).join();
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java
index adca5b0..7cacb55 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java
@@ -116,7 +116,7 @@ class CassandraSieveQuotaDAOTest {
     void spaceUsedByShouldReturnStoredValue() {
         long spaceUsed = 18L;
 
-        sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).join();
+        sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).block();
 
         assertThat(sieveQuotaDAO.spaceUsedBy(USER).join()).isEqualTo(spaceUsed);
     }
@@ -125,8 +125,8 @@ class CassandraSieveQuotaDAOTest {
     void updateSpaceUsedShouldBeAdditive() {
         long spaceUsed = 18L;
 
-        sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).join();
-        sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).join();
+        sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).block();
+        sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).block();
 
         assertThat(sieveQuotaDAO.spaceUsedBy(USER).join()).isEqualTo(2 * spaceUsed);
     }
@@ -135,8 +135,8 @@ class CassandraSieveQuotaDAOTest {
     void updateSpaceUsedShouldWorkWithNegativeValues() {
         long spaceUsed = 18L;
 
-        sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).join();
-        sieveQuotaDAO.updateSpaceUsed(USER, -1 * spaceUsed).join();
+        sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).block();
+        sieveQuotaDAO.updateSpaceUsed(USER, -1 * spaceUsed).block();
 
         assertThat(sieveQuotaDAO.spaceUsedBy(USER).join()).isEqualTo(0L);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index 6655798..6f005e8 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -22,7 +22,6 @@ package org.apache.james.mailrepository.cassandra;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
 
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
@@ -33,10 +32,11 @@ import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepository;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
 import org.apache.james.util.CompletableFutureUtil;
-import org.apache.james.util.FluentFutureStream;
 import org.apache.mailet.Mail;
 
 import com.github.fge.lambdas.Throwing;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraMailRepository implements MailRepository {
 
@@ -60,28 +60,28 @@ public class CassandraMailRepository implements MailRepository {
     public MailKey store(Mail mail) throws MessagingException {
         MailKey mailKey = MailKey.forMail(mail);
 
-        mimeMessageStore.save(mail.getMessage())
+        Mono.fromFuture(mimeMessageStore.save(mail.getMessage())
             .thenCompose(Throwing.function(parts -> mailDAO.store(url, mail,
                 parts.getHeaderBlobId(),
-                parts.getBodyBlobId())))
-            .thenCompose(any -> keysDAO.store(url, mailKey))
-            .thenCompose(this::increaseSizeIfStored)
-            .join();
+                parts.getBodyBlobId()))))
+            .then(keysDAO.store(url, mailKey))
+            .flatMap(this::increaseSizeIfStored)
+            .block();
 
         return mailKey;
     }
 
-    private CompletionStage<Void> increaseSizeIfStored(Boolean isStored) {
+    private Mono<Void> increaseSizeIfStored(Boolean isStored) {
         if (isStored) {
             return countDAO.increment(url);
         }
-        return CompletableFuture.completedFuture(null);
+        return Mono.empty();
     }
 
     @Override
     public Iterator<MailKey> list() {
         return keysDAO.list(url)
-            .join()
+            .toIterable()
             .iterator();
     }
 
@@ -108,33 +108,34 @@ public class CassandraMailRepository implements MailRepository {
 
     @Override
     public void remove(Mail mail) {
-        removeAsync(MailKey.forMail(mail)).join();
+        removeAsync(MailKey.forMail(mail)).block();
     }
 
     @Override
     public void remove(Collection<Mail> toRemove) {
-        FluentFutureStream.of(toRemove.stream()
+        Flux.fromIterable(toRemove)
             .map(MailKey::forMail)
-            .map(this::removeAsync))
-            .join();
+            .flatMap(this::removeAsync)
+            .then()
+            .block();
     }
 
     @Override
     public void remove(MailKey key) {
-        removeAsync(key).join();
+        removeAsync(key).block();
     }
 
-    private CompletableFuture<Void> removeAsync(MailKey key) {
+    private Mono<Void> removeAsync(MailKey key) {
         return keysDAO.remove(url, key)
-            .thenCompose(this::decreaseSizeIfDeleted)
-            .thenCompose(any -> mailDAO.remove(url, key));
+            .flatMap(this::decreaseSizeIfDeleted)
+            .then(mailDAO.remove(url, key));
     }
 
-    private CompletionStage<Void> decreaseSizeIfDeleted(Boolean isDeleted) {
+    private Mono<Void> decreaseSizeIfDeleted(Boolean isDeleted) {
         if (isDeleted) {
             return countDAO.decrement(url);
         }
-        return CompletableFuture.completedFuture(null);
+        return Mono.empty();
     }
 
     @Override
@@ -145,9 +146,9 @@ public class CassandraMailRepository implements MailRepository {
     @Override
     public void removeAll() {
         keysDAO.list(url)
-            .thenCompose(stream -> FluentFutureStream.of(stream.map(this::removeAsync))
-                .completableFuture())
-            .join();
+            .flatMap(this::removeAsync)
+            .then()
+            .block();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java
index cc2d4ea..540aa1a 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java
@@ -40,6 +40,7 @@ import org.apache.james.mailrepository.api.MailRepositoryUrl;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
 
 public class CassandraMailRepositoryCountDAO {
 
@@ -75,13 +76,13 @@ public class CassandraMailRepositoryCountDAO {
             .where(eq(REPOSITORY_NAME, bindMarker(REPOSITORY_NAME))));
     }
 
-    public CompletableFuture<Void> increment(MailRepositoryUrl url) {
-        return executor.executeVoid(increment.bind()
+    public Mono<Void> increment(MailRepositoryUrl url) {
+        return executor.executeVoidReactor(increment.bind()
             .setString(REPOSITORY_NAME, url.asString()));
     }
 
-    public CompletableFuture<Void> decrement(MailRepositoryUrl url) {
-        return executor.executeVoid(decrement.bind()
+    public Mono<Void> decrement(MailRepositoryUrl url) {
+        return executor.executeVoidReactor(decrement.bind()
             .setString(REPOSITORY_NAME, url.asString()));
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
index 93bffec..8bd5902 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
@@ -28,9 +28,6 @@ import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.KEYS
 import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.MAIL_KEY;
 import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.REPOSITORY_NAME;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -40,6 +37,8 @@ import org.apache.james.mailrepository.api.MailRepositoryUrl;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraMailRepositoryKeysDAO {
 
@@ -80,20 +79,20 @@ public class CassandraMailRepositoryKeysDAO {
             .value(MAIL_KEY, bindMarker(MAIL_KEY)));
     }
 
-    public CompletableFuture<Boolean> store(MailRepositoryUrl url, MailKey key) {
+    public Mono<Boolean> store(MailRepositoryUrl url, MailKey key) {
         return executor.executeReturnApplied(insertKey.bind()
             .setString(REPOSITORY_NAME, url.asString())
             .setString(MAIL_KEY, key.asString()));
     }
 
-    public CompletableFuture<Stream<MailKey>> list(MailRepositoryUrl url) {
-        return executor.execute(listKeys.bind()
+    public Flux<MailKey> list(MailRepositoryUrl url) {
+        return executor.executeReactor(listKeys.bind()
             .setString(REPOSITORY_NAME, url.asString()))
-            .thenApply(cassandraUtils::convertToStream)
-            .thenApply(stream -> stream.map(row -> new MailKey(row.getString(MAIL_KEY))));
+            .flatMapMany(cassandraUtils::convertToFlux)
+            .map(row -> new MailKey(row.getString(MAIL_KEY)));
     }
 
-    public CompletableFuture<Boolean> remove(MailRepositoryUrl url, MailKey key) {
+    public Mono<Boolean> remove(MailRepositoryUrl url, MailKey key) {
         return executor.executeReturnApplied(deleteKey.bind()
             .setString(REPOSITORY_NAME, url.asString())
             .setString(MAIL_KEY, key.asString()));

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
index 7c3596b..6e884b4 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
@@ -84,6 +84,7 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import reactor.core.publisher.Mono;
 
 public class CassandraMailRepositoryMailDAO implements CassandraMailRepositoryMailDaoAPI {
 
@@ -161,8 +162,8 @@ public class CassandraMailRepositoryMailDAO implements CassandraMailRepositoryMa
     }
 
     @Override
-    public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) {
-        return executor.executeVoid(deleteMail.bind()
+    public Mono<Void> remove(MailRepositoryUrl url, MailKey key) {
+        return executor.executeVoidReactor(deleteMail.bind()
             .setString(REPOSITORY_NAME, url.asString())
             .setString(MAIL_KEY, key.asString()));
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java
index 435bcf1..bf49097 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java
@@ -31,10 +31,12 @@ import org.apache.james.mailrepository.api.MailRepositoryUrl;
 import org.apache.james.server.core.MailImpl;
 import org.apache.mailet.Mail;
 
+import reactor.core.publisher.Mono;
+
 public interface CassandraMailRepositoryMailDaoAPI {
     CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException;
 
-    CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key);
+    Mono<Void> remove(MailRepositoryUrl url, MailKey key);
 
     CompletableFuture<Optional<MailDTO>> read(MailRepositoryUrl url, MailKey key);
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java
index 36e8e28..5b4edfe 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java
@@ -79,6 +79,7 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import reactor.core.publisher.Mono;
 
 public class CassandraMailRepositoryMailDaoV2 implements CassandraMailRepositoryMailDaoAPI {
 
@@ -151,8 +152,9 @@ public class CassandraMailRepositoryMailDaoV2 implements CassandraMailRepository
         );
     }
 
-    public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) {
-        return executor.executeVoid(deleteMail.bind()
+    @Override
+    public Mono<Void> remove(MailRepositoryUrl url, MailKey key) {
+        return executor.executeVoidReactor(deleteMail.bind()
             .setString(REPOSITORY_NAME, url.asString())
             .setString(MAIL_KEY, key.asString()));
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java
index 8b01a38..f83766a 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java
@@ -32,6 +32,8 @@ import org.apache.james.util.OptionalUtils;
 import org.apache.mailet.Mail;
 
 import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class MergingCassandraMailRepositoryMailDao implements CassandraMailRepositoryMailDaoAPI {
 
@@ -51,8 +53,8 @@ public class MergingCassandraMailRepositoryMailDao implements CassandraMailRepos
     }
 
     @Override
-    public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) {
-        return CompletableFuture.allOf(v1.remove(url, key), v2.remove(url, key));
+    public Mono<Void> remove(MailRepositoryUrl url, MailKey key) {
+        return Flux.merge(v1.remove(url, key), v2.remove(url, key)).then();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java
index 98b6fa2..32eb13d 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java
@@ -50,7 +50,7 @@ class CassandraMailRepositoryCountDAOTest {
 
     @Test
     void getCountShouldReturnOneWhenIncrementedOneTime() {
-        testee.increment(URL).join();
+        testee.increment(URL).block();
 
         assertThat(testee.getCount(URL).join())
             .isEqualTo(1L);
@@ -58,7 +58,7 @@ class CassandraMailRepositoryCountDAOTest {
 
     @Test
     void incrementShouldNotAffectOtherUrls() {
-        testee.increment(URL).join();
+        testee.increment(URL).block();
 
         assertThat(testee.getCount(URL2).join())
             .isEqualTo(0L);
@@ -66,8 +66,8 @@ class CassandraMailRepositoryCountDAOTest {
 
     @Test
     void incrementCanBeAppliedSeveralTime() {
-        testee.increment(URL).join();
-        testee.increment(URL).join();
+        testee.increment(URL).block();
+        testee.increment(URL).block();
 
         assertThat(testee.getCount(URL).join())
             .isEqualTo(2L);
@@ -75,11 +75,11 @@ class CassandraMailRepositoryCountDAOTest {
 
     @Test
     void decrementShouldDecreaseCount() {
-        testee.increment(URL).join();
-        testee.increment(URL).join();
-        testee.increment(URL).join();
+        testee.increment(URL).block();
+        testee.increment(URL).block();
+        testee.increment(URL).block();
 
-        testee.decrement(URL).join();
+        testee.decrement(URL).block();
 
         assertThat(testee.getCount(URL).join())
             .isEqualTo(2L);
@@ -87,7 +87,7 @@ class CassandraMailRepositoryCountDAOTest {
 
     @Test
     void decrementCanLeadToNegativeCount() {
-        testee.decrement(URL).join();
+        testee.decrement(URL).block();
 
         assertThat(testee.getCount(URL).join())
             .isEqualTo(-1L);

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java
index 3c457af..db7f98a 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java
@@ -23,7 +23,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
 import org.junit.jupiter.api.BeforeEach;
@@ -36,9 +38,11 @@ class CassandraMailRepositoryKeysDAOTest {
     static final MailKey KEY_1 = new MailKey("key1");
     static final MailKey KEY_2 = new MailKey("key2");
     static final MailKey KEY_3 = new MailKey("key3");
+    static final CassandraModule MODULE = CassandraModule.aggregateModules(CassandraMailRepositoryModule.MODULE,
+            CassandraSchemaVersionModule.MODULE);
 
     @RegisterExtension
-    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailRepositoryModule.MODULE);
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE);
 
     CassandraMailRepositoryKeysDAO testee;
 
@@ -49,75 +53,75 @@ class CassandraMailRepositoryKeysDAOTest {
 
     @Test
     void listShouldBeEmptyByDefault() {
-        assertThat(testee.list(URL).join())
+        assertThat(testee.list(URL).collectList().block())
             .isEmpty();
     }
 
     @Test
     void listShouldReturnEmptyByDefault() {
-        testee.store(URL, KEY_1).join();
+        testee.store(URL, KEY_1).block();
 
-        assertThat(testee.list(URL).join())
+        assertThat(testee.list(URL).collectList().block())
             .containsOnly(KEY_1);
     }
 
     @Test
     void listShouldNotReturnElementsOfOtherRepositories() {
-        testee.store(URL, KEY_1).join();
+        testee.store(URL, KEY_1).block();
 
-        assertThat(testee.list(URL2).join())
+        assertThat(testee.list(URL2).collectList().block())
             .isEmpty();
     }
 
     @Test
     void listShouldReturnSeveralElements() {
-        testee.store(URL, KEY_1).join();
-        testee.store(URL, KEY_2).join();
-        testee.store(URL, KEY_3).join();
+        testee.store(URL, KEY_1).block();
+        testee.store(URL, KEY_2).block();
+        testee.store(URL, KEY_3).block();
 
-        assertThat(testee.list(URL).join())
+        assertThat(testee.list(URL).collectList().block())
             .containsOnly(KEY_1, KEY_2, KEY_3);
     }
 
     @Test
     void listShouldNotReturnRemovedElements() {
-        testee.store(URL, KEY_1).join();
-        testee.store(URL, KEY_2).join();
-        testee.store(URL, KEY_3).join();
+        testee.store(URL, KEY_1).block();
+        testee.store(URL, KEY_2).block();
+        testee.store(URL, KEY_3).block();
 
-        testee.remove(URL, KEY_2).join();
+        testee.remove(URL, KEY_2).block();
 
-        assertThat(testee.list(URL).join())
+        assertThat(testee.list(URL).collectList().block())
             .containsOnly(KEY_1, KEY_3);
     }
 
     @Test
     void removeShouldBeIdempotent() {
-        testee.remove(URL, KEY_2).join();
+        testee.remove(URL, KEY_2).block();
     }
 
     @Test
     void removeShouldNotAffectOtherRepositories() {
-        testee.store(URL, KEY_1).join();
+        testee.store(URL, KEY_1).block();
 
-        testee.remove(URL2, KEY_2).join();
+        testee.remove(URL2, KEY_2).block();
 
-        assertThat(testee.list(URL).join())
+        assertThat(testee.list(URL).collectList().block())
             .containsOnly(KEY_1);
     }
 
     @Test
     void removeShouldReturnTrueWhenKeyDeleted() {
-        testee.store(URL, KEY_1).join();
+        testee.store(URL, KEY_1).block();
 
-        boolean isDeleted = testee.remove(URL, KEY_1).join();
+        boolean isDeleted = testee.remove(URL, KEY_1).block();
 
         assertThat(isDeleted).isTrue();
     }
 
     @Test
     void removeShouldReturnFalseWhenKeyNotDeleted() {
-        boolean isDeleted = testee.remove(URL2, KEY_2).join();
+        boolean isDeleted = testee.remove(URL2, KEY_2).block();
 
         assertThat(isDeleted).isFalse();
     }
@@ -125,16 +129,16 @@ class CassandraMailRepositoryKeysDAOTest {
 
     @Test
     void storeShouldReturnTrueWhenNotPreviouslyStored() {
-        boolean isStored = testee.store(URL, KEY_1).join();
+        boolean isStored = testee.store(URL, KEY_1).block();
 
         assertThat(isStored).isTrue();
     }
 
     @Test
     void storeShouldReturnFalseWhenPreviouslyStored() {
-        testee.store(URL, KEY_1).join();
+        testee.store(URL, KEY_1).block();
 
-        boolean isStored = testee.store(URL, KEY_1).join();
+        boolean isStored = testee.store(URL, KEY_1).block();
 
         assertThat(isStored).isFalse();
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
index 2377b6b..f597eff 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
@@ -104,7 +104,7 @@ class CassandraMailRepositoryMailDAOTest {
                 blobIdBody)
                 .join();
 
-            testee.remove(URL, KEY_1).join();
+            testee.remove(URL, KEY_1).block();
 
             assertThat(testee.read(URL, KEY_1).join())
                 .isEmpty();
@@ -374,7 +374,7 @@ class CassandraMailRepositoryMailDAOTest {
                 blobIdBody2)
                 .join();
 
-            testee.remove(URL, KEY_1).join();
+            testee.remove(URL, KEY_1).block();
 
             Optional<CassandraMailRepositoryMailDaoAPI.MailDTO> v1Entry = v1.read(URL, KEY_1).join();
             Optional<CassandraMailRepositoryMailDaoAPI.MailDTO> v2Entry = v2.read(URL, KEY_1).join();

http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
index ca2eb2c..b39ed2d 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
@@ -33,6 +33,7 @@ import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.Store;
@@ -56,17 +57,19 @@ import org.junit.jupiter.api.extension.ExtensionContext;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Mono;
 
 @ExtendWith(CassandraMailRepositoryWithFakeImplementationsTest.MailRepositoryCassandraClusterExtension.class)
 class CassandraMailRepositoryWithFakeImplementationsTest {
-    static final MailRepositoryUrl URL = MailRepositoryUrl.from("proto://url");
-    static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+    private static final MailRepositoryUrl URL = MailRepositoryUrl.from("proto://url");
+    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
 
     static class MailRepositoryCassandraClusterExtension extends CassandraClusterExtension {
         public MailRepositoryCassandraClusterExtension() {
             super(CassandraModule.aggregateModules(
                     CassandraMailRepositoryModule.MODULE,
-                    CassandraBlobModule.MODULE));
+                    CassandraBlobModule.MODULE,
+                    CassandraSchemaVersionModule.MODULE));
         }
 
         @Override
@@ -122,7 +125,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
                     .isInstanceOf(RuntimeException.class)
                     .hasMessage("java.lang.RuntimeException: Expected failure while saving");
 
-            assertThat(keysDAO.list(URL).join()).isEmpty();
+            assertThat(keysDAO.list(URL).collectList().block()).isEmpty();
         }
     }
 
@@ -155,9 +158,9 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
             }
 
             @Override
-            public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) {
-                return CompletableFuture.supplyAsync(() -> {
-                    throw new RuntimeException("Expected failure while remeving mail parts");
+            public Mono<Void> remove(MailRepositoryUrl url, MailKey key) {
+                return Mono.fromCallable(() -> {
+                    throw new RuntimeException("Expected failure while removing mail parts");
                 });
 
             }
@@ -186,7 +189,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
                     .isInstanceOf(RuntimeException.class)
                     .hasMessage("java.lang.RuntimeException: Expected failure while storing mail parts");
 
-            assertThat(keysDAO.list(URL).join()).isEmpty();
+            assertThat(keysDAO.list(URL).collectList().block()).isEmpty();
         }
 
         @Test
@@ -234,8 +237,8 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
             }
 
             @Override
-            public CompletableFuture<Boolean> store(MailRepositoryUrl url, MailKey key) {
-                return CompletableFuture.supplyAsync(() -> {
+            public Mono<Boolean> store(MailRepositoryUrl url, MailKey key) {
+                return Mono.fromCallable(() -> {
                     throw new RuntimeException("Expected failure while storing keys");
                 });
             }
@@ -255,7 +258,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
 
             assertThatThrownBy(() -> cassandraMailRepository.store(mail))
                     .isInstanceOf(RuntimeException.class)
-                    .hasMessage("java.lang.RuntimeException: Expected failure while storing keys");
+                    .hasMessage("Expected failure while storing keys");
 
             assertThat(countDAO.getCount(URL).join()).isEqualTo(0);
         }
@@ -274,7 +277,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
 
             assertThatThrownBy(() -> cassandraMailRepository.store(mail))
                     .isInstanceOf(RuntimeException.class)
-                    .hasMessage("java.lang.RuntimeException: Expected failure while storing keys");
+                    .hasMessage("Expected failure while storing keys");
 
             ResultSet resultSet = cassandra.getConf().execute(select()
                     .from(BlobTable.TABLE_NAME));
@@ -295,7 +298,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
 
             assertThatThrownBy(() -> cassandraMailRepository.store(mail))
                     .isInstanceOf(RuntimeException.class)
-                    .hasMessage("java.lang.RuntimeException: Expected failure while storing keys");
+                    .hasMessage("Expected failure while storing keys");
 
             ResultSet resultSet = cassandra.getConf().execute(select()
                     .from(MailRepositoryTable.CONTENT_TABLE_NAME));


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org