You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/06 01:44:20 UTC

[james-project] branch master updated (44297c5 -> 42eaf62)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 44297c5  [maven-release-plugin] prepare for next development iteration
     new fa5cf9d  [Refactoring] Rely of CassandraVersionManager instead of its DAO
     new 21c0033  JAMES-3133 Reactify the event system
     new 42eaf62  [Release] Upgrade Spring dockerfile version to 3.6.0-SNAPSHOT

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 dockerfiles/run/spring/Dockerfile                  |  2 +-
 event-sourcing/event-sourcing-core/pom.xml         |  4 ++
 .../apache/james/eventsourcing/CommandHandler.java |  6 ++-
 .../eventsourcing/javaapi/CommandHandlerJava.java  | 34 ------------
 .../james/eventsourcing/CommandDispatcher.scala    | 56 +++++++++----------
 .../org/apache/james/eventsourcing/EventBus.scala  | 22 ++++++--
 .../james/eventsourcing/EventSourcingSystem.scala  |  3 +-
 .../eventsourcing/EventSourcingSystemTest.scala    | 55 ++++++++++---------
 .../org/apache/james/eventsourcing/Event.scala     |  3 +-
 event-sourcing/event-store-api/pom.xml             |  9 ++++
 .../eventsourcing/eventstore/EventStore.scala      | 11 ++--
 .../eventstore/EventStoreContract.scala            | 23 ++++----
 event-sourcing/event-store-cassandra/pom.xml       |  3 +-
 .../eventstore/cassandra/CassandraEventStore.scala | 28 ++++++----
 .../eventstore/cassandra/EventStoreDao.scala       |  4 +-
 event-sourcing/event-store-memory/pom.xml          |  4 ++
 .../eventstore/memory/InMemoryEventStore.scala     | 22 +++++---
 .../CassandraMailboxSessionMapperFactory.java      | 11 ++--
 .../cassandra/mail/CassandraMailboxMapper.java     | 22 ++++----
 .../task/SolveMailboxInconsistenciesService.java   | 16 +++---
 .../CassandraSubscriptionManagerTest.java          |  5 +-
 .../cassandra/mail/CassandraMailboxMapperTest.java |  3 +-
 .../mail/migration/MailboxPathV2MigrationTest.java |  3 +-
 .../SolveMailboxInconsistenciesServiceTest.java    |  3 +-
 .../commands/DetectThresholdCrossingHandler.java   | 18 ++++---
 .../listeners/QuotaThresholdCrossingListener.java  | 10 ++--
 pom.xml                                            |  5 ++
 .../james/dlp/api/DLPConfigurationLoader.java      |  3 +-
 .../dlp/api/DLPConfigurationStoreContract.java     | 23 ++++----
 .../jmap/api/filtering/FilteringManagement.java    |  3 +-
 .../filtering/impl/DefineRulesCommandHandler.java  | 16 +++---
 .../impl/EventSourcingFilteringManagement.java     | 15 +++---
 .../api/filtering/FilteringManagementContract.java | 14 ++---
 .../EventSourcingDLPConfigurationStore.java        | 21 ++++----
 .../commands/ClearCommandHandler.java              | 15 +++---
 .../commands/StoreCommandHandler.java              | 15 +++---
 .../transport/matchers/dlp/DlpRulesLoader.java     |  4 +-
 .../james/jmap/draft/methods/GetFilterMethod.java  | 29 +++++-----
 .../james/jmap/mailet/filter/JMAPFiltering.java    |  8 ++-
 .../webadmin/routes/DLPConfigurationRoutes.java    |  3 +-
 .../EventsourcingConfigurationManagement.java      | 15 +++---
 .../RegisterConfigurationCommandHandler.java       | 16 +++---
 .../EventsourcingConfigurationManagementTest.java  | 19 ++++---
 .../distributed/RabbitMQWorkQueue.java             | 16 +++---
 .../eventsourcing/distributed/ImmediateWorker.java | 10 ++--
 .../org/apache/james/task/MemoryTaskManager.java   | 36 +++++++------
 .../apache/james/task/SerialTaskManagerWorker.java | 59 ++++++++++----------
 .../org/apache/james/task/TaskManagerWorker.java   | 18 ++++---
 .../james/task/eventsourcing/CommandHandlers.scala | 63 ++++++++++++----------
 .../eventsourcing/EventSourcingTaskManager.scala   |  9 ++--
 .../task/eventsourcing/WorkerStatusListener.scala  | 17 +++---
 .../james/task/SerialTaskManagerWorkerTest.java    |  8 +++
 .../EventSourcingTaskManagerTest.java              |  8 +--
 53 files changed, 475 insertions(+), 373 deletions(-)
 delete mode 100644 event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/03: [Release] Upgrade Spring dockerfile version to 3.6.0-SNAPSHOT

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 42eaf62ddc149e4da325ea8d157ac650140b1197
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat Apr 4 17:19:57 2020 +0200

    [Release] Upgrade Spring dockerfile version to 3.6.0-SNAPSHOT
---
 dockerfiles/run/spring/Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dockerfiles/run/spring/Dockerfile b/dockerfiles/run/spring/Dockerfile
index db224ac..dbf564c 100644
--- a/dockerfiles/run/spring/Dockerfile
+++ b/dockerfiles/run/spring/Dockerfile
@@ -17,7 +17,7 @@ EXPOSE 25 110 143 465 587 993 4000
 
 WORKDIR /root
 
-ARG VERSION=3.5.0-SNAPSHOT
+ARG VERSION=3.6.0-SNAPSHOT
 
 # Get data we need to run James : build results and configuration
 ADD "destination/james-server-app-$VERSION-app.zip" "/root/james-server-app-$VERSION-app.zip"


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/03: [Refactoring] Rely of CassandraVersionManager instead of its DAO

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit fa5cf9dab31fd5e576a0181059bf08d32da8d648
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 3 09:23:25 2020 +0700

    [Refactoring] Rely of CassandraVersionManager instead of its DAO
---
 .../CassandraMailboxSessionMapperFactory.java      | 11 ++++++-----
 .../cassandra/mail/CassandraMailboxMapper.java     | 22 +++++++++++++---------
 .../task/SolveMailboxInconsistenciesService.java   | 16 +++++++---------
 .../CassandraSubscriptionManagerTest.java          |  5 +++--
 .../cassandra/mail/CassandraMailboxMapperTest.java |  3 ++-
 .../mail/migration/MailboxPathV2MigrationTest.java |  3 ++-
 .../SolveMailboxInconsistenciesServiceTest.java    |  3 ++-
 7 files changed, 35 insertions(+), 28 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 6086ba4..301e28a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -23,7 +23,7 @@ import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
-import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.mail.CassandraACLMapper;
@@ -93,7 +93,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
     private final CassandraAttachmentOwnerDAO ownerDAO;
     private final CassandraACLMapper aclMapper;
     private final CassandraUserMailboxRightsDAO userMailboxRightsDAO;
-    private final CassandraSchemaVersionDAO versionDAO;
+    private final CassandraSchemaVersionManager versionManager;
     private final CassandraUtils cassandraUtils;
     private final CassandraConfiguration cassandraConfiguration;
 
@@ -107,7 +107,8 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
                                                 BlobStore blobStore, CassandraAttachmentMessageIdDAO attachmentMessageIdDAO,
                                                 CassandraAttachmentOwnerDAO ownerDAO, CassandraACLMapper aclMapper,
                                                 CassandraUserMailboxRightsDAO userMailboxRightsDAO,
