Posted to server-dev@james.apache.org by rc...@apache.org on 2020/05/13 02:48:32 UTC

[james-project] branch master updated (88d05d9 -> 22999e9)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 88d05d9  JAMES-3091 Mailbox/Get POJO
     new 96d202e  JAMES-3058 Cassandra instrumentation: Future for ongoing computation
     new 1e69f1f  JAMES-3155 ComputeMessageFastViewProjectionListener can retrieve messages reactively
     new 0cb375f  [REFACTORING] Rely on Flux::flatMapIterable
     new 6d99ef5  [REFACTORING] Flatten some .flatMapMany calls
     new 22999e9  JAMES-3174 Fix dependency analyze on java 11

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../backends/cassandra/TestingSessionTest.java     | 28 ++++++++++++----------
 .../mail/CassandraMailboxPathDAOImpl.java          |  6 ++---
 .../cassandra/mail/CassandraMailboxPathV2DAO.java  | 12 +++++-----
 .../cassandra/mail/CassandraMessageIdMapper.java   |  4 +++-
 .../cassandra/mail/CassandraMailboxDAOTest.java    | 16 +++++++------
 .../mail/CassandraModSeqProviderTest.java          | 20 +++++++---------
 pom.xml                                            |  2 +-
 .../org/apache/james/PeriodicalHealthChecks.java   |  4 ++--
 .../impl/EventSourcingFilteringManagement.java     |  3 +--
 .../EventSourcingDLPConfigurationStore.java        |  3 +--
 .../draft/methods/GetVacationResponseMethod.java   |  5 ++--
 .../ComputeMessageFastViewProjectionListener.java  |  4 +---
 12 files changed, 54 insertions(+), 53 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/05: JAMES-3155 ComputeMessageFastViewProjectionListener can retrieve messages reactively

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1e69f1ff4c800b4201b865709a0d832f1158ca52
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 12 11:45:13 2020 +0700

    JAMES-3155 ComputeMessageFastViewProjectionListener can retrieve messages reactively
---
 .../james/jmap/event/ComputeMessageFastViewProjectionListener.java    | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java
index 2100dc3..af1dbc9 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java
@@ -80,9 +80,7 @@ public class ComputeMessageFastViewProjectionListener implements MailboxListener
     }
 
     private Mono<Void> handleAddedEvent(Added addedEvent, MailboxSession session) {
-        return Mono.fromCallable(() -> messageIdManager.getMessages(addedEvent.getMessageIds(), FetchGroup.FULL_CONTENT, session))
-            .subscribeOn(Schedulers.elastic())
-            .flatMapMany(Flux::fromIterable)
+        return Flux.from(messageIdManager.getMessagesReactive(addedEvent.getMessageIds(), FetchGroup.FULL_CONTENT, session))
             .flatMap(Throwing.function(messageResult -> Mono.fromCallable(
                 () -> Pair.of(messageResult.getMessageId(), computeFastViewPrecomputedProperties(messageResult)))
                     .subscribeOn(Schedulers.parallel())))




[james-project] 05/05: JAMES-3174 Fix dependency analyze on java 11

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 22999e9bf88bb45e3e0d1bda2fbfe2430e6d2de9
Author: Eugen Stan <eu...@netdava.com>
AuthorDate: Fri May 8 00:34:21 2020 +0300

    JAMES-3174 Fix dependency analyze on java 11
    
    Signed-off-by: Eugen Stan <ie...@apache.org>
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 9524e11..413a1ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2842,7 +2842,7 @@
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-dependency-plugin</artifactId>
-                    <version>3.1.1</version>
+                    <version>3.1.2</version>
                 </plugin>
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>




[james-project] 04/05: [REFACTORING] Flatten some .flatMapMany calls

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6d99ef58fefda962f1a2b923f0c851fe5635e276
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 12 11:41:02 2020 +0700

    [REFACTORING] Flatten some .flatMapMany calls
---
 .../mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java  |  6 +++---
 .../mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java    | 12 ++++++------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
index df210a0..291f02a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
@@ -139,9 +139,9 @@ public class CassandraMailboxPathDAOImpl {
         return cassandraAsyncExecutor.execute(
             selectAllForUser.bind()
                 .setUDTValue(NAMESPACE_AND_USER, mailboxBaseTupleUtil.createMailboxBaseUDT(namespace, user)))
-            .flatMapMany(resultSet -> cassandraUtils.convertToFlux(resultSet)
-                .map(this::fromRowToCassandraIdAndPath)
-                .map(FunctionalUtils.toFunction(this::logReadSuccess)));
+            .flatMapMany(cassandraUtils::convertToFlux)
+            .map(this::fromRowToCassandraIdAndPath)
+            .map(FunctionalUtils.toFunction(this::logReadSuccess));
     }
 
     /**
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java
index f105013..b89a755 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java
@@ -127,17 +127,17 @@ public class CassandraMailboxPathV2DAO {
                 .setString(NAMESPACE, namespace)
                 .setString(USER, sanitizeUser(user))
                 .setConsistencyLevel(SERIAL))
-            .flatMapMany(resultSet -> cassandraUtils.convertToFlux(resultSet)
-                .map(this::fromRowToCassandraIdAndPath)
-                .map(FunctionalUtils.toFunction(this::logReadSuccess)));
+            .flatMapMany(cassandraUtils::convertToFlux)
+            .map(this::fromRowToCassandraIdAndPath)
+            .map(FunctionalUtils.toFunction(this::logReadSuccess));
     }
 
     public Flux<CassandraIdAndPath> listAll() {
         return cassandraAsyncExecutor.execute(
             selectAll.bind())
-            .flatMapMany(resultSet -> cassandraUtils.convertToFlux(resultSet)
-                .map(this::fromRowToCassandraIdAndPath)
-                .map(FunctionalUtils.toFunction(this::logReadSuccess)));
+            .flatMapMany(cassandraUtils::convertToFlux)
+            .map(this::fromRowToCassandraIdAndPath)
+            .map(FunctionalUtils.toFunction(this::logReadSuccess));
     }
 
     /**




[james-project] 03/05: [REFACTORING] Rely on Flux::flatMapIterable

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0cb375f4a24397c5fa72346e075118c63bad8a67
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 12 11:40:31 2020 +0700

    [REFACTORING] Rely on Flux::flatMapIterable
    
    This lets Reactor know it can expand the iterable synchronously,
    without needing asynchronous execution.
---
 .../james/mailbox/cassandra/mail/CassandraMessageIdMapper.java       | 4 +++-
 .../src/main/java/org/apache/james/PeriodicalHealthChecks.java       | 4 ++--
 .../jmap/api/filtering/impl/EventSourcingFilteringManagement.java    | 3 +--
 .../james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java  | 3 +--
 .../apache/james/jmap/draft/methods/GetVacationResponseMethod.java   | 5 +++--
 5 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 281fb2b..f98090a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -22,6 +22,7 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 
 import javax.mail.Flags;
 
@@ -251,7 +252,8 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
                 LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
                 return Mono.empty();
             })
-            .flatMapMany(Flux::fromIterable)
+            .flux()
+            .flatMapIterable(Function.identity())
             .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()));
     }
 
diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
index c51efdf..dedddca 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
@@ -59,8 +59,8 @@ public class PeriodicalHealthChecks implements Startable {
 
     public void start() {
         disposable = Flux.interval(configuration.getPeriod(), scheduler)
-            .flatMap(any -> Flux.fromIterable(healthChecks)
-                .flatMap(healthCheck -> Mono.from(healthCheck.check())))
+            .flatMapIterable(any -> healthChecks)
+            .flatMap(healthCheck -> Mono.from(healthCheck.check()))
             .doOnNext(this::logResult)
             .onErrorContinue(this::logError)
             .subscribeOn(Schedulers.elastic())
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index 72af24d..662fad1 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -34,7 +34,6 @@ import org.reactivestreams.Publisher;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class EventSourcingFilteringManagement implements FilteringManagement {
@@ -65,6 +64,6 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
         FilteringAggregateId aggregateId = new FilteringAggregateId(username);
 
         return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
-            .flatMapMany(history -> Flux.fromIterable(FilteringAggregate.load(aggregateId, history).listRules()));
+            .flatMapIterable(history -> FilteringAggregate.load(aggregateId, history).listRules());
     }
 }
diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java
index 48d88ba..7a6140b 100644
--- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java
+++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java
@@ -41,7 +41,6 @@ import org.reactivestreams.Publisher;
 
 import com.google.common.collect.ImmutableSet;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class EventSourcingDLPConfigurationStore implements DLPConfigurationStore {
@@ -84,7 +83,7 @@ public class EventSourcingDLPConfigurationStore implements DLPConfigurationStore
     @Override
     public Optional<DLPConfigurationItem> fetch(Domain domain, Id ruleId) {
         return Mono.from(list(domain))
-                .flatMapMany(rules -> Flux.fromIterable(rules.getItems()))
+                .flatMapIterable(DLPRules::getItems)
                 .toStream()
                 .filter((DLPConfigurationItem item) -> item.getId().equals(ruleId))
                 .findFirst();
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetVacationResponseMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetVacationResponseMethod.java
index da38ebd..da9f326 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetVacationResponseMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetVacationResponseMethod.java
@@ -74,11 +74,12 @@ public class GetVacationResponseMethod implements Method {
 
         return Flux.from(metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(),
             process(mailboxSession)
-                .flatMapMany(response -> Flux.just(JmapResponse.builder()
+                .map(response -> JmapResponse.builder()
                     .methodCallId(methodCallId)
                     .responseName(RESPONSE_NAME)
                     .response(response)
-                    .build()))))
+                    .build())
+                .flux()))
             .subscriberContext(jmapAction("VACATION"));
     }
 




[james-project] 01/05: JAMES-3058 Cassandra instrumentation: Future for ongoing computation

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 96d202e08c697f569071181be79a7228a631e22a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 12 09:54:03 2020 +0700

    JAMES-3058 Cassandra instrumentation: Future for ongoing computation
    
    Within Cassandra instrumentation we previously relied on the following
    pattern when synchronizing Cassandra code:
    
    ```
    Mono<Void> operation = dao.updateVersion(newVersion).cache();
    
    operation.subscribeOn(Schedulers.elastic()).subscribe();
    barrier.awaitCaller();
    barrier.releaseCaller();
    operation.block();
    ```
    
    This led to an unstable test, caught by our CI:
    
    ```
    [5c452decd240c33200a5bb639dce022d9846bea8] [INFO] Running org.apache.james.backends.cassandra.TestingSessionTest
    [5c452decd240c33200a5bb639dce022d9846bea8] 16:50:42.328 [ERROR] r.c.s.Schedulers - Scheduler worker in group main failed with an uncaught exception
    [5c452decd240c33200a5bb639dce022d9846bea8] org.apache.james.backends.cassandra.Scenario$InjectedFailureException: Injected failure
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at org.apache.james.backends.cassandra.Scenario$Behavior.lambda$static$0(Scenario.java:45)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at org.apache.james.backends.cassandra.Scenario$Behavior.lambda$awaitOn$1(Scenario.java:53)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at org.apache.james.backends.cassandra.TestingSession.executeAsync(TestingSession.java:121)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor.lambda$execute$0(CassandraAsyncExecutor.java:47)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.publisher.Mono.subscribe(Mono.java:4210)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.publisher.MonoIgnorePublisher.subscribe(MonoIgnorePublisher.java:47)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:132)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.publisher.Mono.subscribe(Mono.java:4195)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:124)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    [5c452decd240c33200a5bb639dce022d9846bea8] 	at java.base/java.lang.Thread.run(Thread.java:834)
    [5c452decd240c33200a5bb639dce022d9846bea8] Wrapped by: reactor.core.Exceptions$ErrorCallbackNotImplemented: org.apache.james.backends.cassandra.Scenario$InjectedFailureException: Injected failure
    [Then blocks forever]
    ```
    
    Relying on a future to represent the ongoing computation effectively
    solved the issue (we were unable to reproduce it with a @RepeatedTest(100)).
    
    The pattern then is:
    
    ```
    CompletableFuture<Void> operation = dao.updateVersion(newVersion)
        .subscribeOn(Schedulers.elastic())
        .toFuture();
    
    barrier.awaitCaller();
    barrier.releaseCaller();
    operation.get();
    ```
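
The future-based pattern above can be illustrated with a minimal, JDK-only sketch. It is not the James test code. A `CountDownLatch` stands in for the `Barrier`, `CompletableFuture.supplyAsync` stands in for `Mono.subscribeOn(...).toFuture()`, and `InjectedFailure`, `startBlockedOperation`, `runSuccess` and `runFailure` are hypothetical names introduced only for this example. It shows two properties the tests rely on: the future is a handle on work that has already started, and a failure surfaces from `get()` wrapped in an `ExecutionException`, which is presumably why the failure assertion in the diff checks the cause rather than the thrown type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OngoingComputationSketch {

    // Hypothetical stand-in for Scenario.InjectedFailureException.
    static class InjectedFailure extends RuntimeException {
        InjectedFailure() {
            super("Injected failure");
        }
    }

    // Starts the work immediately on another thread and returns a handle to it.
    // The work parks on the latch, mimicking a statement held at a barrier.
    static CompletableFuture<String> startBlockedOperation(CountDownLatch release, boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                if (!release.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("barrier was never released");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            if (fail) {
                throw new InjectedFailure();
            }
            return "applied";
        });
    }

    // Success path: release the latch, then wait for the already running work.
    static String runSuccess() {
        CountDownLatch release = new CountDownLatch(1);
        CompletableFuture<String> operation = startBlockedOperation(release, false);
        release.countDown(); // plays the role of barrier.releaseCaller()
        try {
            return operation.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    // Failure path: get() throws ExecutionException; the original error is its cause.
    static Throwable runFailure() {
        CountDownLatch release = new CountDownLatch(1);
        CompletableFuture<String> operation = startBlockedOperation(release, true);
        release.countDown();
        try {
            operation.get(5, TimeUnit.SECONDS);
            throw new IllegalStateException("expected a failure");
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runSuccess());
        System.out.println(runFailure().getClass().getSimpleName());
    }
}
```

Running `main` prints `applied` followed by `InjectedFailure`. The timeouts on `await` and `get` are an addition for the sketch so that a mistake fails fast instead of blocking forever.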
---
 .../backends/cassandra/TestingSessionTest.java     | 28 ++++++++++++----------
 .../cassandra/mail/CassandraMailboxDAOTest.java    | 16 +++++++------
 .../mail/CassandraModSeqProviderTest.java          | 20 +++++++---------
 3 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
index 54fca6f..27a191a 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
@@ -29,6 +29,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.james.backends.cassandra.Scenario.Barrier;
 import org.apache.james.backends.cassandra.Scenario.InjectedFailureException;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -40,7 +42,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 class TestingSessionTest {
@@ -222,7 +223,7 @@ class TestingSessionTest {
     }
 
     @Test
-    void statementShouldBeAppliedWhenBarrierIsReleased(CassandraCluster cassandra) {
+    void statementShouldBeAppliedWhenBarrierIsReleased(CassandraCluster cassandra) throws Exception {
         SchemaVersion originalSchemaVersion = new SchemaVersion(32);
         SchemaVersion newVersion = new SchemaVersion(36);
 
@@ -234,11 +235,12 @@ class TestingSessionTest {
                 .times(1)
                 .whenQueryStartsWith("INSERT INTO schemaVersion"));
 
-        Mono<Void> operation = dao.updateVersion(newVersion).cache();
+        CompletableFuture<Void> operation = dao.updateVersion(newVersion)
+            .subscribeOn(Schedulers.elastic())
+            .toFuture();
 
-        operation.subscribeOn(Schedulers.elastic()).subscribe();
         barrier.releaseCaller();
-        operation.block();
+        operation.get();
 
         assertThat(dao.getCurrentSchemaVersion().block())
             .contains(newVersion);
@@ -257,12 +259,13 @@ class TestingSessionTest {
                 .times(1)
                 .whenQueryStartsWith("INSERT INTO schemaVersion"));
 
-        Mono<Void> operation = dao.updateVersion(newVersion).cache();
+        CompletableFuture<Void> operation = dao.updateVersion(newVersion)
+            .subscribeOn(Schedulers.elastic())
+            .toFuture();
 
-        operation.subscribeOn(Schedulers.elastic()).subscribe();
         barrier.awaitCaller();
         barrier.releaseCaller();
-        operation.block();
+        operation.get();
 
         assertThat(dao.getCurrentSchemaVersion().block())
             .contains(newVersion);
@@ -281,13 +284,14 @@ class TestingSessionTest {
                 .times(1)
                 .whenQueryStartsWith("INSERT INTO schemaVersion"));
 
-        Mono<Void> operation = dao.updateVersion(newVersion).cache();
+        CompletableFuture<Void> operation = dao.updateVersion(newVersion)
+            .subscribeOn(Schedulers.elastic())
+            .toFuture();
 
-        operation.subscribeOn(Schedulers.elastic()).subscribe();
         barrier.awaitCaller();
         barrier.releaseCaller();
 
-        assertThatThrownBy(operation::block)
-            .isInstanceOf(InjectedFailureException.class);
+        assertThatThrownBy(operation::get)
+            .hasCauseInstanceOf(InjectedFailureException.class);
     }
 }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
index e2d3b2f..b7b570d 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
@@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -46,7 +47,6 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 class CassandraMailboxDAOTest {
@@ -139,16 +139,18 @@ class CassandraMailboxDAOTest {
             .times(2)
             .whenQueryStartsWith("UPDATE mailbox SET"));
 
-        Mono<Mailbox> readMailbox1 = testee.retrieveMailbox(CASSANDRA_ID_1).cache();
-        Mono<Mailbox> readMailbox2 = testee.retrieveMailbox(CASSANDRA_ID_1).cache();
-        readMailbox1.subscribeOn(Schedulers.elastic()).subscribe();
-        readMailbox2.subscribeOn(Schedulers.elastic()).subscribe();
+        CompletableFuture<Mailbox> readMailbox1 = testee.retrieveMailbox(CASSANDRA_ID_1)
+            .subscribeOn(Schedulers.elastic())
+            .toFuture();
+        CompletableFuture<Mailbox> readMailbox2 = testee.retrieveMailbox(CASSANDRA_ID_1)
+            .subscribeOn(Schedulers.elastic())
+            .toFuture();
 
         barrier.awaitCaller();
         barrier.releaseCaller();
 
-        assertThat(readMailbox1.block().getUidValidity())
-            .isEqualTo(readMailbox2.block().getUidValidity());
+        assertThat(readMailbox1.get().getUidValidity())
+            .isEqualTo(readMailbox2.get().getUidValidity());
     }
 
     @Test
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
index e3aed85..3128df7 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
@@ -24,16 +24,16 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTab
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.LongStream;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
-import org.apache.james.backends.cassandra.Scenario;
 import org.apache.james.backends.cassandra.Scenario.Barrier;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.core.Username;
@@ -51,7 +51,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.github.fge.lambdas.Throwing;
 
-import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 class CassandraModSeqProviderTest {
@@ -113,15 +112,12 @@ class CassandraModSeqProviderTest {
                     .times(1)
                     .whenQueryStartsWith("SELECT nextModseq FROM modseq WHERE mailboxId=:mailboxId;"));
 
-        Mono<ModSeq> operation1 = modSeqProvider.nextModSeq(CASSANDRA_ID)
+        CompletableFuture<ModSeq> operation1 = modSeqProvider.nextModSeq(CASSANDRA_ID)
             .subscribeOn(Schedulers.elastic())
-            .cache();
-        Mono<ModSeq> operation2 = modSeqProvider.nextModSeq(CASSANDRA_ID)
+            .toFuture();
+        CompletableFuture<ModSeq> operation2 = modSeqProvider.nextModSeq(CASSANDRA_ID)
             .subscribeOn(Schedulers.elastic())
-            .cache();
-
-        operation1.subscribe();
-        operation2.subscribe();
+            .toFuture();
 
         insertBarrier.awaitCaller();
         insertBarrier.releaseCaller();
@@ -136,8 +132,8 @@ class CassandraModSeqProviderTest {
 
         retryBarrier.releaseCaller();
 
-        assertThatCode(() -> operation1.block(Duration.ofSeconds(1))).doesNotThrowAnyException();
-        assertThatCode(() -> operation2.block(Duration.ofSeconds(1))).doesNotThrowAnyException();
+        assertThatCode(() -> operation1.get(1, TimeUnit.SECONDS)).doesNotThrowAnyException();
+        assertThatCode(() -> operation2.get(1, TimeUnit.SECONDS)).doesNotThrowAnyException();
     }
 
     @Test

