Posted to notifications@james.apache.org by bt...@apache.org on 2020/11/25 05:19:54 UTC

[james-project] branch master updated (6fab993 -> 315d8cf)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 6fab993  JAMES-3448 Rewrite links to http://james.apache.org/server/3/
     new 0cd7c17  JAMES-3400 Update documentation
     new 12c7105  JAMES-3452 Integration test for EmailSubmission/set create identityId parameter
     new f56aa1a  JAMES-3430 Add a missing singleton binding for CassandraMessageDAOV3
     new dae920e  JAMES-3440 JMAP tasks integration tests on top of the Distributed Server
     new fb26439  [Refactoring] ElasticSearchSearcher: avoid flatMap usage for synchronous transformations
     new 315d8cf  JAMES-3453 Specify explicitly lower safer defaults for Reactor flatMaps, filterWhens

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cassandra/init/CassandraTableManager.java      |   4 +-
 .../james/backends/es/DeleteByQueryPerformer.java  |   4 +-
 .../mailbox/cassandra/DeleteMessageListener.java   |   5 +-
 .../cassandra/mail/CassandraAttachmentMapper.java  |   3 +-
 .../cassandra/mail/CassandraIndexTableHandler.java |   8 +-
 .../cassandra/mail/CassandraMailboxDAO.java        |   3 +-
 .../cassandra/mail/CassandraMessageIdMapper.java   |  17 +-
 .../cassandra/mail/CassandraMessageMapper.java     |   4 +-
 .../mail/CassandraUserMailboxRightsDAO.java        |   7 +-
 .../task/SolveMailboxInconsistenciesService.java   |  10 +-
 .../search/ElasticSearchSearcher.java              |  11 +-
 .../apache/james/mailbox/events/InVMEventBus.java  |   4 +-
 .../james/mailbox/events/EventDispatcher.java      |   4 +-
 .../james/mailbox/events/GroupRegistration.java    |   2 +-
 .../mailbox/events/KeyRegistrationHandler.java     |   4 +-
 .../inmemory/mail/InMemoryMessageIdMapper.java     |   5 +-
 .../vault/blob/BlobStoreDeletedMessageVault.java   |   3 +-
 .../james/mailbox/store/StoreMailboxManager.java   |   3 +-
 .../james/mailbox/store/StoreMessageIdManager.java |  22 ++-
 .../james/mailbox/store/StoreMessageManager.java   |   3 +-
 .../mailbox/tools/indexer/ReIndexerPerformer.java  |   6 +-
 .../blob/cassandra/CassandraBlobStoreDAO.java      |   4 +-
 .../main/java/org/apache/james/blob/api/Store.java |   4 +-
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     |   8 +-
 .../modules/mailbox/CassandraMailboxModule.java    |   2 +
 .../org/apache/james/PeriodicalHealthChecks.java   |   4 +-
 .../java/org/apache/james/util/ReactorUtils.java   |   4 +-
 .../org/apache/james/util/reactor/Constants.java   |   5 +-
 .../migration/MappingsSourcesMigration.java        |   4 +-
 .../api/projections/MessageFastViewProjection.java |   4 +-
 .../mailets/remote/delivery/DeliveryRunnable.java  |   3 +-
 .../cassandra/CassandraMailRepository.java         |   5 +-
 .../jmap/draft/methods/GetMailboxesMethod.java     |   3 +-
 .../model/message/view/MessageViewFactory.java     |   6 +-
 .../ComputeMessageFastViewProjectionListener.java  |   6 +-
 .../jmap/http/DefaultMailboxesProvisioner.java     |   3 +-
 .../EmailSubmissionSetMethodContract.scala         |  55 ++++++
 .../james/jmap/mail/EmailSubmissionSet.scala       |   2 +-
 server/protocols/webadmin-cli/README.md            | 219 ++++++++++++++++++++-
 ...dminServerTaskSerializationIntegrationTest.java |  58 ++++++
 .../james/webadmin/routes/HealthCheckRoutes.java   |   4 +-
 .../data/jmap/EmailQueryViewPopulator.java         |   7 +-
 .../jmap/MessageFastViewProjectionCorrector.java   |   5 +-
 .../webadmin/vault/routes/RestoreService.java      |   3 +-
 .../service/EventDeadLettersRedeliverService.java  |   4 +-
 .../james/webadmin/service/EventRetriever.java     |   6 +-
 .../james/queue/memory/MemoryMailQueueFactory.java |   4 +-
 .../james/queue/rabbitmq/RabbitMQMailQueue.java    |   4 +-
 .../view/cassandra/CassandraMailQueueBrowser.java  |   5 +-
 .../apache/james/task/SerialTaskManagerWorker.java |   3 +-
 50 files changed, 489 insertions(+), 87 deletions(-)
 copy json/src/test/java/org/apache/james/dto/BaseType.java => server/container/util/src/main/java/org/apache/james/util/reactor/Constants.java (92%)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/06: JAMES-3440 JMAP tasks integration tests on top of the Distributed Server

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit dae920e00882ca8acaffacfb362402ad0a3f524c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Nov 24 10:50:40 2020 +0700

    JAMES-3440 JMAP tasks integration tests on top of the Distributed Server
---
 ...dminServerTaskSerializationIntegrationTest.java | 58 ++++++++++++++++++++++
 .../data/jmap/EmailQueryViewPopulator.java         |  7 ++-
 .../jmap/MessageFastViewProjectionCorrector.java   |  5 +-
 3 files changed, 67 insertions(+), 3 deletions(-)
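
Note on the change below: the subscribeOn(Schedulers.elastic()) additions
subscribe the blocking mailbox calls (mailboxManager.getMailbox,
messageManager.getMessages) on Reactor's elastic scheduler, presumably so
that the thread driving the task pipeline is not held up by them. What
follows is a JDK-only analogue of that idea, not the James code: it swaps
Reactor for CompletableFuture.supplyAsync on a dedicated pool, and
blockingLookup, lookupOn and the "blocking-worker" thread name are
hypothetical stand-ins introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingOffloadSketch {
    // Hypothetical stand-in for a blocking call such as mailboxManager.getMailbox(...).
    static String blockingLookup(String mailboxId) {
        try {
            Thread.sleep(50); // simulate blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "mailbox-" + mailboxId;
    }

    // Runs the blocking call on the supplied executor instead of the caller's thread,
    // and appends the name of the thread that actually did the work.
    static CompletableFuture<String> lookupOn(ExecutorService executor, String mailboxId) {
        return CompletableFuture.supplyAsync(
            () -> blockingLookup(mailboxId) + "@" + Thread.currentThread().getName(),
            executor);
    }

    public static void main(String[] args) {
        // Dedicated pool for blocking work (assumption: plays the role of the elastic scheduler).
        ExecutorService blockingPool = Executors.newCachedThreadPool(runnable -> {
            Thread thread = new Thread(runnable, "blocking-worker");
            thread.setDaemon(true);
            return thread;
        });
        try {
            System.out.println(lookupOn(blockingPool, "42").join());
        } finally {
            blockingPool.shutdown();
        }
    }
}
```

The thread name in the result shows that the lookup ran on the pool rather
than on the thread that called lookupOn, which is the property the
subscribeOn calls in the diff appear to be after.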

diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
index 84b1c7e..cd130fe 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
@@ -157,6 +157,64 @@ class RabbitMQWebAdminServerTaskSerializationIntegrationTest {
     }
 
     @Test
+    void recomputeFastViewProjectionItemsShouldComplete(GuiceJamesServer server) throws Exception {
+        server.getProbe(DataProbeImpl.class).addUser(USERNAME, "secret");
+        mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, USERNAME, MailboxConstants.INBOX);
+        mailboxProbe.appendMessage(
+            USERNAME,
+            MailboxPath.inbox(Username.of(USERNAME)),
+            new ByteArrayInputStream("Subject: test\r\n\r\ntestmail".getBytes()),
+            new Date(),
+            false,
+            new Flags());
+
+        String taskId = with()
+            .post("/mailboxes?task=recomputeFastViewProjectionItems")
+            .jsonPath()
+            .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await")
+        .then()
+            .body("status", is("completed"))
+            .body("taskId", is(notNullValue()))
+            .body("type", is("RecomputeAllFastViewProjectionItemsTask"))
+            .body("additionalInformation.processedMessageCount", is(1))
+            .body("additionalInformation.failedMessageCount", is(0));
+    }
+
+    @Test
+    void populateEmailQueryViewShouldComplete(GuiceJamesServer server) throws Exception {
+        server.getProbe(DataProbeImpl.class).addUser(USERNAME, "secret");
+        mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, USERNAME, MailboxConstants.INBOX);
+        mailboxProbe.appendMessage(
+            USERNAME,
+            MailboxPath.inbox(Username.of(USERNAME)),
+            new ByteArrayInputStream("Subject: test\r\n\r\ntestmail".getBytes()),
+            new Date(),
+            false,
+            new Flags());
+
+        String taskId = with()
+            .post("/mailboxes?task=populateEmailQueryView")
+            .jsonPath()
+            .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await")
+        .then()
+            .body("status", is("completed"))
+            .body("taskId", is(notNullValue()))
+            .body("type", is("PopulateEmailQueryViewTask"))
+            .body("additionalInformation.processedMessageCount", is(1))
+            .body("additionalInformation.failedMessageCount", is(0));
+    }
+
+    @Test
     void deleteMailsFromMailQueueShouldCompleteWhenSenderIsValid() {
         String firstMailQueue = with()
                 .basePath(MailQueueRoutes.BASE_URL)
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
index 2141fbd..2ea9f16 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
@@ -58,6 +58,7 @@ import com.github.fge.lambdas.Throwing;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class EmailQueryViewPopulator {
     private static final Logger LOGGER = LoggerFactory.getLogger(EmailQueryViewPopulator.class);
@@ -188,12 +189,14 @@ public class EmailQueryViewPopulator {
     }
 
     private Mono<MessageManager> retrieveMailbox(MailboxSession session, MailboxMetaData mailboxMetadata) {
-        return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxMetadata.getId(), session));
+        return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxMetadata.getId(), session))
+            .subscribeOn(Schedulers.elastic());
     }
 
     private Flux<MessageResult> listAllMessages(MessageManager messageManager, MailboxSession session) {
         try {
-            return Iterators.toFlux(messageManager.getMessages(MessageRange.all(), FetchGroup.HEADERS, session));
+            return Iterators.toFlux(messageManager.getMessages(MessageRange.all(), FetchGroup.HEADERS, session))
+                .subscribeOn(Schedulers.elastic());
         } catch (MailboxException e) {
             return Flux.error(e);
         }
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
index ae8df05..da0a6d7 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
@@ -56,6 +56,7 @@ import com.github.fge.lambdas.Throwing;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class MessageFastViewProjectionCorrector {
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageFastViewProjectionCorrector.class);
@@ -210,7 +211,8 @@ public class MessageFastViewProjectionCorrector {
     }
 
     private Mono<MessageManager> retrieveMailbox(MailboxSession session, MailboxMetaData mailboxMetadata) {
-        return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxMetadata.getId(), session));
+        return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxMetadata.getId(), session))
+            .subscribeOn(Schedulers.elastic());
     }
 
     private Flux<ComposedMessageIdWithMetaData> listAllMailboxMessages(MessageManager messageManager, MailboxSession session) {
@@ -220,6 +222,7 @@ public class MessageFastViewProjectionCorrector {
     private Mono<MessageResult> retrieveContent(MessageManager messageManager, MailboxSession session, MessageUid uid) {
         try {
             return Iterators.toFlux(messageManager.getMessages(MessageRange.one(uid), FetchGroup.FULL_CONTENT, session))
+                .subscribeOn(Schedulers.elastic())
                 .next();
         } catch (MailboxException e) {
             return Mono.error(e);




[james-project] 05/06: [Refactoring] ElasticSearchSearcher: avoid flatMap usage for synchronous transformations

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit fb264392f656a5c77d51208d91e1d2080901bfc8
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Nov 24 14:30:05 2020 +0700

    [Refactoring] ElasticSearchSearcher: avoid flatMap usage for synchronous transformations
---
 .../mailbox/elasticsearch/search/ElasticSearchSearcher.java   | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
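
Note on the change below: extractContentFromHit is a synchronous
transformation, so the diff has it return an Optional and replaces
flatMap with map followed by handle(publishIfPresent()). The body of
ReactorUtils.publishIfPresent is not part of this diff; the sketch below
assumes it forwards present values to the sink and drops empty ones. To
stay self-contained it models the sink as a plain Consumer instead of
Reactor's sink type, and extractUid and extractAll are hypothetical
helpers introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class PublishIfPresentSketch {
    // Assumed shape of the helper: emit the value if present, emit nothing otherwise.
    static <T> BiConsumer<Optional<T>, Consumer<T>> publishIfPresent() {
        return (element, sink) -> element.ifPresent(sink);
    }

    // Hypothetical synchronous extraction: empty when the hit cannot be parsed.
    static Optional<Long> extractUid(String hit) {
        try {
            return Optional.of(Long.parseLong(hit));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Equivalent of map(extract) followed by handle(publishIfPresent()) over a list.
    static List<Long> extractAll(List<String> hits) {
        List<Long> results = new ArrayList<>();
        BiConsumer<Optional<Long>, Consumer<Long>> handler = publishIfPresent();
        for (String hit : hits) {
            handler.accept(extractUid(hit), results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        // The unparsable hit is dropped rather than failing the whole result.
        System.out.println(extractAll(List.of("1", "not-a-uid", "3")));
    }
}
```

Each element is handled in place, with no inner publisher created per
element, which is the overhead the commit title says it wants to avoid
for synchronous transformations.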

diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
index 7dd6509..0e74473 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.elasticsearch.search;
 
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
+
 import java.util.Collection;
 import java.util.Optional;
 
@@ -81,7 +83,8 @@ public class ElasticSearchSearcher {
         SearchRequest searchRequest = prepareSearch(mailboxIds, query, limit);
         Flux<MessageSearchIndex.SearchResult> pairStream = new ScrolledSearch(client, searchRequest)
             .searchHits()
-            .flatMap(this::extractContentFromHit);
+            .map(this::extractContentFromHit)
+            .handle(publishIfPresent());
 
         return limit.map(pairStream::take)
             .orElse(pairStream);
@@ -123,20 +126,20 @@ public class ElasticSearchSearcher {
             .orElse(size);
     }
 
-    private Flux<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) {
+    private Optional<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) {
         DocumentField mailboxId = hit.field(JsonMessageConstants.MAILBOX_ID);
         DocumentField uid = hit.field(JsonMessageConstants.UID);
         Optional<DocumentField> id = retrieveMessageIdField(hit);
         if (mailboxId != null && uid != null) {
             Number uidAsNumber = uid.getValue();
-            return Flux.just(
+            return Optional.of(
                 new MessageSearchIndex.SearchResult(
                     id.map(field -> messageIdFactory.fromString(field.getValue())),
                     mailboxIdFactory.fromString(mailboxId.getValue()),
                     MessageUid.of(uidAsNumber.longValue())));
         } else {
             LOGGER.warn("Can not extract UID, MessageID and/or MailboxId for search result {}", hit.getId());
-            return Flux.empty();
+            return Optional.empty();
         }
     }
 




[james-project] 03/06: JAMES-3430 Add a missing singleton binding for CassandraMessageDAOV3

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f56aa1a50601dbc774a987cc152c4613a4ca28c7
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Nov 24 09:28:24 2020 +0700

    JAMES-3430 Add a missing singleton binding for CassandraMessageDAOV3
---
 .../java/org/apache/james/modules/mailbox/CassandraMailboxModule.java   | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
index fc8c038..09bf632 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraMailboxModule.java
@@ -56,6 +56,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV2DAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV3DAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV3;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
@@ -125,6 +126,7 @@ public class CassandraMailboxModule extends AbstractModule {
         bind(CassandraMailboxPathV3DAO.class).in(Scopes.SINGLETON);
         bind(CassandraMailboxRecentsDAO.class).in(Scopes.SINGLETON);
         bind(CassandraMessageDAO.class).in(Scopes.SINGLETON);
+        bind(CassandraMessageDAOV3.class).in(Scopes.SINGLETON);
         bind(CassandraMessageIdDAO.class).in(Scopes.SINGLETON);
         bind(CassandraMessageIdToImapUidDAO.class).in(Scopes.SINGLETON);
         bind(CassandraUserMailboxRightsDAO.class).in(Scopes.SINGLETON);




[james-project] 01/06: JAMES-3400 Update documentation

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0cd7c177c1a0916a528185751a2ecd8a9aaddecd
Author: quanth <hq...@linagora.com>
AuthorDate: Fri Nov 20 15:56:37 2020 +0700

    JAMES-3400 Update documentation
---
 server/protocols/webadmin-cli/README.md | 219 +++++++++++++++++++++++++++++++-
 1 file changed, 216 insertions(+), 3 deletions(-)

diff --git a/server/protocols/webadmin-cli/README.md b/server/protocols/webadmin-cli/README.md
index ad6b5ee..14a48b7 100644
--- a/server/protocols/webadmin-cli/README.md
+++ b/server/protocols/webadmin-cli/README.md
@@ -30,7 +30,25 @@ $ ./james-cli --url http://127.0.0.1:9999 domain list
 ```
 
 The above command lists all domain names available on domain route at address http://127.0.0.1:9999. 
-It does not require any argument to execute. Options --url are optional. Without it, the default value is http://127.0.0.1:8000.
+It does not require any argument to execute. 
+
+The following options can be used:
+
+--url : Without it, the default value is http://127.0.0.1:8000.
+
+When James server's jwt setting is enabled, jwt options are required:
+
+--jwt-token : pass the jwt token directly as plain text. E.g: 
+```
+$ ./james-cli --url http://127.0.0.1:8000 --jwt-token eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJlc24iLCJhZG1pbiI6dHJ1ZSwiaWF0IjoxNjA0Mjg3OTU3fQ.IY3AWg9lQE4muXg8iRu1nbluTm786_6aLcoEylCGcbtGcEOp81Neani1-_17xGp2aF6kxBJva0_f_ADUplhfirGXwoxM8evcsdmQBhNGDfa-oXT_a-dd6n7MMPweGOFGhjxnTRvNHpyqDcfNuGGGcB8OfzOsN5epWNkhlivk2OSuY8vR8fHB9Es1Oiv1i8D5H93-K5IXParVLpAtj7uLZT9b7G-kjxlQagApH8USx6JlqIjMwhvM--79SrzPKZ2AtM59NfDT4g6GPaMNntQvXr06Xu3Leys97cot2rsXNfXY1OohorXiYiZ66-eMRcQ7pHF-NyS6dlg87rnMQlpKHw [...]
+```
+Commands with valid token with admin=true claim should pass James's authentication.
+
+--jwt-from-file : pass the jwt token through a file (such as token.jwt). E.g:
+```
+$ ./james-cli --url http://127.0.0.1:8000 --jwt-from-file token.jwt domain list
+```
+
 As for other commands, arguments might be required after the sub-command (ACTION such as list, add and remove).
 
 Note: the command line before ENTITY will be documented as {cli}.
@@ -48,6 +66,30 @@ Note: the command line before ENTITY will be documented as {cli}.
    - [Delete a user](#delete-a-user)
    - [Get users list](#get-users-list)
    - [Update a user password](#update-a-user-password)
+- [Manage Mailboxes](#manage-mailboxes)
+   - [Create a mailbox](#create-a-mailbox)
+   - [Test a mailbox existence](#test-a-mailbox-existence)
+   - [Delete a mailbox and its children](#delete-a-mailbox-and-its-children)
+   - [Delete all mailboxes of a user](#delete-all-mailboxes-of-a-user)
+   - [Get mailboxes list](#get-mailboxes-list)
+   - [Reindex a user mailboxes](#reindex-a-user-mailboxes)
+   - [Reindex all users mailboxes](#reindex-all-users-mailboxes)
+- [Manage Domain Mappings](#manage-domain-mappings)
+   - [Listing all domain mappings](#listing-all-domain-mappings)
+   - [Listing all destination domains for a source domain](#listing-all-destination-domains-for-a-source-domain)
+   - [Adding a domain mapping](#adding-a-domain-mapping)
+   - [Removing a domain mapping](#removing-a-domain-mapping)
+- [Manage Regex Mappings](#manage-regex-mappings)
+   - [Adding a regex mapping](#adding-a-regex-mapping)
+   - [Removing a regex mapping](#removing-a-regex-mapping)
+- [Manage Address Mappings](#manage-address-mappings)
+   - [Add an address mapping](#add-an-address-mapping)
+   - [Remove an address mapping](#remove-an-address-mapping)
+- [Manage User Mappings](#manage-user-mappings)
+  - [Listing User Mappings](#listing-user-mappings)
+- [Manage Users Quotas](#manage-users-quotas)
+- [Manage Domains Quotas](#manage-domains-quotas)
+- [Manage Global Quotas](#manage-global-quotas)
 
 ## Manage Domains
 
@@ -91,7 +133,11 @@ Show all domains' name on the list.
 
 Add a user to the user list.
 ```
-{cli} user create <username> <password>
+{cli} user create <username> --password
+```
+Then the Command Line will prompt users to enter password (password will not be printed on the screen for security):
+```
+Enter value for --password (Password):
 ```
 Resource name <username> representing valid users, hence it should match the criteria at [User Repositories documentation](https://james.apache.org/server/config-users.html)
 
@@ -123,4 +169,171 @@ Show all users' name on the list.
 ### Update a user password
 Same as Create, but a user need to exist.
 
-If the user do not exist, then it will be created.
\ No newline at end of file
+If the user do not exist, then it will be created.
+
+## Manage Mailboxes
+
+### Create a mailbox
+Create a specific mailbox for a user:
+
+```
+{cli} mailbox create <username> <mailboxName>
+```
+
+Resource name username should be an existing user.
+
+Resource name mailboxName should not be empty, nor contain # & % * characters.
+
+To create nested mailboxes, for instance a work mailbox inside the INBOX mailbox, people should use the . separator. E.g: INBOX.work
+
+### Test a mailbox existence
+
+```
+{cli} mailbox exist <usernameToBeUsed> <mailboxNameToBeTested>
+```
+
+Resource name usernameToBeUsed should be an existing user 
+
+Resource name mailboxNameToBeTested should not be empty
+
+### Delete a mailbox and its children
+
+```
+{cli} mailbox delete <usernameToBeUsed> <mailboxNameToBeDeleted>
+```
+
+Resource name usernameToBeUsed should be an existing user 
+
+Resource name mailboxNameToBeDeleted should not be empty
+
+### Delete all mailboxes of a user
+
+```
+{cli} mailbox deleteAll <usernameToBeUsed>
+```
+
+Resource name usernameToBeUsed should be an existing user
+
+### Get mailboxes list
+
+```
+{cli} mailbox list <usernameToBeUsed>
+```
+
+Resource name usernameToBeUsed should be an existing user
+
+### Reindex a user mailboxes
+
+```
+{cli} mailbox reindex <usernameToBeUsed>
+```
+
+Will schedule a task for reIndexing all the mails in “usernameToBeUsed” mailboxes.
+
+Resource name usernameToBeUsed should be an existing user
+
+### Reindex all users mailboxes
+
+```
+{cli} mailbox reindexAll
+```
+
+Will schedule a task for reIndexing all the mails stored on this James server.
+
+## Manage Domain Mappings
+
+Given a configured source (from) domain and a destination (to) domain, when an email is sent to an address belonging to the source domain, then the domain part of this address is overwritten, the destination domain is then used. A source (from) domain can have many destination (to) domains.
+
+For example: with a source domain james.apache.org maps to two destination domains james.org and apache-james.org, when a mail is sent to admin@james.apache.org, then it will be routed to admin@james.org and admin@apache-james.org
+
+This feature uses [Recipients rewrite table](https://james.apache.org/server/config-recipientrewritetable.html) and requires the [RecipientRewriteTable](https://github.com/apache/james-project/blob/master/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTable.java) mailet to be configured.
+
+Note that email addresses are restricted to ASCII character set. Mail addresses not matching this criteria will be rejected.
+
+### Listing all domain mappings
+
+Show all configured domain mappings
+```
+{cli} domain-mapping listAll
+```
+
+### Listing all destination domains for a source domain
+
+With sourceDomain.tld as the value passed to fromDomain resource name, this command will show all destination domains configured to that domain
+```
+{cli} domain-mapping list <sourceDomain.tld>
+```
+
+### Adding a domain mapping
+
+With sourceDomain.tld as the value passed to fromDomain resource name, this command will add a destination domain to that source domain
+
+```
+{cli} domain-mapping create <sourceDomain.tld> <destination.tld>
+```
+
+### Removing a domain mapping
+
+With sourceDomain.tld as the value passed to fromDomain resource name, this command will remove a destination domain mapped to that domain
+```
+{cli} domain-mapping delete <sourceDomain.tld> <destination.tld>
+```
+
+## Manage Regex Mappings
+
+A regex mapping contains a mapping source and a Java Regular Expression (regex) in String as the mapping value. Everytime, if a mail containing a recipient matched with the mapping source, then that mail will be re-routed to a new recipient address which is re written by the regex.
+
+This feature uses [Recipients rewrite table](https://james.apache.org/server/config-recipientrewritetable.html) and requires the [RecipientRewriteTable API](https://github.com/apache/james-project/blob/master/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTable.java) to be configured.
+
+### Adding a regex mapping
+
+The command will add a regex mapping made from mappingSource and regex to RecipientRewriteTable.
+
+```
+{cli} regex-mapping create <mappingSource> <regex>
+```
+Where:
+
+- mappingSource represents for the Regex Mapping mapping source
+- regex represents for the Regex Mapping regex
+
+
+### Removing a regex mapping
+
+The command will remove the regex mapping made from regex and the mapping source mappingSource out of RecipientRewriteTable.
+```
+{cli} regex-mapping delete <mappingSource> <regex>
+```
+Where:
+
+- the mappingSource represents the Regex Mapping mapping source
+- the regex represents the Regex Mapping regex
+
+## Manage Address Mappings
+
+When a specific email is sent to the mappingSource mail address, every destination addresses will receive it.
+
+### Add an address mapping
+
+This command will add an address mapping to the Recipients rewrite table
+```
+{cli} address-mapping create <mappingSource> <destinationAddress>
+```
+Type of both parameters are Address.
+
+### Remove an address mapping
+
+Remove an address mapping from the Recipients rewrite table
+```
+{cli} address-mapping delete <mappingSource> <destinationAddress>
+```
+Type of both parameters are Address.
+
+## Manage User Mappings
+
+### Listing User Mappings
+
+Receiving all mappings of a corresponding user.
+```
+{cli} user-mapping list <userAddress>
+```
\ No newline at end of file




[james-project] 02/06: JAMES-3452 Integration test for EmailSubmission/set create identityId parameter

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 12c7105a44d45d87233f189fa3762ce5dad003f2
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Nov 23 17:47:28 2020 +0700

    JAMES-3452 Integration test for EmailSubmission/set create identityId parameter
---
 .../EmailSubmissionSetMethodContract.scala         | 55 ++++++++++++++++++++++
 .../james/jmap/mail/EmailSubmissionSet.scala       |  2 +-
 2 files changed, 56 insertions(+), 1 deletion(-)
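
Note on the change below: validateProperties computes
jsObject.keys.diff(assignableProperties), so a creation request is
checked by set difference against an allow-list, and adding "identityId"
to that set is what lets the new test pass. The rest of the match
expression is not shown in this diff. The sketch below illustrates only
the set-difference check, in Java rather than the project's Scala; the
class and method names are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

final class AssignablePropertiesSketch {
    // Allow-list as it stands after this commit.
    static final Set<String> ASSIGNABLE =
        Set.of("emailId", "envelope", "identityId", "onSuccessUpdateEmail");

    // Returns the request keys that are not assignable; empty means the request is acceptable.
    static Set<String> unknownProperties(Set<String> requestKeys) {
        Set<String> unknown = new TreeSet<>(requestKeys);
        unknown.removeAll(ASSIGNABLE);
        return unknown;
    }

    public static void main(String[] args) {
        // Keys used by the new integration test: nothing is left over.
        System.out.println(unknownProperties(Set.of("emailId", "identityId", "envelope")));
        // A key outside the allow-list is reported.
        System.out.println(unknownProperties(Set.of("emailId", "bogus")));
    }
}
```

With the previous three-element set, "identityId" would have been left
over by the same difference, so the request would presumably have been
treated as carrying an unknown property.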

diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailSubmissionSetMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailSubmissionSetMethodContract.scala
index 8c7fd45..5f359dc 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailSubmissionSetMethodContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailSubmissionSetMethodContract.scala
@@ -692,6 +692,61 @@ trait EmailSubmissionSetMethodContract {
   }
 
   @Test
+  def emailSubmissionSetCreateShouldAcceptIdentityId(server: GuiceJamesServer): Unit = {
+    val message: Message = Message.Builder
+      .of
+      .setSubject("test")
+      .setSender(BOB.asString)
+      .setFrom(BOB.asString)
+      .setTo(ANDRE.asString)
+      .setBody("testmail", StandardCharsets.UTF_8)
+      .build
+
+    val bobDraftsPath = MailboxPath.forUser(BOB, DefaultMailboxes.DRAFTS)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobDraftsPath)
+    val messageId: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobDraftsPath, AppendCommand.builder()
+      .build(message))
+      .getMessageId
+
+    val requestBob =
+      s"""{
+         |  "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail", "urn:ietf:params:jmap:submission"],
+         |  "methodCalls": [
+         |     ["EmailSubmission/set", {
+         |       "accountId": "$ACCOUNT_ID",
+         |       "create": {
+         |         "k1490": {
+         |           "emailId": "${messageId.serialize}",
+         |           "identityId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6",
+         |           "envelope": {
+         |             "mailFrom": {"email": "${BOB.asString}"},
+         |             "rcptTo": [{"email": "${ANDRE.asString}"}]
+         |           }
+         |         }
+         |    }
+         |  }, "c1"]]
+         |}""".stripMargin
+
+    val response = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(requestBob)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(JSON)
+      .extract
+      .body
+      .asString
+
+    assertThatJson(response)
+      // Ids are randomly generated, and not stored, let's ignore it
+      .whenIgnoringPaths("methodResponses[0][1].created.k1490")
+      .inPath("methodResponses[0][1].created")
+      .isEqualTo("""{"k1490": {}}""")
+  }
+
+  @Test
   def onSuccessUpdateEmailShouldTriggerAnImplicitEmailSetCall(server: GuiceJamesServer): Unit = {
     val message: Message = Message.Builder
       .of
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailSubmissionSet.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailSubmissionSet.scala
index 6316818..88a81fb 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailSubmissionSet.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailSubmissionSet.scala
@@ -68,7 +68,7 @@ case class EmailSubmissionAddress(email: MailAddress)
 case class Envelope(mailFrom: EmailSubmissionAddress, rcptTo: List[EmailSubmissionAddress])
 
 object EmailSubmissionCreationRequest {
-  private val assignableProperties = Set("emailId", "envelope", "onSuccessUpdateEmail")
+  private val assignableProperties = Set("emailId", "envelope", "identityId", "onSuccessUpdateEmail")
 
   def validateProperties(jsObject: JsObject): Either[EmailSubmissionCreationParseException, JsObject] =
     jsObject.keys.diff(assignableProperties) match {
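
Note on the hunk above: validateProperties appears to compare the keys of a creation request against assignableProperties, so adding "identityId" to that set is presumably what allows the new integration test to succeed. A minimal Java sketch of such a set-difference check follows. The class and method names are hypothetical, it works on a plain set of key names rather than the JsObject the Scala code receives, and how James reports rejected keys is not visible in this hunk.

```java
import java.util.Set;
import java.util.TreeSet;

public class AssignablePropertiesSketch {
    // Same property names as the Scala set in the diff above.
    static final Set<String> ASSIGNABLE =
        Set.of("emailId", "envelope", "identityId", "onSuccessUpdateEmail");

    /** Returns the request keys that are not assignable; empty means the request is acceptable. */
    static Set<String> unknownProperties(Set<String> requestKeys) {
        Set<String> unknown = new TreeSet<>(requestKeys); // sorted copy, input left untouched
        unknown.removeAll(ASSIGNABLE);                    // set difference: keys minus assignable
        return unknown;
    }

    public static void main(String[] args) {
        // All keys are assignable, including the newly allowed "identityId".
        System.out.println(unknownProperties(Set.of("emailId", "identityId", "envelope"))); // prints []
        // "sendAt" is used here only as an example of a key outside the set.
        System.out.println(unknownProperties(Set.of("emailId", "sendAt"))); // prints [sendAt]
    }
}
```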



[james-project] 06/06: JAMES-3453 Specify explicitly lower safer defaults for Reactor flatMaps, filterWhens

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 315d8cf3f168ddaaa92f03e7e036a2a36365860b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Nov 24 14:59:35 2020 +0700

    JAMES-3453 Specify explicitly lower safer defaults for Reactor flatMaps, filterWhens
    
     - Use 16 by default.
     - Use Queues.SMALL_BUFFER_SIZE where relevant (other mechanisms prevent unbounded flow)
     - Use EventBus execution rate where relevant
---
 .../cassandra/init/CassandraTableManager.java      |  4 +++-
 .../james/backends/es/DeleteByQueryPerformer.java  |  4 +++-
 .../mailbox/cassandra/DeleteMessageListener.java   |  5 +++--
 .../cassandra/mail/CassandraAttachmentMapper.java  |  3 ++-
 .../cassandra/mail/CassandraIndexTableHandler.java |  8 +++++---
 .../cassandra/mail/CassandraMailboxDAO.java        |  3 ++-
 .../cassandra/mail/CassandraMessageIdMapper.java   | 17 +++++++++------
 .../cassandra/mail/CassandraMessageMapper.java     |  4 +++-
 .../mail/CassandraUserMailboxRightsDAO.java        |  7 +++++--
 .../task/SolveMailboxInconsistenciesService.java   | 10 +++++----
 .../apache/james/mailbox/events/InVMEventBus.java  |  4 ++--
 .../james/mailbox/events/EventDispatcher.java      |  4 ++--
 .../james/mailbox/events/GroupRegistration.java    |  2 +-
 .../mailbox/events/KeyRegistrationHandler.java     |  4 ++--
 .../inmemory/mail/InMemoryMessageIdMapper.java     |  5 +++--
 .../vault/blob/BlobStoreDeletedMessageVault.java   |  3 ++-
 .../james/mailbox/store/StoreMailboxManager.java   |  3 ++-
 .../james/mailbox/store/StoreMessageIdManager.java | 22 +++++++++++---------
 .../james/mailbox/store/StoreMessageManager.java   |  3 ++-
 .../mailbox/tools/indexer/ReIndexerPerformer.java  |  6 ++++--
 .../blob/cassandra/CassandraBlobStoreDAO.java      |  4 +++-
 .../main/java/org/apache/james/blob/api/Store.java |  4 +++-
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     |  8 +++++---
 .../org/apache/james/PeriodicalHealthChecks.java   |  4 +++-
 .../java/org/apache/james/util/ReactorUtils.java   |  4 +++-
 .../org/apache/james/util/reactor/Constants.java   | 24 ++++++++++++++++++++++
 .../migration/MappingsSourcesMigration.java        |  4 +++-
 .../api/projections/MessageFastViewProjection.java |  4 +++-
 .../mailets/remote/delivery/DeliveryRunnable.java  |  3 ++-
 .../cassandra/CassandraMailRepository.java         |  5 +++--
 .../jmap/draft/methods/GetMailboxesMethod.java     |  3 ++-
 .../model/message/view/MessageViewFactory.java     |  6 ++++--
 .../ComputeMessageFastViewProjectionListener.java  |  6 ++++--
 .../jmap/http/DefaultMailboxesProvisioner.java     |  3 ++-
 .../james/webadmin/routes/HealthCheckRoutes.java   |  4 +++-
 .../webadmin/vault/routes/RestoreService.java      |  3 ++-
 .../service/EventDeadLettersRedeliverService.java  |  4 +++-
 .../james/webadmin/service/EventRetriever.java     |  6 ++++--
 .../james/queue/memory/MemoryMailQueueFactory.java |  4 +++-
 .../james/queue/rabbitmq/RabbitMQMailQueue.java    |  4 +++-
 .../view/cassandra/CassandraMailQueueBrowser.java  |  5 +++--
 .../apache/james/task/SerialTaskManagerWorker.java |  3 ++-
 42 files changed, 162 insertions(+), 74 deletions(-)
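
Note on the concurrency argument: Reactor's Flux.flatMap(mapper) subscribes eagerly to up to Queues.SMALL_BUFFER_SIZE inner publishers (256 unless overridden), while flatMap(mapper, concurrency) caps the number of concurrent inner subscriptions at the given value. The sketch below illustrates the same idea, a fixed ceiling on in-flight asynchronous calls, using only the JDK (a Semaphore and CompletableFuture) so that it runs without Reactor on the classpath. It is an analogue, not code from this commit: the class, the method names and the simulated 5 ms call are assumptions, and the value 16 is taken from the commit message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedConcurrencySketch {
    // Mirrors "Use 16 by default" from the commit message (assumed value for this sketch).
    static final int DEFAULT_CONCURRENCY = 16;

    /** Runs taskCount simulated async calls, at most `concurrency` at once; returns the observed peak. */
    static int peakInFlight(int taskCount, int concurrency) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(concurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                permits.acquireUninterruptibly(); // wait for a free slot before starting a call
                futures.add(CompletableFuture.runAsync(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max); // record the highest in-flight count
                        sleepQuietly(5);                       // stand-in for a database round trip
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();                     // free the slot for the next call
                    }
                }, pool));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int peak = peakInFlight(200, DEFAULT_CONCURRENCY);
        System.out.println("peak in-flight calls: " + peak + " (limit " + DEFAULT_CONCURRENCY + ")");
    }
}
```

Without the semaphore, all 200 calls could be submitted at once; with it, the backend never sees more than 16 concurrent requests from this pipeline, which is the kind of ceiling the commit message aims for.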

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
index bf72196..41069fb 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.backends.cassandra.init;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.util.function.Predicate;
 
 import javax.inject.Inject;
@@ -65,7 +67,7 @@ public class CassandraTableManager {
         Flux.fromIterable(module.moduleTables())
                 .publishOn(Schedulers.elastic())
                 .map(CassandraTable::getName)
-                .flatMap(name -> truncate(executor, name))
+                .flatMap(name -> truncate(executor, name), DEFAULT_CONCURRENCY)
                 .then()
                 .block();
     }
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
index 5277694..547ecb6 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.backends.es;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import org.apache.james.backends.es.search.ScrolledSearch;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -54,7 +56,7 @@ public class DeleteByQueryPerformer {
 
         return new ScrolledSearch(client, searchRequest).searchResponses()
             .filter(searchResponse -> searchResponse.getHits().getHits().length > 0)
-            .flatMap(searchResponse -> deleteRetrievedIds(searchResponse, routingKey))
+            .flatMap(searchResponse -> deleteRetrievedIds(searchResponse, routingKey), DEFAULT_CONCURRENCY)
             .then();
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index fc37174..7412ba4 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.cassandra;
 
 import static org.apache.james.util.FunctionalUtils.negate;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.util.Optional;
 import java.util.function.Predicate;
@@ -213,8 +214,8 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen
 
     private Mono<Void> deleteUnreferencedAttachments(MessageRepresentation message) {
         return Flux.fromIterable(message.getAttachments())
-            .filterWhen(attachment -> ownerDAO.retrieveOwners(attachment.getAttachmentId()).hasElements().map(negate()))
-            .filterWhen(attachment -> hasOtherMessagesReferences(message, attachment))
+            .filterWhen(attachment -> ownerDAO.retrieveOwners(attachment.getAttachmentId()).hasElements().map(negate()), DEFAULT_CONCURRENCY)
+            .filterWhen(attachment -> hasOtherMessagesReferences(message, attachment), DEFAULT_CONCURRENCY)
             .concatMap(attachment -> attachmentDAO.getAttachment(attachment.getAttachmentId())
                 .map(CassandraAttachmentDAOV2.DAOAttachment::getBlobId)
                 .flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId)))
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index 9db614a..bc2cb99 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.cassandra.mail;
 
 import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -88,7 +89,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
     public List<AttachmentMetadata> getAttachments(Collection<AttachmentId> attachmentIds) {
         Preconditions.checkArgument(attachmentIds != null);
         return Flux.fromIterable(attachmentIds)
-            .flatMap(this::getAttachmentsAsMono)
+            .flatMap(this::getAttachmentsAsMono, DEFAULT_CONCURRENCY)
             .collect(Guavate.toImmutableList())
             .block();
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index c98112e..6894e70 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.util.Collection;
 import java.util.List;
 
@@ -77,11 +79,11 @@ public class CassandraIndexTableHandler {
     public Mono<Void> updateIndexOnDelete(CassandraId mailboxId, Collection<MessageMetaData> metaData) {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 Flux.fromIterable(metaData)
-                    .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getUid())),
+                    .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getUid()), DEFAULT_CONCURRENCY),
                 Flux.fromIterable(metaData)
-                    .flatMap(message -> updateRecentOnDelete(mailboxId, message.getUid(), message.getFlags())),
+                    .flatMap(message -> updateRecentOnDelete(mailboxId, message.getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
                 Flux.fromIterable(metaData)
-                    .flatMap(message -> updateDeletedMessageProjectionOnDelete(mailboxId, message.getUid(), message.getFlags())),
+                    .flatMap(message -> updateDeletedMessageProjectionOnDelete(mailboxId, message.getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
                 decrementCountersOnDelete(mailboxId, metaData))
             .then();
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
index 92b0142..21ccd66 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
@@ -31,6 +31,7 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMailboxTable.MAI
 import static org.apache.james.mailbox.cassandra.table.CassandraMailboxTable.NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraMailboxTable.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraMailboxTable.UIDVALIDITY;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import javax.inject.Inject;
 
@@ -189,7 +190,7 @@ public class CassandraMailboxDAO {
     public Flux<Mailbox> retrieveAllMailboxes() {
         return executor.execute(listStatement.bind())
             .flatMapMany(cassandraUtils::convertToFlux)
-            .flatMap(this::toMailboxWithId);
+            .flatMap(this::toMailboxWithId, DEFAULT_CONCURRENCY);
     }
 
     private Mono<Mailbox> toMailboxWithId(Row row) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index e9cc5eb..c9925b9 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -18,6 +18,8 @@
  ****************************************************************/
 package org.apache.james.mailbox.cassandra.mail;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
@@ -47,6 +49,7 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.util.FunctionalUtils;
+import org.apache.james.util.ReactorUtils;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,7 +114,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
                 .map(messageRepresentation -> Pair.of(composedMessageId, messageRepresentation)), cassandraConfiguration.getMessageReadChunkSize())
             .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType), cassandraConfiguration.getMessageReadChunkSize())
             .groupBy(MailboxMessage::getMailboxId)
-            .flatMap(this::keepMessageIfMailboxExists);
+            .flatMap(this::keepMessageIfMailboxExists, ReactorUtils.DEFAULT_CONCURRENCY);
     }
 
     @Override
@@ -188,7 +191,8 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     public Mono<Void> deleteReactive(MessageId messageId, Collection<MailboxId> mailboxIds) {
         CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId;
         return Flux.fromStream(mailboxIds.stream())
-            .flatMap(mailboxId -> retrieveAndDeleteIndices(cassandraMessageId, Optional.of((CassandraId) mailboxId)))
+            .flatMap(mailboxId -> retrieveAndDeleteIndices(cassandraMessageId, Optional.of((CassandraId) mailboxId)),
+                DEFAULT_CONCURRENCY)
             .then();
     }
 
@@ -197,14 +201,15 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         Flux.fromIterable(ids.asMap()
             .entrySet())
             .publishOn(Schedulers.elastic())
-            .flatMap(entry -> deleteReactive(entry.getKey(), entry.getValue()), cassandraConfiguration.getExpungeChunkSize())
+            .flatMap(entry -> deleteReactive(entry.getKey(), entry.getValue()), cassandraConfiguration.getExpungeChunkSize(),
+                DEFAULT_CONCURRENCY)
             .then()
             .block();
     }
 
     private Mono<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
         return imapUidDAO.retrieve(messageId, mailboxId)
-            .flatMap(this::deleteIds)
+            .flatMap(this::deleteIds, ReactorUtils.DEFAULT_CONCURRENCY)
             .then();
     }
 
@@ -230,7 +235,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
             .distinct()
             .map(mailboxId -> (CassandraId) mailboxId)
             .concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId))
-            .flatMap(this::updateCounts)
+            .flatMap(this::updateCounts, ReactorUtils.DEFAULT_CONCURRENCY)
             .collect(Guavate.toImmutableListMultimap(Pair::getLeft, Pair::getRight))
             .block();
     }
@@ -266,7 +271,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) {
         CassandraId cassandraId = (CassandraId) mailboxId;
         return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
-            .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId))
+            .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId), ReactorUtils.DEFAULT_CONCURRENCY)
             .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
             .collectList();
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 84841ac..a527a6d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.security.SecureRandom;
 import java.time.Duration;
 import java.util.Comparator;
@@ -386,7 +388,7 @@ public class CassandraMessageMapper implements MessageMapper {
     private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<ComposedMessageId> failed) {
         if (!failed.isEmpty()) {
             Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
-                .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId())));
+                .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId())), DEFAULT_CONCURRENCY);
             return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
         } else {
             return Mono.empty();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
index 7586c96..33d6ec1 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
@@ -26,6 +26,7 @@ import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRight
 import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRightsTable.RIGHTS;
 import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRightsTable.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRightsTable.USER_NAME;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.util.Optional;
 import java.util.stream.Stream;
@@ -112,7 +113,8 @@ public class CassandraUserMailboxRightsDAO {
             .flatMap(entry -> cassandraAsyncExecutor.executeVoid(
                 delete.bind()
                     .setString(USER_NAME, entry.getKey().getName())
-                    .setUUID(MAILBOX_ID, cassandraId.asUuid())));
+                    .setUUID(MAILBOX_ID, cassandraId.asUuid())),
+                DEFAULT_CONCURRENCY);
     }
 
     private Flux<Void> addAll(CassandraId cassandraId, Stream<MailboxACL.Entry> addedEntries) {
@@ -121,7 +123,8 @@ public class CassandraUserMailboxRightsDAO {
                 insert.bind()
                     .setString(USER_NAME, entry.getKey().getName())
                     .setUUID(MAILBOX_ID, cassandraId.asUuid())
-                    .setString(RIGHTS, entry.getValue().serialize())));
+                    .setString(RIGHTS, entry.getValue().serialize())),
+                DEFAULT_CONCURRENCY);
     }
 
     public Mono<Optional<Rfc4314Rights>> retrieve(Username userName, CassandraId mailboxId) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
index 8266ceb..cef2c29 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.cassandra.mail.task;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.util.Collection;
 import java.util.Objects;
 import java.util.Optional;
@@ -408,15 +410,15 @@ public class SolveMailboxInconsistenciesService {
 
     private Flux<Result> processMailboxPathDaoInconsistencies(Context context) {
         return mailboxPathV3DAO.listAll()
-            .flatMap(this::detectMailboxPathDaoInconsistency)
-            .flatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO))
+            .flatMap(this::detectMailboxPathDaoInconsistency, DEFAULT_CONCURRENCY)
+            .flatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO), DEFAULT_CONCURRENCY)
             .doOnNext(any -> context.incrementProcessedMailboxPathEntries());
     }
 
     private Flux<Result> processMailboxDaoInconsistencies(Context context) {
         return mailboxDAO.retrieveAllMailboxes()
-            .flatMap(this::detectMailboxDaoInconsistency)
-            .flatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO))
+            .flatMap(this::detectMailboxDaoInconsistency, DEFAULT_CONCURRENCY)
+            .flatMap(inconsistency -> inconsistency.fix(context, mailboxDAO, mailboxPathV3DAO), DEFAULT_CONCURRENCY)
             .doOnNext(any -> context.incrementProcessedMailboxEntries());
     }
 
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
index ab7cb2f..6e73b0f 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
@@ -94,13 +94,13 @@ public class InVMEventBus implements EventBus {
 
     private Mono<Void> keyDeliveries(Event event, Set<RegistrationKey> keys) {
         return Flux.fromIterable(registeredListenersByKeys(keys))
-            .flatMap(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()))
+            .flatMap(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()), EventBus.EXECUTION_RATE)
             .then();
     }
 
     private Mono<Void> groupDeliveries(Event event) {
         return Flux.fromIterable(groups.entrySet())
-            .flatMap(entry -> groupDelivery(event, entry.getValue(), entry.getKey()))
+            .flatMap(entry -> groupDelivery(event, entry.getValue(), entry.getKey()), EventBus.EXECUTION_RATE)
             .then();
     }
 
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 999dc40..af13994 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -124,9 +124,9 @@ public class EventDispatcher {
     private Mono<Void> dispatchToLocalListeners(Event event, Set<RegistrationKey> keys) {
         return Flux.fromIterable(keys)
             .flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key)
-                .map(listener -> Tuples.of(key, listener)))
+                .map(listener -> Tuples.of(key, listener)), EventBus.EXECUTION_RATE)
             .filter(pair -> pair.getT2().getExecutionMode() == MailboxListener.ExecutionMode.SYNCHRONOUS)
-            .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()))
+            .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()), EventBus.EXECUTION_RATE)
             .then();
     }
 
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 99c5cb9..7d80a82 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -138,7 +138,7 @@ class GroupRegistration implements Registration {
         return receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
             .publishOn(Schedulers.parallel())
             .filter(delivery -> Objects.nonNull(delivery.getBody()))
-            .flatMap(this::deliver)
+            .flatMap(this::deliver, EventBus.EXECUTION_RATE)
             .subscribe();
     }
 
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index 6f3ddc6..a8898c9 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -97,7 +97,7 @@ class KeyRegistrationHandler {
 
         receiverSubscriber = Optional.of(receiver.consumeAutoAck(registrationQueue.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
             .subscribeOn(Schedulers.parallel())
-            .flatMap(this::handleDelivery)
+            .flatMap(this::handleDelivery, EventBus.EXECUTION_RATE)
             .subscribe());
     }
 
@@ -169,7 +169,7 @@ class KeyRegistrationHandler {
 
         return localListenerRegistry.getLocalMailboxListeners(registrationKey)
             .filter(listener -> !isLocalSynchronousListeners(eventBusId, listener))
-            .flatMap(listener -> executeListener(listener, event, registrationKey))
+            .flatMap(listener -> executeListener(listener, event, registrationKey), EventBus.EXECUTION_RATE)
             .then();
     }
 
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
index f01a0e2..521cb96 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.inmemory.mail;
 
 import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITED;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.util.Collection;
 import java.util.List;
@@ -71,7 +72,7 @@ public class InMemoryMessageIdMapper implements MessageIdMapper {
     @Override
     public Publisher<ComposedMessageIdWithMetaData> findMetadata(MessageId messageId) {
         return mailboxMapper.list()
-            .flatMap(mailbox -> messageMapper.findInMailboxReactive(mailbox, MessageRange.all(), MessageMapper.FetchType.Full, UNLIMITED))
+            .flatMap(mailbox -> messageMapper.findInMailboxReactive(mailbox, MessageRange.all(), MessageMapper.FetchType.Full, UNLIMITED), DEFAULT_CONCURRENCY)
             .map(message -> new ComposedMessageIdWithMetaData(
                 new ComposedMessageId(
                     message.getMailboxId(),
@@ -84,7 +85,7 @@ public class InMemoryMessageIdMapper implements MessageIdMapper {
     @Override
     public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) {
         return mailboxMapper.list()
-            .flatMap(mailbox -> messageMapper.findInMailboxReactive(mailbox, MessageRange.all(), fetchType, UNLIMITED))
+            .flatMap(mailbox -> messageMapper.findInMailboxReactive(mailbox, MessageRange.all(), fetchType, UNLIMITED), DEFAULT_CONCURRENCY)
             .filter(message -> messageIds.contains(message.getMessageId()));
     }
 
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
index e10cd2b..bfac3aa 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
@@ -20,6 +20,7 @@
 package org.apache.james.vault.blob;
 
 import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.io.InputStream;
 import java.time.Clock;
@@ -174,7 +175,7 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
             metricFactory.decoratePublisherWithTimerMetric(
                 DELETE_EXPIRED_MESSAGES_METRIC_NAME,
                 retentionQualifiedBuckets(beginningOfRetentionPeriod)
-                    .flatMap(bucketName -> deleteBucketData(bucketName).then(Mono.just(bucketName)))));
+                    .flatMap(bucketName -> deleteBucketData(bucketName).then(Mono.just(bucketName)), DEFAULT_CONCURRENCY)));
 
     }
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index 6e075c0..28cf6b8 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -22,6 +22,7 @@ package org.apache.james.mailbox.store;
 import static org.apache.james.mailbox.store.MailboxReactorUtils.block;
 import static org.apache.james.mailbox.store.MailboxReactorUtils.blockOptional;
 import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITED;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -583,7 +584,7 @@ public class StoreMailboxManager implements MailboxManager {
                                 .build(),
                             new MailboxIdRegistrationKey(sub.getMailboxId())))
                         .then(Mono.fromRunnable(() -> LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName)));
-                })
+                }, DEFAULT_CONCURRENCY)
                 .then());
 
             return null;
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 74aebbb..d98bf70 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.store;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -163,8 +165,8 @@ public class StoreMessageIdManager implements MessageIdManager {
         MessageMapper.FetchType fetchType = FetchGroupConverter.getFetchType(fetchGroup);
         return messageIdMapper.findReactive(messageIds, fetchType)
             .groupBy(MailboxMessage::getMailboxId)
-            .filterWhen(groupedFlux -> hasRightsOnMailboxReactive(mailboxSession, Right.Read).apply(groupedFlux.key()))
-            .flatMap(Function.identity())
+            .filterWhen(groupedFlux -> hasRightsOnMailboxReactive(mailboxSession, Right.Read).apply(groupedFlux.key()), DEFAULT_CONCURRENCY)
+            .flatMap(Function.identity(), DEFAULT_CONCURRENCY)
             .map(Throwing.function(messageResultConverter(fetchGroup)).sneakyThrow());
     }
 
@@ -175,15 +177,15 @@ public class StoreMessageIdManager implements MessageIdManager {
         return Flux.fromIterable(ids)
             .flatMap(id -> Flux.from(messageIdMapper.findMetadata(id)), concurrency)
             .groupBy(metaData -> metaData.getComposedMessageId().getMailboxId())
-            .filterWhen(groupedFlux -> hasRightsOnMailboxReactive(session, Right.Read).apply(groupedFlux.key()))
-            .flatMap(Function.identity());
+            .filterWhen(groupedFlux -> hasRightsOnMailboxReactive(session, Right.Read).apply(groupedFlux.key()), DEFAULT_CONCURRENCY)
+            .flatMap(Function.identity(), DEFAULT_CONCURRENCY);
     }
 
     private ImmutableSet<MailboxId> getAllowedMailboxIds(MailboxSession mailboxSession, List<MailboxMessage> messageList, Right... rights) throws MailboxException {
         return MailboxReactorUtils.block(Flux.fromIterable(messageList)
             .map(MailboxMessage::getMailboxId)
             .distinct()
-            .filterWhen(hasRightsOnMailboxReactive(mailboxSession, rights))
+            .filterWhen(hasRightsOnMailboxReactive(mailboxSession, rights), DEFAULT_CONCURRENCY)
             .collect(Guavate.toImmutableSet()));
     }
 
@@ -256,7 +258,7 @@ public class StoreMessageIdManager implements MessageIdManager {
                         .mailbox(mailbox)
                         .addMetaData(metadataWithMailboxId.getMessageMetaData())
                         .build(),
-                    new MailboxIdRegistrationKey(metadataWithMailboxId.getMailboxId()))))
+                    new MailboxIdRegistrationKey(metadataWithMailboxId.getMailboxId()))), DEFAULT_CONCURRENCY)
             .then()
             .subscribeOn(Schedulers.elastic())
             .block();
@@ -356,7 +358,7 @@ public class StoreMessageIdManager implements MessageIdManager {
                         .mailbox(mailbox)
                         .addMetaData(eventPayload)
                         .build(),
-                    new MailboxIdRegistrationKey(mailbox.getMailboxId())))
+                    new MailboxIdRegistrationKey(mailbox.getMailboxId())), DEFAULT_CONCURRENCY)
                 .then());
     }
     
@@ -472,7 +474,7 @@ public class StoreMessageIdManager implements MessageIdManager {
 
     private Mono<Void> assertRightsOnMailboxIds(Collection<MailboxId> mailboxIds, MailboxSession mailboxSession, Right... rights) {
         return Flux.fromIterable(mailboxIds)
-            .filterWhen(hasRightsOnMailboxReactive(mailboxSession, rights).andThen(result -> result.map(FunctionalUtils.negate())))
+            .filterWhen(hasRightsOnMailboxReactive(mailboxSession, rights).andThen(result -> result.map(FunctionalUtils.negate())), DEFAULT_CONCURRENCY)
             .next()
             .flatMap(mailboxForbidden -> {
                 LOGGER.info("Mailbox with Id {} does not belong to {}", mailboxForbidden, mailboxSession.getUser().asString());
@@ -497,10 +499,10 @@ public class StoreMessageIdManager implements MessageIdManager {
         MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session);
 
         Mono<List<Mailbox>> target = Flux.fromIterable(messageMoves.getTargetMailboxIds())
-            .flatMap(mailboxMapper::findMailboxById)
+            .flatMap(mailboxMapper::findMailboxById, DEFAULT_CONCURRENCY)
             .collect(Guavate.toImmutableList());
         Mono<List<Mailbox>> previous = Flux.fromIterable(messageMoves.getPreviousMailboxIds())
-            .flatMap(mailboxMapper::findMailboxById)
+            .flatMap(mailboxMapper::findMailboxById, DEFAULT_CONCURRENCY)
             .collect(Guavate.toImmutableList());
 
         return target.zipWith(previous)
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index a1edc78..e57a6bb 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.store;
 
 import static org.apache.james.mailbox.extension.PreDeletionHook.DeleteOperation;
 import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITED;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -697,7 +698,7 @@ public class StoreMessageManager implements MessageManager {
 
         Mono<DeleteOperation> deleteOperation = Flux.fromIterable(MessageRange.toRanges(uids))
             .publishOn(Schedulers.elastic())
-            .flatMap(range -> messageMapper.findInMailboxReactive(mailbox, range, FetchType.Metadata, UNLIMITED))
+            .flatMap(range -> messageMapper.findInMailboxReactive(mailbox, range, FetchType.Metadata, UNLIMITED), DEFAULT_CONCURRENCY)
             .map(mailboxMessage -> MetadataWithMailboxId.from(mailboxMessage.metaData(), mailboxMessage.getMailboxId()))
             .collect(Guavate.toImmutableList())
             .map(DeleteOperation::from);
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index e9ca9f3..ad2f2f3 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -19,6 +19,8 @@
 
 package org.apache.mailbox.tools.indexer;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.time.Duration;
 
 import javax.inject.Inject;
@@ -200,7 +202,7 @@ public class ReIndexerPerformer {
 
         return mailboxSessionMapperFactory.getMessageIdMapper(session)
             .findReactive(ImmutableList.of(messageId), MessageMapper.FetchType.Full)
-            .flatMap(mailboxMessage -> reIndex(mailboxMessage, session))
+            .flatMap(mailboxMessage -> reIndex(mailboxMessage, session), DEFAULT_CONCURRENCY)
             .reduce(Task::combine)
             .switchIfEmpty(Mono.just(Result.COMPLETED))
             .onErrorResume(e -> {
@@ -215,7 +217,7 @@ public class ReIndexerPerformer {
 
         Flux<Either<Failure, ReIndexingEntry>> entriesToIndex = Flux.merge(
             Flux.fromIterable(previousReIndexingFailures.messageFailures())
-                .flatMap(this::createReindexingEntryFromFailure),
+                .flatMap(this::createReindexingEntryFromFailure, ReactorUtils.DEFAULT_CONCURRENCY),
             Flux.fromIterable(previousReIndexingFailures.mailboxFailures())
                 .flatMap(mailboxId -> mapper.findMailboxById(mailboxId)
                     .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, runningOptions))
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
index d0cfad5..5755607 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.blob.cassandra;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -158,7 +160,7 @@ public class CassandraBlobStoreDAO implements BlobStoreDAO {
         return bucketDAO.listAll()
             .filter(bucketNameBlobIdPair -> bucketNameBlobIdPair.getKey().equals(bucketName))
             .map(Pair::getValue)
-            .flatMap(blobId -> delete(bucketName, blobId))
+            .flatMap(blobId -> delete(bucketName, blobId), DEFAULT_CONCURRENCY)
             .then();
     }
 
diff --git a/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java
index 89295d9..4c3fd1a 100644
--- a/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java
+++ b/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java
@@ -40,6 +40,8 @@ public interface Store<T, I> {
 
     class Impl<T, I extends BlobPartsId> implements Store<T, I> {
 
+        public static final int DEFAULT_CONCURRENCY = 16;
+
         public interface ValueToSave {
             Mono<BlobId> saveIn(BucketName bucketName, BlobStore blobStore);
         }
@@ -108,7 +110,7 @@ public interface Store<T, I> {
         @Override
         public Publisher<Void> delete(I blobIds) {
             return Flux.fromIterable(blobIds.asMap().values())
-                .flatMap(id -> blobStore.delete(blobStore.getDefaultBucketName(), id))
+                .flatMap(id -> blobStore.delete(blobStore.getDefaultBucketName(), id), DEFAULT_CONCURRENCY)
                 .then();
         }
     }
diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 1f73847..fa00d0c 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.blob.objectstorage.aws;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -290,8 +292,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
         return clientPool.withPoolable(client -> Mono.fromFuture(() -> client.listObjects(builder -> builder.bucket(bucketName.asString())))
             .flatMapIterable(ListObjectsResponse::contents))
             .window(EMPTY_BUCKET_BATCH_SIZE)
-            .flatMap(this::buildListForBatch)
-            .flatMap(identifiers -> deleteObjects(bucketName, identifiers))
+            .flatMap(this::buildListForBatch, DEFAULT_CONCURRENCY)
+            .flatMap(identifiers -> deleteObjects(bucketName, identifiers), DEFAULT_CONCURRENCY)
             .then(Mono.just(bucketName));
     }
 
@@ -311,7 +313,7 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
     public Mono<Void> deleteAllBuckets() {
         return clientPool.withPoolable(client -> Mono.fromFuture(client::listBuckets)
                 .flatMapIterable(ListBucketsResponse::buckets)
-                     .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name()))))
+                     .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name())), DEFAULT_CONCURRENCY))
             .then();
     }
 }
diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
index 8182cc8..0f9d1bf 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/PeriodicalHealthChecks.java
@@ -19,6 +19,8 @@
 
 package org.apache.james;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.util.Set;
 
 import javax.annotation.PreDestroy;
@@ -60,7 +62,7 @@ public class PeriodicalHealthChecks implements Startable {
     public void start() {
         disposable = Flux.interval(configuration.getPeriod(), scheduler)
             .flatMapIterable(any -> healthChecks)
-            .flatMap(healthCheck -> Mono.from(healthCheck.check()))
+            .flatMap(healthCheck -> Mono.from(healthCheck.check()), DEFAULT_CONCURRENCY)
             .doOnNext(this::logResult)
             .onErrorContinue(this::logError)
             .subscribeOn(Schedulers.elastic())
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index b516b8e..c11a9d4 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -39,11 +39,13 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Signal;
 import reactor.core.publisher.SynchronousSink;
+import reactor.util.concurrent.Queues;
 import reactor.util.context.Context;
 
 public class ReactorUtils {
     private static final Logger LOGGER = LoggerFactory.getLogger(ReactorUtils.class);
     public static final String MDC_KEY_PREFIX = "MDC-";
+    public static final int DEFAULT_CONCURRENCY = 16;
 
     public static <T, U> RequiresQuantity<T, U> throttle() {
         return elements -> duration -> operation -> {
@@ -55,7 +57,7 @@ public class ReactorUtils {
                 .onErrorContinue((e, o) -> LOGGER.error("Error encountered while generating throttled entries", e))
                 .window(elements)
                 .delayElements(duration)
-                .concatMap(window -> window.flatMap(operation)
+                .concatMap(window -> window.flatMap(operation, Queues.SMALL_BUFFER_SIZE)
                     .onErrorResume(e -> {
                         LOGGER.error("Error encountered while throttling", e);
                         return Mono.empty();
diff --git a/server/container/util/src/main/java/org/apache/james/util/reactor/Constants.java b/server/container/util/src/main/java/org/apache/james/util/reactor/Constants.java
new file mode 100644
index 0000000..4c5a47c
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/reactor/Constants.java
@@ -0,0 +1,24 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util.reactor;
+
+public interface Constants {
+    int DEFAULT_CONCURRENCY = 10;
+}
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
index 24dcbb0..4da39d4 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.rrt.cassandra.migration;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.time.Clock;
 import java.time.Instant;
 import java.util.Optional;
@@ -112,7 +114,7 @@ public class MappingsSourcesMigration implements Migration {
     @Override
     public void apply() {
         cassandraRecipientRewriteTableDAO.getAllMappings()
-            .flatMap(this::migrate)
+            .flatMap(this::migrate, DEFAULT_CONCURRENCY)
             .then(Mono.fromRunnable(() -> {
                 if (errorMappingsCount.get() > 0) {
                     throw new MigrationException("MappingsSourcesMigration failed");
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/MessageFastViewProjection.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/MessageFastViewProjection.java
index dda6417..a0c54e5 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/MessageFastViewProjection.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/MessageFastViewProjection.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.jmap.api.projections;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.util.Collection;
 import java.util.Map;
 
@@ -50,7 +52,7 @@ public interface MessageFastViewProjection {
 
         return Flux.fromIterable(messageIds)
             .flatMap(messageId -> Mono.from(this.retrieve(messageId))
-                .map(preview -> Pair.of(messageId, preview)))
+                .map(preview -> Pair.of(messageId, preview)), DEFAULT_CONCURRENCY)
             .collectMap(Pair::getLeft, Pair::getRight);
     }
 }
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
index d16d502..f380632 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
@@ -47,6 +47,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.concurrent.Queues;
 
 public class DeliveryRunnable implements Disposable {
     private static final Logger LOGGER = LoggerFactory.getLogger(DeliveryRunnable.class);
@@ -87,7 +88,7 @@ public class DeliveryRunnable implements Disposable {
     public void start() {
         remoteDeliveryScheduler = Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "RemoteDelivery");
         disposable = Flux.from(queue.deQueue())
-            .flatMap(queueItem -> runStep(queueItem).subscribeOn(remoteDeliveryScheduler))
+            .flatMap(queueItem -> runStep(queueItem).subscribeOn(remoteDeliveryScheduler), Queues.SMALL_BUFFER_SIZE)
             .onErrorContinue(((throwable, nothing) -> LOGGER.error("Exception caught in RemoteDelivery", throwable)))
             .subscribeOn(remoteDeliveryScheduler)
             .subscribe();
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index 4024c7f..460979a 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailrepository.cassandra;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import java.util.Collection;
@@ -121,7 +122,7 @@ public class CassandraMailRepository implements MailRepository {
     public void remove(Collection<Mail> toRemove) {
         Flux.fromIterable(toRemove)
             .map(MailKey::forMail)
-            .flatMap(this::removeAsync)
+            .flatMap(this::removeAsync, DEFAULT_CONCURRENCY)
             .then()
             .block();
     }
@@ -160,7 +161,7 @@ public class CassandraMailRepository implements MailRepository {
     @Override
     public void removeAll() {
         keysDAO.list(url)
-            .flatMap(this::removeAsync)
+            .flatMap(this::removeAsync, DEFAULT_CONCURRENCY)
             .then()
             .block();
     }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
index aa87311..acf9c26 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.jmap.draft.methods;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 import static org.apache.james.util.ReactorUtils.context;
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
@@ -144,7 +145,7 @@ public class GetMailboxesMethod implements Method {
                     .session(mailboxSession)
                     .usingPreloadedMailboxesMetadata(NO_PRELOADED_METADATA)
                     .build())
-                .subscribeOn(Schedulers.elastic()))
+                .subscribeOn(Schedulers.elastic()), DEFAULT_CONCURRENCY)
             .handle(publishIfPresent());
     }
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageViewFactory.java
index dc9409f..a51df8d 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageViewFactory.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.jmap.draft.model.message.view;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.time.Instant;
@@ -132,9 +134,9 @@ public interface MessageViewFactory<T extends MessageView> {
         static <T extends MessageView> Flux<T> toMessageViews(Flux<MessageResult> messageResults, FromMessageResult<T> converter) {
             return messageResults
                 .groupBy(MessageResult::getMessageId)
-                .flatMap(Flux::collectList)
+                .flatMap(Flux::collectList, DEFAULT_CONCURRENCY)
                 .filter(Predicate.not(List::isEmpty))
-                .flatMap(toMessageViews(converter));
+                .flatMap(toMessageViews(converter), DEFAULT_CONCURRENCY);
         }
 
         static Instant getDateFromHeaderOrInternalDateOtherwise(Message mimeMessage, MessageResult message) {
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java
index 9f73e64..19779c5 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.jmap.event;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.io.IOException;
 
 import javax.inject.Inject;
@@ -88,9 +90,9 @@ public class ComputeMessageFastViewProjectionListener implements MailboxListener
         return Flux.from(messageIdManager.getMessagesReactive(addedEvent.getMessageIds(), FetchGroup.FULL_CONTENT, session))
             .flatMap(Throwing.function(messageResult -> Mono.fromCallable(
                 () -> Pair.of(messageResult.getMessageId(), computeFastViewPrecomputedProperties(messageResult)))
-                    .subscribeOn(Schedulers.parallel())))
+                    .subscribeOn(Schedulers.parallel())), DEFAULT_CONCURRENCY)
             .publishOn(Schedulers.elastic())
-            .flatMap(message -> messageFastViewProjection.store(message.getKey(), message.getValue()))
+            .flatMap(message -> messageFastViewProjection.store(message.getKey(), message.getValue()), DEFAULT_CONCURRENCY)
             .then();
     }
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DefaultMailboxesProvisioner.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DefaultMailboxesProvisioner.java
index 6449ff0..0b74d72 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DefaultMailboxesProvisioner.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DefaultMailboxesProvisioner.java
@@ -19,6 +19,7 @@
 package org.apache.james.jmap.http;
 
 import static org.apache.james.util.FunctionalUtils.negate;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.util.Optional;
 import java.util.function.Function;
@@ -73,7 +74,7 @@ class DefaultMailboxesProvisioner {
 
         return Flux.fromIterable(DefaultMailboxes.DEFAULT_MAILBOXES)
             .map(toMailboxPath(session))
-            .filterWhen(mailboxPath -> mailboxDoesntExist(mailboxPath, session))
+            .filterWhen(mailboxPath -> mailboxDoesntExist(mailboxPath, session), DEFAULT_CONCURRENCY)
             .concatMap(mailboxPath -> Mono.fromRunnable(() -> createMailbox(mailboxPath, session))
                 .subscribeOn(Schedulers.elastic()))
             .then();
diff --git a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/HealthCheckRoutes.java b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/HealthCheckRoutes.java
index 8a5a8d6..82c5a16 100644
--- a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/HealthCheckRoutes.java
+++ b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/HealthCheckRoutes.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.webadmin.routes;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.util.List;
 import java.util.Set;
 
@@ -185,7 +187,7 @@ public class HealthCheckRoutes implements PublicRoutes {
 
     private Flux<Result> executeHealthChecks() {
         return Flux.fromIterable(healthChecks)
-            .flatMap(HealthCheck::check)
+            .flatMap(HealthCheck::check, DEFAULT_CONCURRENCY)
             .doOnNext(this::logFailedCheck);
     }
 
diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java
index 4c36958..330a240 100644
--- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java
+++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java
@@ -20,6 +20,7 @@
 package org.apache.james.webadmin.vault.routes;
 
 import static org.apache.james.mailbox.MessageManager.AppendCommand;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 import static org.apache.james.webadmin.vault.routes.RestoreService.RestoreResult.RESTORE_FAILED;
 import static org.apache.james.webadmin.vault.routes.RestoreService.RestoreResult.RESTORE_SUCCEED;
 
@@ -74,7 +75,7 @@ class RestoreService {
         MessageManager restoreMessageManager = restoreMailboxManager(session);
 
         return Flux.from(deletedMessageVault.search(usernameToRestore, searchQuery))
-            .flatMap(deletedMessage -> appendToMailbox(restoreMessageManager, deletedMessage, session));
+            .flatMap(deletedMessage -> appendToMailbox(restoreMessageManager, deletedMessage, session), DEFAULT_CONCURRENCY);
     }
 
     private Mono<RestoreResult> appendToMailbox(MessageManager restoreMailboxManager, DeletedMessage deletedMessage, MailboxSession session) {
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
index 7053c9d..108aa6d 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.webadmin.service;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import javax.inject.Inject;
 
 import org.apache.james.mailbox.events.Event;
@@ -50,7 +52,7 @@ public class EventDeadLettersRedeliverService {
 
     Flux<Task.Result> redeliverEvents(EventRetriever eventRetriever) {
         return eventRetriever.retrieveEvents(deadLetters)
-            .flatMap(entry -> redeliverGroupEvents(entry.getT1(), entry.getT2(), entry.getT3()));
+            .flatMap(entry -> redeliverGroupEvents(entry.getT1(), entry.getT2(), entry.getT3()), DEFAULT_CONCURRENCY);
     }
 
     private Mono<Task.Result> redeliverGroupEvents(Group group, Event event, EventDeadLetters.InsertionId insertionId) {
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java
index a0b4f8e..1d4dbb3 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.webadmin.service;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.util.Optional;
 
 import org.apache.james.mailbox.events.Event;
@@ -50,7 +52,7 @@ public interface EventRetriever {
 
     default Flux<Tuple3<Group, Event, EventDeadLetters.InsertionId>> listGroupEvents(EventDeadLetters deadLetters, Group group) {
         return deadLetters.failedIds(group)
-            .flatMap(insertionId -> Flux.zip(Mono.just(group), deadLetters.failedEvent(group, insertionId), Mono.just(insertionId)));
+            .flatMap(insertionId -> Flux.zip(Mono.just(group), deadLetters.failedEvent(group, insertionId), Mono.just(insertionId)), DEFAULT_CONCURRENCY);
     }
 
     class AllEventsRetriever implements EventRetriever {
@@ -67,7 +69,7 @@ public interface EventRetriever {
         @Override
         public Flux<Tuple3<Group, Event, EventDeadLetters.InsertionId>> retrieveEvents(EventDeadLetters deadLetters) {
             return deadLetters.groupsWithFailedEvents()
-                .flatMap(group -> listGroupEvents(deadLetters, group));
+                .flatMap(group -> listGroupEvents(deadLetters, group), DEFAULT_CONCURRENCY);
         }
     }
 
diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 93f12ca..025a024 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.queue.memory;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.time.DateTimeException;
 import java.time.Duration;
 import java.time.Instant;
@@ -101,7 +103,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
                 .repeat()
                 .subscribeOn(Schedulers.elastic())
                 .flatMap(item ->
-                    Mono.fromRunnable(() -> inProcessingMailItems.add(item)).thenReturn(item))
+                    Mono.fromRunnable(() -> inProcessingMailItems.add(item)).thenReturn(item), DEFAULT_CONCURRENCY)
                 .map(item -> mailQueueItemDecoratorFactory.decorate(item, name));
         }
 
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 5e838db..52c8660 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.queue.rabbitmq;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+
 import java.time.Duration;
 import java.time.Instant;
 
@@ -130,6 +132,6 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
                 .thenReturn(item.getMail().getName());
 
         return mailQueueView.browseOlderThanReactive(olderThan)
-            .flatMap(requeue);
+            .flatMap(requeue, DEFAULT_CONCURRENCY);
     }
 }
\ No newline at end of file
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index 455025c..b114648 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -21,6 +21,7 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId;
 import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.time.Clock;
 import java.time.Instant;
@@ -151,13 +152,13 @@ public class CassandraMailQueueBrowser {
     private Flux<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) {
         return
             allBucketIds()
-                .flatMap(bucketId -> browseBucket(queueName, slice, bucketId))
+                .flatMap(bucketId -> browseBucket(queueName, slice, bucketId), DEFAULT_CONCURRENCY)
                 .sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
     }
 
     private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
         return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)
-            .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()));
+            .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), DEFAULT_CONCURRENCY);
     }
 
     private Flux<Slice> allSlicesStartingAt(Instant browseStart) {
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 5438eff..409c4c8 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -18,6 +18,7 @@
  ****************************************************************/
 package org.apache.james.task;
 
+import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import java.time.Duration;
@@ -96,7 +97,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
             .delayElement(pollingInterval, Schedulers.elastic())
             .repeat()
             .handle(publishIfPresent())
-            .flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), information)).thenReturn(information));
+            .flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), information)).thenReturn(information), DEFAULT_CONCURRENCY);
     }
 
 