-                                                CassandraSchemaVersionDAO versionDAO, CassandraUtils cassandraUtils, CassandraConfiguration cassandraConfiguration) {
+                                                CassandraSchemaVersionManager versionManager,
+                                                CassandraUtils cassandraUtils, CassandraConfiguration cassandraConfiguration) {
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
         this.session = session;
@@ -128,7 +129,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
         this.attachmentMessageIdDAO = attachmentMessageIdDAO;
         this.aclMapper = aclMapper;
         this.userMailboxRightsDAO = userMailboxRightsDAO;
-        this.versionDAO = versionDAO;
+        this.versionManager = versionManager;
         this.cassandraUtils = cassandraUtils;
         this.ownerDAO = ownerDAO;
         this.cassandraConfiguration = cassandraConfiguration;
@@ -168,7 +169,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
 
     @Override
     public MailboxMapper createMailboxMapper(MailboxSession mailboxSession) {
-        return new CassandraMailboxMapper(mailboxDAO, mailboxPathDAO, mailboxPathV2DAO, userMailboxRightsDAO, aclMapper, versionDAO);
+        return new CassandraMailboxMapper(mailboxDAO, mailboxPathDAO, mailboxPathV2DAO, userMailboxRightsDAO, aclMapper, versionManager);
     }
 
     @Override
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index 8e2d0b5..75b068a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -25,7 +25,6 @@ import java.util.List;
 import javax.inject.Inject;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.backends.cassandra.versions.SchemaVersion;
 import org.apache.james.core.Username;
@@ -64,20 +63,25 @@ public class CassandraMailboxMapper implements MailboxMapper {
     private final CassandraMailboxPathV2DAO mailboxPathV2DAO;
     private final CassandraACLMapper cassandraACLMapper;
     private final CassandraUserMailboxRightsDAO userMailboxRightsDAO;
-    private final boolean needMailboxPathV1Support;
+    private final CassandraSchemaVersionManager versionManager;
 
     @Inject
-    public CassandraMailboxMapper(CassandraMailboxDAO mailboxDAO, CassandraMailboxPathDAOImpl mailboxPathDAO, CassandraMailboxPathV2DAO mailboxPathV2DAO, CassandraUserMailboxRightsDAO userMailboxRightsDAO, CassandraACLMapper aclMapper, CassandraSchemaVersionDAO versionDAO) {
+    public CassandraMailboxMapper(CassandraMailboxDAO mailboxDAO,
+                                  CassandraMailboxPathDAOImpl mailboxPathDAO,
+                                  CassandraMailboxPathV2DAO mailboxPathV2DAO,
+                                  CassandraUserMailboxRightsDAO userMailboxRightsDAO,
+                                  CassandraACLMapper aclMapper,
+                                  CassandraSchemaVersionManager versionManager) {
         this.mailboxDAO = mailboxDAO;
         this.mailboxPathDAO = mailboxPathDAO;
         this.mailboxPathV2DAO = mailboxPathV2DAO;
         this.userMailboxRightsDAO = userMailboxRightsDAO;
         this.cassandraACLMapper = aclMapper;
+        this.versionManager = versionManager;
+    }
 
-        this.needMailboxPathV1Support = versionDAO.getCurrentSchemaVersion()
-            .block()
-            .orElse(CassandraSchemaVersionManager.MIN_VERSION)
-            .isBefore(MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION);
+    private boolean needMailboxPathV1Support() {
+        return versionManager.isBefore(MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION);
     }
 
     @Override
@@ -90,7 +94,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
     }
 
     private Flux<Void> deletePath(Mailbox mailbox) {
-        if (needMailboxPathV1Support) {
+        if (needMailboxPathV1Support()) {
             return Flux.merge(
                 mailboxPathDAO.delete(mailbox.generateAssociatedPath()),
                 mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
@@ -163,7 +167,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
     }
 
     private Flux<CassandraIdAndPath> listPaths(String fixedNamespace, Username fixedUser) {
-        if (needMailboxPathV1Support) {
+        if (needMailboxPathV1Support()) {
             return Flux.concat(mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser),
                 mailboxPathDAO.listUserMailboxes(fixedNamespace, fixedUser));
         }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
index 29c1792..4777ed8 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
 
-import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.backends.cassandra.versions.SchemaVersion;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.mail.CassandraIdAndPath;
@@ -357,13 +357,13 @@ public class SolveMailboxInconsistenciesService {
 
     private final CassandraMailboxDAO mailboxDAO;
     private final CassandraMailboxPathV2DAO mailboxPathV2DAO;
-    private final CassandraSchemaVersionDAO versionDAO;
+    private final CassandraSchemaVersionManager versionManager;
 
     @Inject
-    SolveMailboxInconsistenciesService(CassandraMailboxDAO mailboxDAO, CassandraMailboxPathV2DAO mailboxPathV2DAO, CassandraSchemaVersionDAO versionDAO) {
+    SolveMailboxInconsistenciesService(CassandraMailboxDAO mailboxDAO, CassandraMailboxPathV2DAO mailboxPathV2DAO, CassandraSchemaVersionManager versionManager) {
         this.mailboxDAO = mailboxDAO;
         this.mailboxPathV2DAO = mailboxPathV2DAO;
-        this.versionDAO = versionDAO;
+        this.versionManager = versionManager;
     }
 
     Mono<Result> fixMailboxInconsistencies(Context context) {
@@ -375,16 +375,14 @@ public class SolveMailboxInconsistenciesService {
     }
 
     private void assertValidVersion() {
-        Optional<SchemaVersion> maybeVersion = versionDAO.getCurrentSchemaVersion().block();
+        SchemaVersion version = versionManager.computeVersion();
 
-        boolean isVersionValid = maybeVersion
-            .map(version -> version.isAfterOrEquals(MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION))
-            .orElse(false);
+        boolean isVersionValid = version.isAfterOrEquals(MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION);
 
         Preconditions.checkState(isVersionValid,
             "Schema version %s is required in order to ensure mailboxPathV2DAO to be correctly populated, got %s",
             MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION.getValue(),
-            maybeVersion.map(SchemaVersion::getValue));
+            version.getValue());
     }
 
     private Flux<Result> processMailboxPathDaoInconsistencies(Context context) {
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
index 44e1063..a230e11 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
@@ -23,6 +23,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.mailbox.SubscriptionManager;
 import org.apache.james.mailbox.SubscriptionManagerContract;
@@ -87,7 +88,7 @@ class CassandraSubscriptionManagerTest implements SubscriptionManagerContract {
         BlobStore blobStore = null;
         CassandraUidProvider uidProvider = null;
         CassandraModSeqProvider modSeqProvider = null;
-        CassandraSchemaVersionDAO versionDAO = null;
+        CassandraSchemaVersionManager versionManager = null;
 
         subscriptionManager = new StoreSubscriptionManager(
             new CassandraMailboxSessionMapperFactory(
@@ -112,7 +113,7 @@ class CassandraSubscriptionManagerTest implements SubscriptionManagerContract {
                 ownerDAO,
                 aclMapper,
                 userMailboxRightsDAO,
-                versionDAO,
+                versionManager,
                 CassandraUtils.WITH_DEFAULT_CONFIGURATION,
                 CassandraConfiguration.DEFAULT_CONFIGURATION));
     }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
index 9a9fcf2..f6d1dc2 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
@@ -36,6 +36,7 @@ import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -106,7 +107,7 @@ class CassandraMailboxMapperTest {
             mailboxPathV2DAO,
             userMailboxRightsDAO,
             aclMapper,
-            new CassandraSchemaVersionDAO(cassandra.getConf()));
+            new CassandraSchemaVersionManager(new CassandraSchemaVersionDAO(cassandra.getConf())));
     }
 
     @Nested
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
index d65cb06..e018eb1 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
@@ -27,6 +27,7 @@ import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -85,7 +86,7 @@ class MailboxPathV2MigrationTest {
             daoV2,
             userMailboxRightsDAO,
             new CassandraACLMapper(cassandra.getConf(), userMailboxRightsDAO, CassandraConfiguration.DEFAULT_CONFIGURATION),
-            new CassandraSchemaVersionDAO(cassandra.getConf()));
+            new CassandraSchemaVersionManager(new CassandraSchemaVersionDAO(cassandra.getConf())));
     }
 
     @Test
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java
index 97cc2f2..ca2c2b1 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java
@@ -28,6 +28,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.backends.cassandra.versions.SchemaVersion;
 import org.apache.james.core.Username;
@@ -75,7 +76,7 @@ class SolveMailboxInconsistenciesServiceTest {
         mailboxDAO = new CassandraMailboxDAO(cassandra.getConf(), cassandra.getTypesProvider());
         mailboxPathV2DAO = new CassandraMailboxPathV2DAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
         versionDAO = new CassandraSchemaVersionDAO(cassandra.getConf());
-        testee = new SolveMailboxInconsistenciesService(mailboxDAO, mailboxPathV2DAO, versionDAO);
+        testee = new SolveMailboxInconsistenciesService(mailboxDAO, mailboxPathV2DAO, new CassandraSchemaVersionManager(versionDAO));
 
         versionDAO.updateVersion(new SchemaVersion(7)).block();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/03: JAMES-3133 Reactify the event system

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 21c0033b3bfc7fad82a6f92ceb0ece3d0b9ee9a3
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Wed Mar 25 15:10:22 2020 +0100

    JAMES-3133 Reactify the event system
---
 event-sourcing/event-sourcing-core/pom.xml         |  4 ++
 .../apache/james/eventsourcing/CommandHandler.java |  6 ++-
 .../eventsourcing/javaapi/CommandHandlerJava.java  | 34 ------------
 .../james/eventsourcing/CommandDispatcher.scala    | 56 +++++++++----------
 .../org/apache/james/eventsourcing/EventBus.scala  | 22 ++++++--
 .../james/eventsourcing/EventSourcingSystem.scala  |  3 +-
 .../eventsourcing/EventSourcingSystemTest.scala    | 55 ++++++++++---------
 .../org/apache/james/eventsourcing/Event.scala     |  3 +-
 event-sourcing/event-store-api/pom.xml             |  9 ++++
 .../eventsourcing/eventstore/EventStore.scala      | 11 ++--
 .../eventstore/EventStoreContract.scala            | 23 ++++----
 event-sourcing/event-store-cassandra/pom.xml       |  3 +-
 .../eventstore/cassandra/CassandraEventStore.scala | 28 ++++++----
 .../eventstore/cassandra/EventStoreDao.scala       |  4 +-
 event-sourcing/event-store-memory/pom.xml          |  4 ++
 .../eventstore/memory/InMemoryEventStore.scala     | 22 +++++---
 .../commands/DetectThresholdCrossingHandler.java   | 18 ++++---
 .../listeners/QuotaThresholdCrossingListener.java  | 10 ++--
 pom.xml                                            |  5 ++
 .../james/dlp/api/DLPConfigurationLoader.java      |  3 +-
 .../dlp/api/DLPConfigurationStoreContract.java     | 23 ++++----
 .../jmap/api/filtering/FilteringManagement.java    |  3 +-
 .../filtering/impl/DefineRulesCommandHandler.java  | 16 +++---
 .../impl/EventSourcingFilteringManagement.java     | 15 +++---
 .../api/filtering/FilteringManagementContract.java | 14 ++---
 .../EventSourcingDLPConfigurationStore.java        | 21 ++++----
 .../commands/ClearCommandHandler.java              | 15 +++---
 .../commands/StoreCommandHandler.java              | 15 +++---
 .../transport/matchers/dlp/DlpRulesLoader.java     |  4 +-
 .../james/jmap/draft/methods/GetFilterMethod.java  | 29 +++++-----
 .../james/jmap/mailet/filter/JMAPFiltering.java    |  8 ++-
 .../webadmin/routes/DLPConfigurationRoutes.java    |  3 +-
 .../EventsourcingConfigurationManagement.java      | 15 +++---
 .../RegisterConfigurationCommandHandler.java       | 16 +++---
 .../EventsourcingConfigurationManagementTest.java  | 19 ++++---
 .../distributed/RabbitMQWorkQueue.java             | 16 +++---
 .../eventsourcing/distributed/ImmediateWorker.java | 10 ++--
 .../org/apache/james/task/MemoryTaskManager.java   | 36 +++++++------
 .../apache/james/task/SerialTaskManagerWorker.java | 59 ++++++++++----------
 .../org/apache/james/task/TaskManagerWorker.java   | 18 ++++---
 .../james/task/eventsourcing/CommandHandlers.scala | 63 ++++++++++++----------
 .../eventsourcing/EventSourcingTaskManager.scala   |  9 ++--
 .../task/eventsourcing/WorkerStatusListener.scala  | 17 +++---
 .../james/task/SerialTaskManagerWorkerTest.java    |  8 +++
 .../EventSourcingTaskManagerTest.java              |  8 +--
 45 files changed, 439 insertions(+), 344 deletions(-)

diff --git a/event-sourcing/event-sourcing-core/pom.xml b/event-sourcing/event-sourcing-core/pom.xml
index 078e693..9497ae1 100644
--- a/event-sourcing/event-sourcing-core/pom.xml
+++ b/event-sourcing/event-sourcing-core/pom.xml
@@ -56,6 +56,10 @@
             <artifactId>guavate</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-scala-extensions_${scala.base}</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
         </dependency>
diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
index 6e3645b..dd69083 100644
--- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
+++ b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
@@ -18,10 +18,12 @@
  ****************************************************************/
 package org.apache.james.eventsourcing;
 
-import scala.collection.immutable.List;
+import java.util.List;
+
+import org.reactivestreams.Publisher;
 
 public interface CommandHandler<C extends Command> {
   Class<C> handledClass();
 
-  List<? extends Event> handle(C command);
+  Publisher<List<? extends Event>> handle(C command);
 }
diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java
deleted file mode 100644
index ce9cfc9..0000000
--- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.eventsourcing.javaapi;
-
-import org.apache.james.eventsourcing.Command;
-import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
-
-import scala.jdk.javaapi.CollectionConverters;
-
-public interface CommandHandlerJava<C extends Command> extends CommandHandler<C> {
-
-    java.util.List<? extends Event> handleJava(C command);
-
-    default scala.collection.immutable.List<? extends Event> handle(C command) {
-        return CollectionConverters.asScala(handleJava(command)).toList();
-    }
-}
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
index 8dde47f..05abc6f 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
@@ -18,11 +18,17 @@
  ****************************************************************/
 package org.apache.james.eventsourcing
 
-import com.google.common.base.Preconditions
+import java.util
+
 import javax.inject.Inject
+
 import org.apache.james.eventsourcing.eventstore.EventStoreFailedException
+import org.reactivestreams.Publisher
+
+import com.google.common.base.Preconditions
+import reactor.core.scala.publisher.SMono
 
-import scala.util.{Failure, Success, Try}
+import scala.jdk.CollectionConverters._
 
 object CommandDispatcher {
   private val MAX_RETRY = 10
@@ -45,9 +51,16 @@ object CommandDispatcher {
 class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandler[_ <: Command]]) {
   Preconditions.checkArgument(hasOnlyOneHandlerByCommand(handlers), CommandDispatcher.ONLY_ONE_HANDLER_PRECONDITION)
 
-  def dispatch(c: Command): Unit = {
-    trySeveralTimes(() => tryDispatch(c))
-      .getOrElse(() => throw CommandDispatcher.TooManyRetries(c, CommandDispatcher.MAX_RETRY))
+  def dispatch(c: Command): Publisher[Void] = {
+    tryDispatch(c)
+      .retry(CommandDispatcher.MAX_RETRY, {
+        case _: EventStoreFailedException => true
+        case _ => false
+      })
+      .onErrorMap({
+        case _: EventStoreFailedException => CommandDispatcher.TooManyRetries(c, CommandDispatcher.MAX_RETRY)
+        case error => error
+      })
   }
 
   private def hasOnlyOneHandlerByCommand(handlers: Set[CommandHandler[_ <: Command]]): Boolean =
@@ -58,35 +71,22 @@ class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandl
   private val handlersByClass: Map[Class[_ <: Command], CommandHandler[_ <: Command]] =
     handlers.map(handler => (handler.handledClass, handler)).toMap
 
-
-  private def trySeveralTimes(singleTry: () => Boolean): Option[Unit] =
-    0.until(CommandDispatcher.MAX_RETRY)
-      .find(_ => singleTry())
-      .map(_ => ())
-
-
-  private def tryDispatch(c: Command): Boolean = {
-    val maybeEvents: Option[Try[List[_ <: Event]]] = handleCommand(c)
-    maybeEvents match {
-      case Some(eventsTry) =>
-        eventsTry
-          .flatMap(events => Try(eventBus.publish(events))) match {
-          case Success(_) => true
-          case Failure(_: EventStoreFailedException) => false
-          case Failure(e) => throw e
-        }
+  private def tryDispatch(c: Command): SMono[Void] = {
+    handleCommand(c) match {
+      case Some(eventsPublisher) =>
+        SMono(eventsPublisher)
+          .flatMap(events => eventBus.publish(events.asScala))
       case _ =>
-        throw CommandDispatcher.UnknownCommandException(c)
+        SMono.raiseError(CommandDispatcher.UnknownCommandException(c))
     }
   }
 
-  private def handleCommand(c: Command): Option[Try[List[_ <: Event]]] = {
+  private def handleCommand(c: Command): Option[Publisher[util.List[_ <: Event]]] = {
     handlersByClass
       .get(c.getClass)
       .map(commandHandler =>
-        Try(
-          commandHandler
-            .asInstanceOf[CommandHandler[c.type]]
-            .handle(c)))
+        commandHandler
+          .asInstanceOf[CommandHandler[c.type]]
+          .handle(c))
   }
 }
\ No newline at end of file
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
index 9d527d3..ca8d753 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
@@ -19,22 +19,34 @@
 package org.apache.james.eventsourcing
 
 import javax.inject.Inject
+
 import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreFailedException}
+import org.reactivestreams.Publisher
 import org.slf4j.LoggerFactory
 
+import reactor.core.scala.publisher.{SFlux, SMono}
+
 object EventBus {
   private val LOGGER = LoggerFactory.getLogger(classOf[EventBus])
 }
 
 class EventBus @Inject() (eventStore: EventStore, subscribers: Set[Subscriber]) {
   @throws[EventStoreFailedException]
-  def publish(events: List[Event]): Unit = {
-    eventStore.appendAll(events)
-    events
-      .flatMap((event: Event) => subscribers.map(subscriber => (event, subscriber)))
-      .foreach {case (event, subscriber) => handle(event, subscriber)}
+  def publish(events: Iterable[Event]): SMono[Void] = {
+    SMono(eventStore.appendAll(events))
+        .`then`(runHandlers(events, subscribers))
+
   }
 
+  def runHandlers(events: Iterable[Event], subscribers: Set[Subscriber]): SMono[Void] = {
+    SFlux.fromIterable(events.flatMap((event: Event) => subscribers.map(subscriber => (event, subscriber))))
+      .flatMap(infos => runHandler(infos._1, infos._2))
+      .`then`()
+      .`then`(SMono.empty)
+  }
+
+  def runHandler(event: Event, subscriber: Subscriber): Publisher[Void] = SMono.fromCallable(() => handle(event, subscriber)).`then`(SMono.empty)
+
   private def handle(event : Event, subscriber: Subscriber) : Unit = {
     try {
       subscriber.handle(event)
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
index 2f3b71b..5cc52f2 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
@@ -19,6 +19,7 @@
 package org.apache.james.eventsourcing
 
 import org.apache.james.eventsourcing.eventstore.EventStore
+import org.reactivestreams.Publisher
 
 
 object EventSourcingSystem {
@@ -36,5 +37,5 @@ class EventSourcingSystem(handlers: Set[CommandHandler[_ <: Command]],
   private val eventBus = new EventBus(eventStore, subscribers)
   private val commandDispatcher = new CommandDispatcher(eventBus, handlers)
 
-  def dispatch(c: Command): Unit = commandDispatcher.dispatch(c)
+  def dispatch(c: Command): Publisher[Void] = commandDispatcher.dispatch(c)
 }
\ No newline at end of file
diff --git a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
index 948867a..5555907 100644
--- a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
+++ b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
@@ -18,7 +18,8 @@
  ****************************************************************/
 package org.apache.james.eventsourcing
 
-import com.google.common.base.Splitter
+import java.util.{List => JavaList}
+
 import org.apache.james.eventsourcing.eventstore.{EventStore, History}
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 import org.junit.jupiter.api.Test
@@ -26,8 +27,12 @@ import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doThrow, mock, when}
 import org.mockito.internal.matchers.InstanceOf
 import org.mockito.internal.progress.ThreadSafeMockingProgress
+import org.reactivestreams.Publisher
+
+import com.google.common.base.Splitter
+import reactor.core.publisher.Mono
+import reactor.core.scala.publisher.SMono
 
-import scala.collection.immutable.List
 import scala.jdk.CollectionConverters._
 
 object EventSourcingSystemTest {
@@ -59,7 +64,7 @@ trait EventSourcingSystemTest {
   def dispatchShouldApplyCommandHandlerThenCallSubscribers(eventStore: EventStore) : Unit = {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new EventSourcingSystem(Set(simpleDispatcher(eventStore)), Set(subscriber), eventStore)
-    eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))
+    Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()
     assertThat(subscriber.getData.asJava).containsExactly(EventSourcingSystemTest.PAYLOAD_1)
   }
 
@@ -70,7 +75,7 @@ trait EventSourcingSystemTest {
       Set(simpleDispatcher(eventStore)),
       Set((_: Event) => throw new RuntimeException, subscriber),
       eventStore)
-    eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))
+    Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()
     assertThat(subscriber.getData.asJava).containsExactly(EventSourcingSystemTest.PAYLOAD_1)
   }
 
@@ -78,13 +83,13 @@ trait EventSourcingSystemTest {
   def throwingStoreShouldNotLeadToPublishing() : Unit = {
     val eventStore = mock(classOf[EventStore])
     doThrow(new RuntimeException).when(eventStore).appendAll(EventSourcingSystemTest.anyScalaList)
-    when(eventStore.getEventsOfAggregate(any)).thenReturn(History.empty)
+    when(eventStore.getEventsOfAggregate(any)).thenReturn(SMono.just(History.empty))
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new EventSourcingSystem(
       Set(simpleDispatcher(eventStore)),
       Set((_: Event) => throw new RuntimeException, subscriber),
       eventStore)
-    assertThatThrownBy(() => eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1)))
+    assertThatThrownBy(() => Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block())
       .isInstanceOf(classOf[RuntimeException])
     assertThat(subscriber.getData.asJava).isEmpty()
   }
@@ -93,17 +98,17 @@ trait EventSourcingSystemTest {
   def dispatchShouldApplyCommandHandlerThenStoreGeneratedEvents(eventStore: EventStore) : Unit = {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new EventSourcingSystem(Set(simpleDispatcher(eventStore)), Set(subscriber), eventStore)
-    eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))
+    Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()
     val expectedEvent = TestEvent(EventId.first, EventSourcingSystemTest.AGGREGATE_ID, EventSourcingSystemTest.PAYLOAD_1)
-    assertThat(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID).getEventsJava).containsOnly(expectedEvent)
+    assertThat(SMono(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID)).block().getEventsJava).containsOnly(expectedEvent)
   }
 
   @Test
   def dispatchShouldCallSubscriberForSubsequentCommands(eventStore: EventStore) : Unit = {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new EventSourcingSystem(Set(simpleDispatcher(eventStore)), Set(subscriber), eventStore)
-    eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))
-    eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_2))
+    Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()
+    Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_2))).block()
     assertThat(subscriber.getData.asJava).containsExactly(EventSourcingSystemTest.PAYLOAD_1, EventSourcingSystemTest.PAYLOAD_2)
   }
 
@@ -111,18 +116,18 @@ trait EventSourcingSystemTest {
   def dispatchShouldStoreEventsForSubsequentCommands(eventStore: EventStore) : Unit = {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new EventSourcingSystem(Set(simpleDispatcher(eventStore)), Set(subscriber), eventStore)
-    eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))
-    eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_2))
+    Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()
+    Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_2))).block()
     val expectedEvent1 = TestEvent(EventId.first, EventSourcingSystemTest.AGGREGATE_ID, EventSourcingSystemTest.PAYLOAD_1)
     val expectedEvent2 = TestEvent(expectedEvent1.eventId.next, EventSourcingSystemTest.AGGREGATE_ID, EventSourcingSystemTest.PAYLOAD_2)
-    assertThat(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID).getEventsJava).containsOnly(expectedEvent1, expectedEvent2)
+    assertThat(SMono(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID)).block().getEventsJava).containsOnly(expectedEvent1, expectedEvent2)
   }
 
   @Test
   def dispatcherShouldBeAbleToReturnSeveralEvents(eventStore: EventStore) : Unit = {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new EventSourcingSystem(Set(wordCuttingDispatcher(eventStore)), Set(subscriber), eventStore)
-    eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand("This is a test"))
+    Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand("This is a test"))).block()
     assertThat(subscriber.getData.asJava).containsExactly("This", "is", "a", "test")
   }
 
@@ -130,7 +135,7 @@ trait EventSourcingSystemTest {
   def unknownCommandsShouldBeIgnored(eventStore: EventStore) : Unit = {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new EventSourcingSystem(Set(wordCuttingDispatcher(eventStore)), Set(subscriber), eventStore)
-    assertThatThrownBy(() => eventSourcingSystem.dispatch(new Command() {}))
+    assertThatThrownBy(() => Mono.from(eventSourcingSystem.dispatch(new Command() {})).block())
       .isInstanceOf(classOf[CommandDispatcher.UnknownCommandException])
   }
 
@@ -146,22 +151,22 @@ trait EventSourcingSystemTest {
   def simpleDispatcher(eventStore: EventStore) = new CommandHandler[EventSourcingSystemTest.MyCommand]() {
     override def handledClass: Class[EventSourcingSystemTest.MyCommand] = classOf[EventSourcingSystemTest.MyCommand]
 
-    override def handle(myCommand: EventSourcingSystemTest.MyCommand): List[TestEvent] = {
-      val history = eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID)
-      List(TestEvent(history.getNextEventId, EventSourcingSystemTest.AGGREGATE_ID, myCommand.getPayload))
+    override def handle(myCommand: EventSourcingSystemTest.MyCommand): Publisher[JavaList[_ <: Event]] = {
+      SMono.apply(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID))
+          .map(history => Seq(TestEvent(history.getNextEventId, EventSourcingSystemTest.AGGREGATE_ID, myCommand.getPayload)).asJava)
     }
   }
 
   def wordCuttingDispatcher(eventStore: EventStore) = new CommandHandler[EventSourcingSystemTest.MyCommand]() {
     override def handledClass: Class[EventSourcingSystemTest.MyCommand] = classOf[EventSourcingSystemTest.MyCommand]
 
-    override def handle(myCommand: EventSourcingSystemTest.MyCommand): List[TestEvent] = {
-      val history = eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID)
-      val eventIdIncrementer = new EventSourcingSystemTest.EventIdIncrementer(history.getNextEventId)
-      Splitter.on(" ").splitToList(myCommand.getPayload)
-        .asScala
-        .toList
-        .map((word: String) => TestEvent(eventIdIncrementer.next, EventSourcingSystemTest.AGGREGATE_ID, word))
+    override def handle(myCommand: EventSourcingSystemTest.MyCommand): Publisher[JavaList[_ <: Event]] = {
+      SMono.apply(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID))
+        .map(history => new EventSourcingSystemTest.EventIdIncrementer(history.getNextEventId))
+        .map(eventIdIncrementer => Splitter.on(" ").splitToList(myCommand.getPayload)
+          .asScala
+          .toList
+          .map((word: String) => TestEvent(eventIdIncrementer.next, EventSourcingSystemTest.AGGREGATE_ID, word)).asJava)
     }
   }
 }
\ No newline at end of file
diff --git a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
index 79b80bc..5440ab0 100644
--- a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
+++ b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
@@ -19,7 +19,8 @@
 package org.apache.james.eventsourcing
 
 object Event {
-  def belongsToSameAggregate(events: List[_ <: Event]): Boolean = events
+  def belongsToSameAggregate(events: Iterable[_ <: Event]): Boolean = events
+    .toSeq
     .view
     .map(event => event.getAggregateId)
     .distinct
diff --git a/event-sourcing/event-store-api/pom.xml b/event-sourcing/event-store-api/pom.xml
index 44064ce..2bffd38 100644
--- a/event-sourcing/event-store-api/pom.xml
+++ b/event-sourcing/event-store-api/pom.xml
@@ -52,6 +52,15 @@
             <artifactId>guavate</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-scala-extensions_${scala.base}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.reactivestreams</groupId>
+            <artifactId>reactive-streams</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
         </dependency>
diff --git a/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala b/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala
index b90c314..fcfac47 100644
--- a/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala
+++ b/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala
@@ -22,20 +22,19 @@ import org.apache.james.eventsourcing.{AggregateId, Event}
 
 import scala.annotation.varargs
 import scala.jdk.CollectionConverters._
+import org.reactivestreams.Publisher
 
 trait EventStore {
-  def append(event: Event): Unit = appendAll(List(event))
+  def append(event: Event): Publisher[Void] = appendAll(List(event))
 
   @varargs
-  def appendAll(events: Event*): Unit = appendAll(events.toList)
-
-  def appendAll(events: java.util.List[Event]): Unit = appendAll(events.asScala.toList)
+  def appendAll(events: Event*): Publisher[Void] = appendAll(events.toList)
 
   /**
    * This method should check that no input event has an id already stored and throw otherwise
    * It should also check that all events belong to the same aggregate
    */
-  def appendAll(events: List[Event]): Unit
+  def appendAll(events: Iterable[Event]): Publisher[Void]
 
-  def getEventsOfAggregate(aggregateId: AggregateId): History
+  def getEventsOfAggregate(aggregateId: AggregateId): Publisher[History]
 }
\ No newline at end of file
diff --git a/event-sourcing/event-store-api/src/test/scala/org/apache/james/eventsourcing/eventstore/EventStoreContract.scala b/event-sourcing/event-store-api/src/test/scala/org/apache/james/eventsourcing/eventstore/EventStoreContract.scala
index f519a9c..23f454b 100644
--- a/event-sourcing/event-store-api/src/test/scala/org/apache/james/eventsourcing/eventstore/EventStoreContract.scala
+++ b/event-sourcing/event-store-api/src/test/scala/org/apache/james/eventsourcing/eventstore/EventStoreContract.scala
@@ -22,6 +22,9 @@ import org.apache.james.eventsourcing.{EventId, TestAggregateId, TestEvent}
 import org.assertj.core.api.Assertions.{assertThat, assertThatCode, assertThatThrownBy}
 import org.junit.jupiter.api.Test
 
+import reactor.core.scala.publisher
+import reactor.core.scala.publisher.SMono
+
 object EventStoreContract {
   val AGGREGATE_1 = TestAggregateId(1)
   val AGGREGATE_2 = TestAggregateId(2)
@@ -38,35 +41,35 @@ trait EventStoreContract {
   def appendShouldThrowWhenEventFromSeveralAggregates(testee: EventStore) : Unit = {
     val event1 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first")
     val event2 = TestEvent(event1.eventId.next, EventStoreContract.AGGREGATE_2, "second")
-    assertThatThrownBy(() => testee.appendAll(event1, event2))
+    assertThatThrownBy(() => SMono(testee.appendAll(event1, event2)).block())
       .isInstanceOf(classOf[IllegalArgumentException])
   }
 
   @Test
   def appendShouldDoNothingOnEmptyEventList(testee: EventStore) : Unit =
-    assertThatCode(() => testee.appendAll())
+    assertThatCode(() => SMono(testee.appendAll()).block())
       .doesNotThrowAnyException()
 
   @Test
   def appendShouldThrowWhenTryingToRewriteHistory(testee: EventStore) : Unit = {
     val event1 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first")
-    testee.append(event1)
+    SMono(testee.append(event1)).block()
     val event2 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "second")
     assertThatThrownBy(
-      () => testee.append(event2))
+      () => SMono(testee.append(event2)).block())
       .isInstanceOf(classOf[EventStoreFailedException])
   }
 
   @Test
   def getEventsOfAggregateShouldReturnEmptyHistoryWhenUnknown(testee: EventStore) : Unit =
-    assertThat(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1))
+    assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block())
       .isEqualTo(History.empty)
 
   @Test
   def getEventsOfAggregateShouldReturnAppendedEvent(testee: EventStore) : Unit = {
     val event = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first")
-    testee.append(event)
-    assertThat(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1))
+    SMono(testee.append(event)).block()
+    assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block())
       .isEqualTo(History.of(event))
   }
 
@@ -74,9 +77,9 @@ trait EventStoreContract {
   def getEventsOfAggregateShouldReturnAppendedEvents(testee: EventStore) : Unit = {
     val event1 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first")
     val event2 = TestEvent(event1.eventId.next, EventStoreContract.AGGREGATE_1, "second")
-    testee.append(event1)
-    testee.append(event2)
-    assertThat(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1))
+    SMono(testee.append(event1)).block()
+    SMono(testee.append(event2)).block()
+    assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block())
       .isEqualTo(History.of(event1, event2))
   }
 }
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/pom.xml b/event-sourcing/event-store-cassandra/pom.xml
index 8bfd47a..e083ea4 100644
--- a/event-sourcing/event-store-cassandra/pom.xml
+++ b/event-sourcing/event-store-cassandra/pom.xml
@@ -80,8 +80,7 @@
         </dependency>
         <dependency>
             <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-scala-extensions_2.13</artifactId>
-            <version>0.5.0</version>
+            <artifactId>reactor-scala-extensions_${scala.base}</artifactId>
         </dependency>
         <dependency>
             <groupId>net.javacrumbs.json-unit</groupId>
diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
index 6c41973..d9ab8ed 100644
--- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
+++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
@@ -20,25 +20,35 @@ package org.apache.james.eventsourcing.eventstore.cassandra
 
 import com.google.common.base.Preconditions
 import javax.inject.Inject
+
 import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreFailedException, History}
 import org.apache.james.eventsourcing.{AggregateId, Event}
+import org.reactivestreams.Publisher
+
+import reactor.core.scala.publisher.SMono
 
 class CassandraEventStore @Inject() (eventStoreDao: EventStoreDao) extends EventStore {
-  override def appendAll(events: List[Event]): Unit = {
+  override def appendAll(events: Iterable[Event]): Publisher[Void] = {
     if (events.nonEmpty) {
       doAppendAll(events)
+    } else {
+      SMono.empty
     }
   }
 
-  private def doAppendAll(events: List[Event]): Unit = {
+  private def doAppendAll(events: Iterable[Event]): SMono[Void] = {
     Preconditions.checkArgument(Event.belongsToSameAggregate(events))
-    val success: Boolean = eventStoreDao.appendAll(events).block()
-    if (!success) {
-      throw EventStoreFailedException("Concurrent update to the EventStore detected")
-    }
+    eventStoreDao.appendAll(events)
+      .filter(success => success)
+      .single()
+      .onErrorMap({
+        case _: NoSuchElementException => EventStoreFailedException("Concurrent update to the EventStore detected")
+        case e => e
+      })
+      .`then`(SMono.empty)
   }
 
-  override def getEventsOfAggregate(aggregateId: AggregateId): History = {
-    eventStoreDao.getEventsOfAggregate(aggregateId).block()
+  override def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] = {
+    eventStoreDao.getEventsOfAggregate(aggregateId)
   }
-}
\ No newline at end of file
+}
diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
index 95a9239..9fda3a4 100644
--- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
+++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
@@ -50,7 +50,7 @@ class EventStoreDao @Inject() (val session: Session, val jsonEventSerializer: Js
       .where(QueryBuilder.eq(AGGREGATE_ID, bindMarker(AGGREGATE_ID))))
   }
 
-  private[cassandra] def appendAll(events: List[Event]): SMono[Boolean] = {
+  private[cassandra] def appendAll(events: Iterable[Event]): SMono[Boolean] = {
     val batch: BatchStatement = new BatchStatement
     events.foreach((event: Event) => batch.add(insertEvent(event)))
     SMono(cassandraAsyncExecutor.executeReturnApplied(batch))
@@ -74,7 +74,7 @@ class EventStoreDao @Inject() (val session: Session, val jsonEventSerializer: Js
     val listEvents: SMono[List[Event]] = events.collectSeq()
       .map(_.toList)
 
-    listEvents.map(History.of)
+    listEvents.map(History.of(_))
   }
 
   private def toEvent(row: Row): Event = {
diff --git a/event-sourcing/event-store-memory/pom.xml b/event-sourcing/event-store-memory/pom.xml
index d4720c8..8e58626 100644
--- a/event-sourcing/event-store-memory/pom.xml
+++ b/event-sourcing/event-store-memory/pom.xml
@@ -65,6 +65,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-scala-extensions_${scala.base}</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
diff --git a/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala
index c4f9621..caf254a 100644
--- a/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala
+++ b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala
@@ -23,34 +23,44 @@ import java.util.concurrent.atomic.AtomicReference
 import com.google.common.base.Preconditions
 import org.apache.james.eventsourcing.eventstore.{EventStore, History}
 import org.apache.james.eventsourcing.{AggregateId, Event}
+import org.reactivestreams.Publisher
+
+import reactor.core.scala.publisher.SMono
 
 class InMemoryEventStore() extends EventStore {
   private val storeRef: AtomicReference[Map[AggregateId, History]] =
     new AtomicReference(Map().withDefault(_ => History.empty))
 
-  override def appendAll(events: List[Event]): Unit = if (events.nonEmpty) doAppendAll(events)
+  override def appendAll(events: Iterable[Event]): Publisher[Void] = {
+    if (events.nonEmpty) {
+      SMono.fromCallable(() => doAppendAll(events)).`then`()
+    } else {
+      SMono.empty
+    }
+  }
 
-  override def getEventsOfAggregate(aggregateId: AggregateId): History = {
+  override def getEventsOfAggregate(aggregateId: AggregateId): Publisher[History] = {
     Preconditions.checkNotNull(aggregateId)
-    storeRef.get()(aggregateId)
+    SMono.fromCallable(() => storeRef.get()(aggregateId))
   }
 
-  private def doAppendAll(events: Seq[Event]): Unit = {
+  private def doAppendAll(events: Iterable[Event]): Boolean = {
     val aggregateId: AggregateId = getAggregateId(events)
     storeRef.updateAndGet(store => {
       val updatedHistory = History.of(store(aggregateId).getEvents ++ events)
       store.updated(aggregateId, updatedHistory)
     })
+    true
   }
 
-  private def getAggregateId(events: Seq[Event]): AggregateId = {
+  private def getAggregateId(events: Iterable[Event]): AggregateId = {
     Preconditions.checkArgument(events.nonEmpty)
     val aggregateId = events.head.getAggregateId
     Preconditions.checkArgument(belongsToSameAggregate(aggregateId, events))
     aggregateId
   }
 
-  private def belongsToSameAggregate(aggregateId: AggregateId, events: Seq[Event]) =
+  private def belongsToSameAggregate(aggregateId: AggregateId, events: Iterable[Event]) =
     events.forall(_.getAggregateId.equals(aggregateId))
 
 }
\ No newline at end of file
diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
index 83f09a4..551644c 100644
--- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
+++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
@@ -21,14 +21,16 @@ package org.apache.james.mailbox.quota.mailing.commands;
 
 import java.util.List;
 
+import org.apache.james.eventsourcing.CommandHandler;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.eventsourcing.eventstore.History;
-import org.apache.james.eventsourcing.javaapi.CommandHandlerJava;
 import org.apache.james.mailbox.quota.mailing.QuotaMailingListenerConfiguration;
 import org.apache.james.mailbox.quota.mailing.aggregates.UserQuotaThresholds;
+import org.reactivestreams.Publisher;
 
-public class DetectThresholdCrossingHandler implements CommandHandlerJava<DetectThresholdCrossing> {
+import reactor.core.publisher.Mono;
+
+public class DetectThresholdCrossingHandler implements CommandHandler<DetectThresholdCrossing> {
 
     private final EventStore eventStore;
     private final QuotaMailingListenerConfiguration quotaMailingListenerConfiguration;
@@ -41,15 +43,15 @@ public class DetectThresholdCrossingHandler implements CommandHandlerJava<Detect
     }
 
     @Override
-    public List<? extends Event> handleJava(DetectThresholdCrossing command) {
+    public Publisher<List<? extends Event>> handle(DetectThresholdCrossing command) {
         return loadAggregate(command)
-            .detectThresholdCrossing(quotaMailingListenerConfiguration, command);
+            .map(aggregate -> aggregate.detectThresholdCrossing(quotaMailingListenerConfiguration, command));
     }
 
-    private UserQuotaThresholds loadAggregate(DetectThresholdCrossing command) {
+    private Mono<UserQuotaThresholds> loadAggregate(DetectThresholdCrossing command) {
         UserQuotaThresholds.Id aggregateId = UserQuotaThresholds.Id.from(command.getUsername(), listenerName);
-        History history = eventStore.getEventsOfAggregate(aggregateId);
-        return UserQuotaThresholds.fromEvents(aggregateId, history);
+        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+            .map(history -> UserQuotaThresholds.fromEvents(aggregateId, history));
     }
 
     @Override
diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
index d4e7d5d..577e2bc 100644
--- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
+++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
@@ -42,6 +42,8 @@ import org.apache.mailet.MailetContext;
 
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 public class QuotaThresholdCrossingListener implements MailboxListener.GroupMailboxListener {
     public static class QuotaThresholdCrossingListenerGroup extends Group {
 
@@ -74,12 +76,12 @@ public class QuotaThresholdCrossingListener implements MailboxListener.GroupMail
     @Override
     public void event(Event event) {
         if (event instanceof QuotaUsageUpdatedEvent) {
-            handleEvent(event.getUsername(), (QuotaUsageUpdatedEvent) event);
+            handleEvent(event.getUsername(), (QuotaUsageUpdatedEvent) event).block();
         }
     }
 
-    private void handleEvent(Username username, QuotaUsageUpdatedEvent event) {
-        eventSourcingSystem.dispatch(
-            new DetectThresholdCrossing(username, event.getCountQuota(), event.getSizeQuota(), event.getInstant()));
+    private Mono<Void> handleEvent(Username username, QuotaUsageUpdatedEvent event) {
+        return Mono.from(eventSourcingSystem.dispatch(
+            new DetectThresholdCrossing(username, event.getCountQuota(), event.getSizeQuota(), event.getInstant())));
     }
 }
diff --git a/pom.xml b/pom.xml
index 03ad478..ac8afa0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2184,6 +2184,11 @@
                 <version>${feign-form.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.projectreactor</groupId>
+                <artifactId>reactor-scala-extensions_${scala.base}</artifactId>
+                <version>0.5.0</version>
+            </dependency>
+            <dependency>
                 <groupId>io.netty</groupId>
                 <artifactId>netty</artifactId>
                 <version>${netty.version}</version>
diff --git a/server/data/data-api/src/main/java/org/apache/james/dlp/api/DLPConfigurationLoader.java b/server/data/data-api/src/main/java/org/apache/james/dlp/api/DLPConfigurationLoader.java
index ccd7223..e18a5c8 100644
--- a/server/data/data-api/src/main/java/org/apache/james/dlp/api/DLPConfigurationLoader.java
+++ b/server/data/data-api/src/main/java/org/apache/james/dlp/api/DLPConfigurationLoader.java
@@ -20,7 +20,8 @@
 package org.apache.james.dlp.api;
 
 import org.apache.james.core.Domain;
+import org.reactivestreams.Publisher;
 
 public interface DLPConfigurationLoader {
-    DLPRules list(Domain domain);
+    Publisher<DLPRules> list(Domain domain);
 }
diff --git a/server/data/data-api/src/test/java/org/apache/james/dlp/api/DLPConfigurationStoreContract.java b/server/data/data-api/src/test/java/org/apache/james/dlp/api/DLPConfigurationStoreContract.java
index 6ed3b0d..9f3b9d6 100644
--- a/server/data/data-api/src/test/java/org/apache/james/dlp/api/DLPConfigurationStoreContract.java
+++ b/server/data/data-api/src/test/java/org/apache/james/dlp/api/DLPConfigurationStoreContract.java
@@ -30,6 +30,7 @@ import org.apache.james.core.Domain;
 import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Mono;
 
 public interface DLPConfigurationStoreContract {
 
@@ -37,7 +38,7 @@ public interface DLPConfigurationStoreContract {
 
     @Test
     default void listShouldReturnEmptyWhenNone(DLPConfigurationStore dlpConfigurationStore) {
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST))
+        assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block())
             .isEmpty();
     }
 
@@ -45,7 +46,7 @@ public interface DLPConfigurationStoreContract {
     default void listShouldReturnExistingEntries(DLPConfigurationStore dlpConfigurationStore) {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE, RULE_2);
 
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE, RULE_2);
+        assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE, RULE_2);
     }
 
     @Test
@@ -53,7 +54,7 @@ public interface DLPConfigurationStoreContract {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
         dlpConfigurationStore.store(OTHER_DOMAIN, RULE_2);
 
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE);
+        assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE);
     }
 
     @Test
@@ -62,7 +63,7 @@ public interface DLPConfigurationStoreContract {
 
         dlpConfigurationStore.clear(Domain.LOCALHOST);
 
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).isEmpty();
+        assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).isEmpty();
     }
 
     @Test
@@ -78,7 +79,7 @@ public interface DLPConfigurationStoreContract {
 
         dlpConfigurationStore.clear(Domain.LOCALHOST);
 
-        assertThat(dlpConfigurationStore.list(OTHER_DOMAIN)).containsOnly(RULE_2);
+        assertThat(Mono.from(dlpConfigurationStore.list(OTHER_DOMAIN)).block()).containsOnly(RULE_2);
     }
 
     @Test
@@ -89,7 +90,7 @@ public interface DLPConfigurationStoreContract {
 
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
 
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE);
+        assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE);
     }
 
     @Test
@@ -97,7 +98,7 @@ public interface DLPConfigurationStoreContract {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE, RULE_2);
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
 
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE);
+        assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE);
     }
 
     @Test
@@ -105,7 +106,7 @@ public interface DLPConfigurationStoreContract {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE, RULE_2);
 
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE, RULE_2);
+        assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE, RULE_2);
     }
 
     @Test
@@ -119,7 +120,7 @@ public interface DLPConfigurationStoreContract {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE_UPDATED);
 
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE_UPDATED);
+        assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE_UPDATED);
     }
 
     @Test
@@ -127,7 +128,7 @@ public interface DLPConfigurationStoreContract {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
 
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE);
+        assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE);
     }
 
     @Test
@@ -135,6 +136,6 @@ public interface DLPConfigurationStoreContract {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
         dlpConfigurationStore.store(Domain.LOCALHOST, new DLPRules(ImmutableList.of()));
 
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).isEmpty();
+        assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).isEmpty();
     }
 }
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/FilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/FilteringManagement.java
index c8aec6a..6e780f9 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/FilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/FilteringManagement.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.james.core.Username;
+import org.reactivestreams.Publisher;
 
 import com.google.common.collect.ImmutableList;
 
@@ -38,6 +39,6 @@ public interface FilteringManagement {
         defineRulesForUser(username, ImmutableList.of());
     }
 
-    List<Rule> listRulesForUser(Username username);
+    Publisher<Rule> listRulesForUser(Username username);
 
 }
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
index 4931e07..9378a9f 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
@@ -21,11 +21,14 @@ package org.apache.james.jmap.api.filtering.impl;
 
 import java.util.List;
 
+import org.apache.james.eventsourcing.CommandHandler;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.eventsourcing.javaapi.CommandHandlerJava;
+import org.reactivestreams.Publisher;
 
-public class DefineRulesCommandHandler implements CommandHandlerJava<DefineRulesCommand> {
+import reactor.core.publisher.Mono;
+
+public class DefineRulesCommandHandler implements CommandHandler<DefineRulesCommand> {
 
     private final EventStore eventStore;
 
@@ -39,14 +42,11 @@ public class DefineRulesCommandHandler implements CommandHandlerJava<DefineRules
     }
 
     @Override
-    public List<? extends Event> handleJava(DefineRulesCommand storeCommand) {
+    public Publisher<List<? extends Event>> handle(DefineRulesCommand storeCommand) {
         FilteringAggregateId aggregateId = new FilteringAggregateId(storeCommand.getUsername());
 
-        return FilteringAggregate
-            .load(
-                aggregateId,
-                eventStore.getEventsOfAggregate(aggregateId))
-            .defineRules(storeCommand.getRules());
+        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+        .map(history -> FilteringAggregate.load(aggregateId, history).defineRules(storeCommand.getRules()));
     }
 
 }
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index d8dad9f..429cc79 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -29,10 +29,14 @@ import org.apache.james.eventsourcing.Subscriber;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.jmap.api.filtering.FilteringManagement;
 import org.apache.james.jmap.api.filtering.Rule;
+import org.reactivestreams.Publisher;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class EventSourcingFilteringManagement implements FilteringManagement {
 
     private static final ImmutableSet<Subscriber> NO_SUBSCRIBER = ImmutableSet.of();
@@ -51,19 +55,16 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
 
     @Override
     public void defineRulesForUser(Username username, List<Rule> rules) {
-        eventSourcingSystem.dispatch(new DefineRulesCommand(username, rules));
+        Mono.from(eventSourcingSystem.dispatch(new DefineRulesCommand(username, rules))).block();
     }
 
     @Override
-    public List<Rule> listRulesForUser(Username username) {
+    public Publisher<Rule> listRulesForUser(Username username) {
         Preconditions.checkNotNull(username);
 
         FilteringAggregateId aggregateId = new FilteringAggregateId(username);
 
-        return FilteringAggregate
-            .load(
-                aggregateId,
-                eventStore.getEventsOfAggregate(aggregateId))
-            .listRules();
+        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+            .flatMapMany(history -> Flux.fromIterable(FilteringAggregate.load(aggregateId, history).listRules()));
     }
 }
diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java
index d959f23..7a355d4 100644
--- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java
+++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java
@@ -35,6 +35,8 @@ import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Flux;
+
 public interface FilteringManagementContract {
 
     String BART_SIMPSON_CARTOON = "bart@simpson.cartoon";
@@ -46,7 +48,7 @@ public interface FilteringManagementContract {
 
     @Test
     default void listingRulesForUnknownUserShouldReturnEmptyList(EventStore eventStore) {
-        assertThat(instantiateFilteringManagement(eventStore).listRulesForUser(USERNAME))
+        assertThat(Flux.from(instantiateFilteringManagement(eventStore).listRulesForUser(USERNAME)).toStream())
             .isEmpty();
     }
 
@@ -63,7 +65,7 @@ public interface FilteringManagementContract {
 
         testee.defineRulesForUser(USERNAME, RULE_1, RULE_2);
 
-        assertThat(testee.listRulesForUser(USERNAME))
+        assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream())
             .containsExactly(RULE_1, RULE_2);
     }
 
@@ -74,7 +76,7 @@ public interface FilteringManagementContract {
         testee.defineRulesForUser(USERNAME, RULE_1, RULE_2);
         testee.defineRulesForUser(USERNAME, RULE_2, RULE_1);
 
-        assertThat(testee.listRulesForUser(USERNAME))
+        assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream())
             .containsExactly(RULE_2, RULE_1);
     }
 
@@ -108,7 +110,7 @@ public interface FilteringManagementContract {
         FilteringManagement testee = instantiateFilteringManagement(eventStore);
         testee.defineRulesForUser(USERNAME, RULE_3, RULE_2, RULE_1);
 
-        assertThat(testee.listRulesForUser(USERNAME))
+        assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream())
             .containsExactly(RULE_3, RULE_2, RULE_1);
     }
 
@@ -119,7 +121,7 @@ public interface FilteringManagementContract {
         testee.defineRulesForUser(USERNAME, RULE_3, RULE_2, RULE_1);
         testee.clearRulesForUser(USERNAME);
 
-        assertThat(testee.listRulesForUser(USERNAME)).isEmpty();
+        assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream()).isEmpty();
     }
 
     @Test
@@ -128,7 +130,7 @@ public interface FilteringManagementContract {
 
         testee.defineRulesForUser(USERNAME, RULE_FROM, RULE_RECIPIENT, RULE_SUBJECT, RULE_TO, RULE_1);
 
-        assertThat(testee.listRulesForUser(USERNAME))
+        assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream())
             .containsExactly(RULE_FROM, RULE_RECIPIENT, RULE_SUBJECT, RULE_TO, RULE_1);
     }
 
diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java
index 957a86d..48d88ba 100644
--- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java
+++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java
@@ -37,10 +37,13 @@ import org.apache.james.dlp.eventsourcing.commands.StoreCommandHandler;
 import org.apache.james.eventsourcing.EventSourcingSystem;
 import org.apache.james.eventsourcing.Subscriber;
 import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.util.streams.Iterables;
+import org.reactivestreams.Publisher;
 
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class EventSourcingDLPConfigurationStore implements DLPConfigurationStore {
 
     private static final ImmutableSet<Subscriber> NO_SUBSCRIBER = ImmutableSet.of();
@@ -60,29 +63,29 @@ public class EventSourcingDLPConfigurationStore implements DLPConfigurationStore
     }
 
     @Override
-    public DLPRules list(Domain domain) {
+    public Publisher<DLPRules> list(Domain domain) {
 
         DLPAggregateId aggregateId = new DLPAggregateId(domain);
 
-        return DLPDomainConfiguration.load(
-                aggregateId,
-                eventStore.getEventsOfAggregate(aggregateId))
-            .retrieveRules();
+        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+            .map(history -> DLPDomainConfiguration.load(aggregateId, history).retrieveRules());
     }
 
     @Override
     public void store(Domain domain, DLPRules rules) {
-        eventSourcingSystem.dispatch(new StoreCommand(domain, rules));
+        Mono.from(eventSourcingSystem.dispatch(new StoreCommand(domain, rules))).block();
     }
 
     @Override
     public void clear(Domain domain) {
-        eventSourcingSystem.dispatch(new ClearCommand(domain));
+        Mono.from(eventSourcingSystem.dispatch(new ClearCommand(domain))).block();
     }
 
     @Override
     public Optional<DLPConfigurationItem> fetch(Domain domain, Id ruleId) {
-        return Iterables.toStream(list(domain))
+        return Mono.from(list(domain))
+                .flatMapMany(rules -> Flux.fromIterable(rules.getItems()))
+                .toStream()
                 .filter((DLPConfigurationItem item) -> item.getId().equals(ruleId))
                 .findFirst();
     }
diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
index c59f3ce..e9ed63e 100644
--- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
+++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
@@ -23,11 +23,14 @@ import java.util.List;
 
 import org.apache.james.dlp.eventsourcing.aggregates.DLPAggregateId;
 import org.apache.james.dlp.eventsourcing.aggregates.DLPDomainConfiguration;
+import org.apache.james.eventsourcing.CommandHandler;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.eventsourcing.javaapi.CommandHandlerJava;
+import org.reactivestreams.Publisher;
 
-public class ClearCommandHandler implements CommandHandlerJava<ClearCommand> {
+import reactor.core.publisher.Mono;
+
+public class ClearCommandHandler implements CommandHandler<ClearCommand> {
 
     private final EventStore eventStore;
 
@@ -41,12 +44,10 @@ public class ClearCommandHandler implements CommandHandlerJava<ClearCommand> {
     }
 
     @Override
-    public List<? extends Event> handleJava(ClearCommand clearCommand) {
+    public Publisher<List<? extends Event>> handle(ClearCommand clearCommand) {
         DLPAggregateId aggregateId = new DLPAggregateId(clearCommand.getDomain());
 
-        return DLPDomainConfiguration.load(
-                aggregateId,
-                eventStore.getEventsOfAggregate(aggregateId))
-            .clear();
+        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+            .map(history -> DLPDomainConfiguration.load(aggregateId, history).clear());
     }
 }
diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
index e24bd83..0d628d5 100644
--- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
+++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
@@ -23,11 +23,14 @@ import java.util.List;
 
 import org.apache.james.dlp.eventsourcing.aggregates.DLPAggregateId;
 import org.apache.james.dlp.eventsourcing.aggregates.DLPDomainConfiguration;
+import org.apache.james.eventsourcing.CommandHandler;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.eventsourcing.javaapi.CommandHandlerJava;
+import org.reactivestreams.Publisher;
 
-public class StoreCommandHandler implements CommandHandlerJava<StoreCommand> {
+import reactor.core.publisher.Mono;
+
+public class StoreCommandHandler implements CommandHandler<StoreCommand> {
 
     private final EventStore eventStore;
 
@@ -41,12 +44,10 @@ public class StoreCommandHandler implements CommandHandlerJava<StoreCommand> {
     }
 
     @Override
-    public List<? extends Event> handleJava(StoreCommand storeCommand) {
+    public Publisher<List<? extends Event>> handle(StoreCommand storeCommand) {
         DLPAggregateId aggregateId = new DLPAggregateId(storeCommand.getDomain());
 
-        return DLPDomainConfiguration.load(
-                aggregateId,
-                eventStore.getEventsOfAggregate(aggregateId))
-            .store(storeCommand.getRules());
+        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+        .map(history -> DLPDomainConfiguration.load(aggregateId, history).store(storeCommand.getRules()));
     }
 }
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/matchers/dlp/DlpRulesLoader.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/matchers/dlp/DlpRulesLoader.java
index 08e3c4d..3d1f95f 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/matchers/dlp/DlpRulesLoader.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/matchers/dlp/DlpRulesLoader.java
@@ -25,6 +25,8 @@ import org.apache.james.core.Domain;
 import org.apache.james.dlp.api.DLPConfigurationStore;
 import org.apache.james.dlp.api.DLPRules;
 
+import reactor.core.publisher.Mono;
+
 public interface DlpRulesLoader {
 
     DlpDomainRules load(Domain domain);
@@ -40,7 +42,7 @@ public interface DlpRulesLoader {
 
         @Override
         public DlpDomainRules load(Domain domain) {
-          return toRules(configurationStore.list(domain));
+          return toRules(Mono.from(configurationStore.list(domain)).block());
         }
 
         private DlpDomainRules toRules(DLPRules items) {
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java
index 5bb238c..618d862 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java
@@ -19,14 +19,12 @@
 
 package org.apache.james.jmap.draft.methods;
 
-import java.util.List;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
 import org.apache.james.core.Username;
 import org.apache.james.jmap.api.filtering.FilteringManagement;
-import org.apache.james.jmap.api.filtering.Rule;
 import org.apache.james.jmap.draft.model.GetFilterRequest;
 import org.apache.james.jmap.draft.model.GetFilterResponse;
 import org.apache.james.jmap.draft.model.MethodCallId;
@@ -37,8 +35,12 @@ import org.apache.james.util.MDCBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class GetFilterMethod implements Method {
     private static final Logger LOGGER = LoggerFactory.getLogger(GetFilterMethod.class);
 
@@ -93,17 +95,18 @@ public class GetFilterMethod implements Method {
     }
 
     private Stream<JmapResponse> retrieveFilter(MethodCallId methodCallId, Username username) {
-        List<Rule> rules = filteringManagement.listRulesForUser(username);
-
-        GetFilterResponse getFilterResponse = GetFilterResponse.builder()
-            .rules(rules)
-            .build();
-
-        return Stream.of(JmapResponse.builder()
-            .methodCallId(methodCallId)
-            .response(getFilterResponse)
-            .responseName(RESPONSE_NAME)
-            .build());
+        return Flux.from(filteringManagement.listRulesForUser(username))
+            .collect(Guavate.toImmutableList())
+            .map(rules -> GetFilterResponse.builder()
+                .rules(rules)
+                .build())
+            .map(getFilterResponse -> JmapResponse.builder()
+                .methodCallId(methodCallId)
+                .response(getFilterResponse)
+                .responseName(RESPONSE_NAME)
+                .build())
+            .flatMapMany(Mono::just)
+            .toStream();
     }
 
     private JmapResponse unKnownError(MethodCallId methodCallId) {
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/JMAPFiltering.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/JMAPFiltering.java
index 0a0d58f..efd5fc7 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/JMAPFiltering.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/JMAPFiltering.java
@@ -36,6 +36,10 @@ import org.apache.mailet.base.GenericMailet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.steveash.guavate.Guavate;
+
+import reactor.core.publisher.Flux;
+
 /**
  * Mailet for applying JMAP filtering to incoming email.
  *
@@ -77,7 +81,9 @@ public class JMAPFiltering extends GenericMailet {
     }
 
     private void findFirstApplicableRule(Username username, Mail mail) {
-        List<Rule> filteringRules = filteringManagement.listRulesForUser(username);
+        List<Rule> filteringRules = Flux.from(filteringManagement.listRulesForUser(username))
+            .collect(Guavate.toImmutableList())
+            .block();
         RuleMatcher ruleMatcher = new RuleMatcher(filteringRules);
         Stream<Rule> matchingRules = ruleMatcher.findApplicableRules(mail);
 
diff --git a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/DLPConfigurationRoutes.java b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/DLPConfigurationRoutes.java
index 9dc3e32..e197a5a 100644
--- a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/DLPConfigurationRoutes.java
+++ b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/DLPConfigurationRoutes.java
@@ -55,6 +55,7 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import reactor.core.publisher.Mono;
 import spark.HaltException;
 import spark.Request;
 import spark.Service;
@@ -166,7 +167,7 @@ public class DLPConfigurationRoutes implements Routes {
     public void defineList(Service service) {
         service.get(SPECIFIC_DLP_RULE_DOMAIN, (request, response) -> {
             Domain senderDomain = parseDomain(request);
-            DLPRules dlpConfigurations = dlpConfigurationStore.list(senderDomain);
+            DLPRules dlpConfigurations = Mono.from(dlpConfigurationStore.list(senderDomain)).block();
 
             DLPConfigurationDTO dto = DLPConfigurationDTO.toDTO(dlpConfigurations);
             response.status(HttpStatus.OK_200);
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
index a9ca76a..ba2c621 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
@@ -19,8 +19,6 @@
 
 package org.apache.james.queue.rabbitmq.view.cassandra.configuration;
 
-import java.util.Optional;
-
 import javax.inject.Inject;
 
 import org.apache.james.eventsourcing.AggregateId;
@@ -32,6 +30,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 public class EventsourcingConfigurationManagement {
     static final String CONFIGURATION_AGGREGATE_KEY = "CassandraMailQueueViewConfiguration";
     static final AggregateId CONFIGURATION_AGGREGATE_ID = () -> CONFIGURATION_AGGREGATE_KEY;
@@ -51,15 +51,16 @@ public class EventsourcingConfigurationManagement {
     }
 
     @VisibleForTesting
-    Optional<CassandraMailQueueViewConfiguration> load() {
-        return ConfigurationAggregate
-            .load(CONFIGURATION_AGGREGATE_ID, eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID))
-            .getCurrentConfiguration();
+    Mono<CassandraMailQueueViewConfiguration> load() {
+        return Mono.from(eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID))
+            .flatMap(history -> Mono.justOrEmpty(ConfigurationAggregate
+                .load(CONFIGURATION_AGGREGATE_ID, history)
+                .getCurrentConfiguration()));
     }
 
     public void registerConfiguration(CassandraMailQueueViewConfiguration newConfiguration) {
         Preconditions.checkNotNull(newConfiguration);
 
-        eventSourcingSystem.dispatch(new RegisterConfigurationCommand(newConfiguration, CONFIGURATION_AGGREGATE_ID));
+        Mono.from(eventSourcingSystem.dispatch(new RegisterConfigurationCommand(newConfiguration, CONFIGURATION_AGGREGATE_ID))).block();
     }
 }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
index 33c61e9..bf54102 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
@@ -21,11 +21,14 @@ package org.apache.james.queue.rabbitmq.view.cassandra.configuration;
 
 import java.util.List;
 
+import org.apache.james.eventsourcing.CommandHandler;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.eventsourcing.javaapi.CommandHandlerJava;
+import org.reactivestreams.Publisher;
 
-class RegisterConfigurationCommandHandler implements CommandHandlerJava<RegisterConfigurationCommand> {
+import reactor.core.publisher.Mono;
+
+class RegisterConfigurationCommandHandler implements CommandHandler<RegisterConfigurationCommand> {
 
     private final EventStore eventStore;
 
@@ -39,9 +42,10 @@ class RegisterConfigurationCommandHandler implements CommandHandlerJava<Register
     }
 
     @Override
-    public List<? extends Event> handleJava(RegisterConfigurationCommand command) {
-        return ConfigurationAggregate
-            .load(command.getAggregateId(), eventStore.getEventsOfAggregate(command.getAggregateId()))
-            .registerConfiguration(command.getConfiguration());
+    public Publisher<List<? extends Event>> handle(RegisterConfigurationCommand command) {
+        return Mono.from(eventStore.getEventsOfAggregate(command.getAggregateId()))
+            .map(history -> ConfigurationAggregate
+                .load(command.getAggregateId(), history)
+                .registerConfiguration(command.getConfiguration()));
     }
 }
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java
index 1518a1d..5d191bf 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java
@@ -31,6 +31,8 @@ import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import reactor.core.publisher.Mono;
+
 class EventsourcingConfigurationManagementTest {
 
     @RegisterExtension
@@ -69,7 +71,7 @@ class EventsourcingConfigurationManagementTest {
     void loadShouldReturnEmptyIfNoConfigurationStored(EventStore eventStore) {
         EventsourcingConfigurationManagement testee = createConfigurationManagement(eventStore);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .isEmpty();
     }
 
@@ -80,7 +82,7 @@ class EventsourcingConfigurationManagementTest {
         testee.registerConfiguration(SECOND_CONFIGURATION);
         testee.registerConfiguration(THIRD_CONFIGURATION);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .contains(THIRD_CONFIGURATION);
     }
 
@@ -125,7 +127,7 @@ class EventsourcingConfigurationManagementTest {
             .build();
         testee.registerConfiguration(increaseOneBucketConfiguration);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .contains(increaseOneBucketConfiguration);
     }
 
@@ -135,7 +137,7 @@ class EventsourcingConfigurationManagementTest {
 
         testee.registerConfiguration(FIRST_CONFIGURATION);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .contains(FIRST_CONFIGURATION);
     }
 
@@ -189,7 +191,7 @@ class EventsourcingConfigurationManagementTest {
             .build();
         testee.registerConfiguration(decreaseTwiceSliceWindowConfiguration);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .contains(decreaseTwiceSliceWindowConfiguration);
     }
 
@@ -209,7 +211,7 @@ class EventsourcingConfigurationManagementTest {
             .build();
         testee.registerConfiguration(decreaseTwiceSliceWindowConfiguration);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .contains(decreaseTwiceSliceWindowConfiguration);
     }
 
@@ -229,7 +231,7 @@ class EventsourcingConfigurationManagementTest {
             .build();
         testee.registerConfiguration(decreaseTwiceSliceWindowConfiguration);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .contains(decreaseTwiceSliceWindowConfiguration);
     }
 
@@ -239,7 +241,8 @@ class EventsourcingConfigurationManagementTest {
         testee.registerConfiguration(FIRST_CONFIGURATION);
         testee.registerConfiguration(FIRST_CONFIGURATION);
 
-        assertThat(eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID)
+        assertThat(Mono.from(eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID))
+            .block()
             .getEventsJava())
             .hasSize(1);
     }
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index de68b24..dbdce52 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -140,22 +140,22 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     private Mono<Task> deserialize(String json, TaskId taskId) {
         return Mono.fromCallable(() -> taskSerializer.deserialize(json))
-            .doOnError(error -> {
+            .onErrorResume(error -> {
                 String errorMessage = String.format("Unable to deserialize submitted Task %s", taskId.asString());
                 LOGGER.error(errorMessage, error);
-                worker.fail(taskId, Optional.empty(), errorMessage, error);
-            })
-            .onErrorResume(error -> Mono.empty());
+                return Mono.from(worker.fail(taskId, Optional.empty(), errorMessage, error))
+                    .then(Mono.empty());
+            });
     }
 
     private Mono<Task.Result> executeOnWorker(TaskId taskId, Task task) {
         return worker.executeTask(new TaskWithId(taskId, task))
-            .doOnError(error -> {
+            .onErrorResume(error -> {
                 String errorMessage = String.format("Unable to run submitted Task %s", taskId.asString());
                 LOGGER.warn(errorMessage, error);
-                worker.fail(taskId, task.details(), errorMessage, error);
-            })
-            .onErrorResume(error -> Mono.empty());
+                return Mono.from(worker.fail(taskId, task.details(), errorMessage, error))
+                    .then(Mono.empty());
+            });
     }
 
     private void listenToCancelRequests() {
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/ImmediateWorker.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/ImmediateWorker.java
index 93d8f75..71e6214 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/ImmediateWorker.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/ImmediateWorker.java
@@ -27,6 +27,7 @@ import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManagerWorker;
 import org.apache.james.task.TaskWithId;
+import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -39,8 +40,8 @@ class ImmediateWorker implements TaskManagerWorker {
 
     @Override
     public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
-        tasks.add(taskWithId);
-        return Mono.fromCallable(() -> taskWithId.getTask().run())
+        return Mono.fromRunnable(() -> tasks.add(taskWithId))
+            .then(Mono.fromCallable(() -> taskWithId.getTask().run()))
             .doOnNext(result -> results.add(result))
             .subscribeOn(Schedulers.elastic());
     }
@@ -50,8 +51,9 @@ class ImmediateWorker implements TaskManagerWorker {
     }
 
     @Override
-    public void fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable reason) {
-        failedTasks.add(taskId);
+    public Publisher<Void> fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable reason) {
+        return Mono.fromRunnable(() -> failedTasks.add(taskId))
+            .then();
     }
 
     @Override
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 3d2da18..8d983b0 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -30,10 +30,13 @@ import java.util.function.Consumer;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
+import org.reactivestreams.Publisher;
+
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public class MemoryTaskManager implements TaskManager {
@@ -54,40 +57,41 @@ public class MemoryTaskManager implements TaskManager {
         }
 
         @Override
-        public void started(TaskId taskId) {
-            updaterFactory.apply(taskId).accept(details -> details.started(hostname));
+        public Publisher<Void> started(TaskId taskId) {
+            return Mono.fromRunnable(() -> updaterFactory.apply(taskId)
+                .accept(details -> details.started(hostname)));
         }
 
         @Override
-        public void completed(TaskId taskId, Task.Result result, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) {
-            updaterFactory.apply(taskId)
-                .accept(details -> details.completed(additionalInformation));
+        public Publisher<Void> completed(TaskId taskId, Task.Result result, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) {
+            return Mono.fromRunnable(() -> updaterFactory.apply(taskId)
+                .accept(details -> details.completed(additionalInformation)));
         }
 
         @Override
-        public void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable t) {
-            failed(taskId, additionalInformation);
+        public Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable t) {
+            return failed(taskId, additionalInformation);
         }
 
         @Override
-        public void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, Throwable t) {
-            failed(taskId, additionalInformation);
+        public Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, Throwable t) {
+            return failed(taskId, additionalInformation);
          }
 
         @Override
-        public void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) {
-            updaterFactory.apply(taskId)
-                .accept(details -> details.failed(additionalInformation));
+        public Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) {
+            return Mono.fromRunnable(() -> updaterFactory.apply(taskId)
+                .accept(details -> details.failed(additionalInformation)));
         }
 
         @Override
-        public void cancelled(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) {
-            updaterFactory.apply(taskId)
-                .accept(details -> details.cancelEffectively(additionalInformation));
+        public Publisher<Void> cancelled(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) {
+            return Mono.fromRunnable(() -> updaterFactory.apply(taskId)
+                .accept(details -> details.cancelEffectively(additionalInformation)));
         }
 
         @Override
-        public void updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation additionalInformation) {
+        public Publisher<Void> updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation additionalInformation) {
             //The memory task manager doesn't use polling to update its additionalInformation.
             throw new IllegalStateException();
         }
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 9bdad89..b024c4c 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.james.util.MDCBuilder;
 import org.apache.james.util.concurrent.NamedThreadFactory;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,20 +70,20 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
             return Mono.using(
                 () -> pollAdditionalInformation(taskWithId).subscribe(),
                 ignored -> Mono.fromFuture(future)
-                    .doOnError(exception -> handleExecutionError(taskWithId, listener, exception))
-                    .onErrorReturn(Task.Result.PARTIAL),
+                    .onErrorResume(exception -> Mono.from(handleExecutionError(taskWithId, listener, exception))
+                            .thenReturn(Task.Result.PARTIAL)),
                 Disposable::dispose);
         } else {
-            listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
-            return Mono.empty();
+            return Mono.from(listener.cancelled(taskWithId.getId(), taskWithId.getTask().details()))
+                .then(Mono.empty());
         }
     }
 
-    private void handleExecutionError(TaskWithId taskWithId, Listener listener, Throwable exception) {
+    private Publisher<Void> handleExecutionError(TaskWithId taskWithId, Listener listener, Throwable exception) {
         if (exception instanceof CancellationException) {
-            listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
+            return listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
         } else {
-            listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception);
+            return listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception);
         }
     }
 
@@ -91,7 +92,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
             .delayElement(pollingInterval, Schedulers.elastic())
             .repeat()
             .<TaskExecutionDetails.AdditionalInformation>handle((maybeDetails, sink) -> maybeDetails.ifPresent(sink::next))
-            .doOnNext(information -> listener.updated(taskWithId.getId(), information));
+            .flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), information)).thenReturn(information));
     }
 
 
@@ -101,27 +102,27 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
                 .addContext(Task.TASK_ID, taskWithId.getId())
                 .addContext(Task.TASK_TYPE, taskWithId.getTask().type())
                 .addContext(Task.TASK_DETAILS, taskWithId.getTask().details()),
-            () -> run(taskWithId, listener));
+            () -> run(taskWithId, listener).block());
     }
 
-    private Task.Result run(TaskWithId taskWithId, Listener listener) {
-        listener.started(taskWithId.getId());
-        try {
-            return taskWithId.getTask()
-                .run()
-                .onComplete(result -> listener.completed(taskWithId.getId(), result, taskWithId.getTask().details()))
-                .onFailure(() -> {
-                    LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId());
-                    listener.failed(taskWithId.getId(), taskWithId.getTask().details());
-                });
-        } catch (InterruptedException e) {
-            listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
-            return Task.Result.PARTIAL;
-        } catch (Exception e) {
-            LOGGER.error("Error while running task {}", taskWithId.getId(), e);
-            listener.failed(taskWithId.getId(), taskWithId.getTask().details(), e);
-            return Task.Result.PARTIAL;
-        }
+    private Mono<Task.Result> run(TaskWithId taskWithId, Listener listener) {
+        return Mono.from(listener.started(taskWithId.getId()))
+            .then(Mono.fromCallable(() -> runTask(taskWithId, listener)))
+            .onErrorResume(InterruptedException.class, e -> Mono.from(listener.cancelled(taskWithId.getId(), taskWithId.getTask().details())).thenReturn(Task.Result.PARTIAL))
+            .onErrorResume(Exception.class, e -> {
+                LOGGER.error("Error while running task {}", taskWithId.getId(), e);
+                return Mono.from(listener.failed(taskWithId.getId(), taskWithId.getTask().details(), e)).thenReturn(Task.Result.PARTIAL);
+            });
+    }
+
+    private Task.Result runTask(TaskWithId taskWithId, Listener listener) throws InterruptedException {
+        return taskWithId.getTask()
+            .run()
+            .onComplete(result -> Mono.from(listener.completed(taskWithId.getId(), result, taskWithId.getTask().details())).block())
+            .onFailure(() -> {
+                LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId());
+                Mono.from(listener.failed(taskWithId.getId(), taskWithId.getTask().details())).block();
+            });
     }
 
     @Override
@@ -133,8 +134,8 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
     }
 
     @Override
-    public void fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable reason) {
-        listener.failed(taskId, additionalInformation, errorMessage, reason);
+    public Publisher<Void> fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable reason) {
+        return listener.failed(taskId, additionalInformation, errorMessage, reason);
     }
 
     @Override
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java
index c6475e0..c29069e 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java
@@ -21,29 +21,31 @@ package org.apache.james.task;
 import java.io.Closeable;
 import java.util.Optional;
 
+import org.reactivestreams.Publisher;
+
 import reactor.core.publisher.Mono;
 
 public interface TaskManagerWorker extends Closeable {
 
     interface Listener {
-        void started(TaskId taskId);
+        Publisher<Void> started(TaskId taskId);
 
-        void completed(TaskId taskId, Task.Result result, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
+        Publisher<Void> completed(TaskId taskId, Task.Result result, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
 
-        void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable t);
+        Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable t);
 
-        void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, Throwable t);
+        Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, Throwable t);
 
-        void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
+        Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
 
-        void cancelled(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
+        Publisher<Void> cancelled(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
 
-        void updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation additionalInformation);
+        Publisher<Void> updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation additionalInformation);
     }
 
     Mono<Task.Result> executeTask(TaskWithId taskWithId);
 
     void cancelTask(TaskId taskId);
 
-    void fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable reason);
+    Publisher<Void> fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable reason);
 }
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
index 39e0ea4..de682c1 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
@@ -1,4 +1,4 @@
-/** **************************************************************
+/*****************************************************************
   * Licensed to the Apache Software Foundation (ASF) under one   *
   * or more contributor license agreements.  See the NOTICE file *
   * distributed with this work for additional information        *
@@ -6,87 +6,92 @@
   * to you under the Apache License, Version 2.0 (the            *
   * "License"); you may not use this file except in compliance   *
   * with the License.  You may obtain a copy of the License at   *
-  * *
-  * http://www.apache.org/licenses/LICENSE-2.0                 *
-  * *
+  *                                                              *
+  * http://www.apache.org/licenses/LICENSE-2.0                   *
+  *                                                              *
   * Unless required by applicable law or agreed to in writing,   *
   * software distributed under the License is distributed on an  *
   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
   * KIND, either express or implied.  See the License for the    *
   * specific language governing permissions and limitations      *
   * under the License.                                           *
-  * ***************************************************************/
+  ****************************************************************/
 package org.apache.james.task.eventsourcing
 
-import java.util
+import java.util.List
 
 import org.apache.james.eventsourcing.eventstore.History
 import org.apache.james.eventsourcing.{CommandHandler, Event}
 import org.apache.james.task.eventsourcing.TaskCommand._
 import org.apache.james.task.{Hostname, TaskId}
+import org.reactivestreams.Publisher
+
+import reactor.core.scala.publisher.SMono
+
+import scala.jdk.CollectionConverters._
 
 sealed abstract class TaskCommandHandler[T <: TaskCommand] extends CommandHandler[T] {
 
-  def loadAggregate(loadHistory: TaskAggregateId => History, taskId: TaskId): TaskAggregate = {
+  def loadAggregate(loadHistory: TaskAggregateId => SMono[History], taskId: TaskId): SMono[TaskAggregate] = {
     val aggregateId = TaskAggregateId(taskId)
-    TaskAggregate.fromHistory(aggregateId, loadHistory(aggregateId))
+    loadHistory(aggregateId).map(TaskAggregate.fromHistory(aggregateId, _))
   }
 }
 
-class CreateCommandHandler(private val loadHistory: TaskAggregateId => History, hostname: Hostname) extends TaskCommandHandler[Create] {
+class CreateCommandHandler(private val loadHistory: TaskAggregateId => SMono[History], hostname: Hostname) extends TaskCommandHandler[Create] {
   override def handledClass: Class[Create] = classOf[Create]
 
-  override def handle(command: Create): List[_ <: Event] = {
-    TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname)
+  override def handle(command: Create): Publisher[List[_ <: Event]] = {
+    SMono.fromCallable(() => TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname).asJava)
   }
 }
 
-class StartCommandHandler(private val loadHistory: TaskAggregateId => History,
+class StartCommandHandler(private val loadHistory: TaskAggregateId => SMono[History],
                           private val hostname: Hostname) extends TaskCommandHandler[Start] {
   override def handledClass: Class[Start] = classOf[Start]
 
-  override def handle(command: Start): List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).start(hostname)
+  override def handle(command: Start): Publisher[List[_ <: Event]] = {
+    loadAggregate(loadHistory, command.id).map(_.start(hostname).asJava)
   }
 }
 
-class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => History,
+class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => SMono[History],
                                   private val hostname: Hostname) extends TaskCommandHandler[RequestCancel] {
   override def handledClass: Class[RequestCancel] = classOf[RequestCancel]
 
-  override def handle(command: RequestCancel): List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).requestCancel(hostname)
+  override def handle(command: RequestCancel): Publisher[List[_ <: Event]] = {
+    loadAggregate(loadHistory, command.id).map(_.requestCancel(hostname).asJava)
   }
 }
 
-class CompleteCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Complete] {
+class CompleteCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[Complete] {
   override def handledClass: Class[Complete] = classOf[Complete]
 
-  override def handle(command: Complete): List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).complete(command.result, command.additionalInformation)
+  override def handle(command: Complete): Publisher[List[_ <: Event]] = {
+    loadAggregate(loadHistory, command.id).map(_.complete(command.result, command.additionalInformation).asJava)
   }
 }
 
-class CancelCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Cancel] {
+class CancelCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[Cancel] {
   override def handledClass: Class[Cancel] = classOf[Cancel]
 
-  override def handle(command: Cancel): List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).cancel(command.additionalInformation)
+  override def handle(command: Cancel): Publisher[List[_ <: Event]] = {
+    loadAggregate(loadHistory, command.id).map(_.cancel(command.additionalInformation).asJava)
   }
 }
 
-class FailCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Fail] {
+class FailCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[Fail] {
   override def handledClass: Class[Fail] = classOf[Fail]
 
-  override def handle(command: Fail): List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).fail(command.additionalInformation, command.errorMessage, command.exception)
+  override def handle(command: Fail): Publisher[List[_ <: Event]] = {
+    loadAggregate(loadHistory, command.id).map(_.fail(command.additionalInformation, command.errorMessage, command.exception).asJava)
   }
 }
 
-class UpdateCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[UpdateAdditionalInformation] {
+class UpdateCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[UpdateAdditionalInformation] {
   override def handledClass: Class[UpdateAdditionalInformation] = classOf[UpdateAdditionalInformation]
 
-  override def handle(command: UpdateAdditionalInformation): List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).update(command.additionalInformation)
+  override def handle(command: UpdateAdditionalInformation): Publisher[List[_ <: Event]] = {
+    loadAggregate(loadHistory, command.id).map(_.update(command.additionalInformation).asJava)
   }
 }
\ No newline at end of file
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index ad5d848..d9e6fa6 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -25,13 +25,16 @@ import java.util
 import com.google.common.annotations.VisibleForTesting
 import javax.annotation.PreDestroy
 import javax.inject.Inject
+
 import org.apache.james.eventsourcing.eventstore.{EventStore, History}
 import org.apache.james.eventsourcing.{AggregateId, EventSourcingSystem, Subscriber}
 import org.apache.james.lifecycle.api.Startable
 import org.apache.james.task.TaskManager.ReachedTimeoutException
 import org.apache.james.task._
 import org.apache.james.task.eventsourcing.TaskCommand._
+
 import reactor.core.publisher.{Flux, Mono}
+import reactor.core.scala.publisher.SMono
 import reactor.core.scheduler.Schedulers
 
 class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](
@@ -52,7 +55,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
 
   import scala.jdk.CollectionConverters._
 
-  private val loadHistory: AggregateId => History = eventStore.getEventsOfAggregate _
+  private val loadHistory: AggregateId => SMono[History] = aggregateId => SMono(eventStore.getEventsOfAggregate(aggregateId))
   private val eventSourcingSystem = new EventSourcingSystem(
     handlers = Set(
       new CreateCommandHandler(loadHistory, hostname),
@@ -75,7 +78,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
   override def submit(task: Task): TaskId = {
     val taskId = TaskId.generateTaskId
     val command = Create(taskId, task)
-    eventSourcingSystem.dispatch(command)
+    SMono(eventSourcingSystem.dispatch(command)).block()
     taskId
   }
 
@@ -93,7 +96,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
 
   override def cancel(id: TaskId): Unit = {
     val command = RequestCancel(id)
-    eventSourcingSystem.dispatch(command)
+    SMono(eventSourcingSystem.dispatch(command)).block()
   }
 
   @throws(classOf[TaskNotFoundException])
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
index aa93d82..1edbf03 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
@@ -26,28 +26,31 @@ import org.apache.james.eventsourcing.EventSourcingSystem
 import org.apache.james.task.Task.Result
 import org.apache.james.task.eventsourcing.TaskCommand._
 import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManagerWorker}
+import org.reactivestreams.Publisher
+
+import reactor.core.scala.publisher.SMono
 
 import scala.compat.java8.OptionConverters._
 
 case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener {
 
-  override def started(taskId: TaskId): Unit = eventSourcingSystem.dispatch(Start(taskId))
+  override def started(taskId: TaskId): Publisher[Void] = eventSourcingSystem.dispatch(Start(taskId))
 
-  override def completed(taskId: TaskId, result: Result, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Unit =
+  override def completed(taskId: TaskId, result: Result, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] =
     eventSourcingSystem.dispatch(Complete(taskId, result, additionalInformation.asScala))
 
-  override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation], errorMessage: String, t: Throwable): Unit =
+  override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation], errorMessage: String, t: Throwable): Publisher[Void] =
     eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, Some(errorMessage), Some(Throwables.getStackTraceAsString(t))))
 
-  override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation], t: Throwable): Unit =
+  override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation], t: Throwable): Publisher[Void] =
     eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, None, Some(Throwables.getStackTraceAsString(t))))
 
-  override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Unit =
+  override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] =
     eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, None, None))
 
-  override def cancelled(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Unit =
+  override def cancelled(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] =
     eventSourcingSystem.dispatch(Cancel(taskId, additionalInformation.asScala ))
 
-  override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Unit =
+  override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Publisher[Void] =
     eventSourcingSystem.dispatch(UpdateAdditionalInformation(taskId, additionalInformation))
 }
\ No newline at end of file
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index a83392f..5c4f23d 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -56,6 +57,13 @@ class SerialTaskManagerWorkerTest {
     @BeforeEach
     void beforeEach() {
         listener = mock(TaskManagerWorker.Listener.class);
+        when(listener.started(any())).thenReturn(Mono.empty());
+        when(listener.cancelled(any(), any())).thenReturn(Mono.empty());
+        when(listener.completed(any(), any(), any())).thenReturn(Mono.empty());
+        when(listener.updated(any(), any())).thenReturn(Mono.empty());
+        when(listener.failed(any(), any())).thenReturn(Mono.empty());
+        when(listener.failed(any(), any(), any())).thenReturn(Mono.empty());
+        when(listener.failed(any(), any(), any(), any())).thenReturn(Mono.empty());
         worker = new SerialTaskManagerWorker(listener, UPDATE_INFORMATION_POLLING_DURATION);
     }
 
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index dcc727c..852551c 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -41,6 +41,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import reactor.core.publisher.Mono;
+
 @ExtendWith(CountDownLatchExtension.class)
 class EventSourcingTaskManagerTest implements TaskManagerContract {
     ConditionFactory CALMLY_AWAIT = Awaitility
@@ -79,7 +81,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
     void createdTaskShouldKeepOriginHostname() {
         TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> Task.Result.COMPLETED));
         TaskAggregateId aggregateId = new TaskAggregateId(taskId);
-        assertThat(eventStore.getEventsOfAggregate(aggregateId).getEventsJava())
+        assertThat(Mono.from(eventStore.getEventsOfAggregate(aggregateId)).block().getEventsJava())
                 .filteredOn(event -> event instanceof Created)
                 .extracting("hostname")
                 .containsOnly(HOSTNAME);
@@ -91,7 +93,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
         TaskAggregateId aggregateId = new TaskAggregateId(taskId);
 
         CALMLY_AWAIT.untilAsserted(() ->
-            assertThat(eventStore.getEventsOfAggregate(aggregateId).getEventsJava())
+            assertThat(Mono.from(eventStore.getEventsOfAggregate(aggregateId)).block().getEventsJava())
                 .filteredOn(event -> event instanceof Started)
                 .extracting("hostname")
                 .containsOnly(HOSTNAME));
@@ -107,7 +109,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
 
         TaskAggregateId aggregateId = new TaskAggregateId(taskId);
         CALMLY_AWAIT.untilAsserted(() ->
-            assertThat(eventStore.getEventsOfAggregate(aggregateId).getEventsJava())
+            assertThat(Mono.from(eventStore.getEventsOfAggregate(aggregateId)).block().getEventsJava())
                 .filteredOn(event -> event instanceof CancelRequested)
                 .extracting("hostname")
                 .containsOnly(HOSTNAME));


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org