You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/12/16 17:10:06 UTC

[james-project] branch master updated (24be076 -> a2b3b28)

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 24be076  JAMES-2642 Add alias mapping integration test to show that alias delivery is case-insensitive
     new 069f81f  JAMES-3003 Mailbox event delivery should run listeners concurrently
     new bf600b0  JAMES-3004 avoid starvation issues by using Schedulers.elastic instead of boundedElastic
     new a2b3b28  Merge remote-tracking branch 'mbaechler/revert-boundedElasticScheduler-use'

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cassandra/init/CassandraTableManager.java      |  2 +-
 .../cassandra/init/ResilientClusterProvider.java   |  4 +-
 .../cassandra/utils/CassandraAsyncExecutor.java    |  2 +-
 .../apache/james/backends/es/ClientProvider.java   |  4 +-
 .../backends/es/ElasticSearchClusterExtension.java |  2 +-
 .../rabbitmq/RabbitMQConnectionFactory.java        |  2 +-
 .../rabbitmq/ReactorRabbitMQChannelPool.java       |  2 +-
 .../backends/rabbitmq/SimpleConnectionPool.java    |  2 +-
 .../apache/james/mailbox/events/GroupContract.java | 57 ++++++++++++++++++++++
 .../apache/james/mailbox/events/KeyContract.java   | 44 +++++++++++++++++
 .../james/mailbox/backup/DefaultMailboxBackup.java |  2 +-
 .../cassandra/mail/CassandraMessageDAO.java        |  2 +-
 .../cassandra/mail/CassandraMessageIdMapper.java   |  4 +-
 .../cassandra/mail/CassandraMessageMapper.java     |  2 +-
 .../cassandra/mail/CassandraModSeqProvider.java    |  2 +-
 .../apache/james/mailbox/events/InVMEventBus.java  | 20 ++++----
 .../mailbox/events/delivery/EventDelivery.java     | 44 +----------------
 .../mailbox/events/delivery/InVmEventDelivery.java | 11 ++---
 .../events/delivery/InVmEventDeliveryTest.java     | 21 ++------
 .../james/mailbox/events/EventDispatcher.java      |  8 +--
 .../james/mailbox/events/GroupRegistration.java    |  2 +-
 .../mailbox/events/KeyRegistrationHandler.java     |  2 +-
 .../james/mailbox/events/WaitDelayGenerator.java   |  2 +-
 .../mailbox/events/LocalListenerRegistryTest.java  |  2 +-
 .../vault/blob/BlobStoreDeletedMessageVault.java   |  2 +-
 .../blob/BlobStoreVaultGarbageCollectionTask.java  |  2 +-
 .../james/mailbox/store/PreDeletionHooks.java      |  2 +-
 .../james/mailbox/store/StoreMessageManager.java   |  2 +-
 .../main/java/org/apache/james/blob/api/Store.java |  2 +-
 .../james/blob/cassandra/CassandraBlobStore.java   |  6 +--
 .../blob/objectstorage/ObjectStorageBlobStore.java |  6 +--
 .../objectstorage/StreamCompatibleBlobPutter.java  |  6 +--
 .../blob/objectstorage/aws/AwsS3ObjectStorage.java |  4 +-
 .../objectstorage/ObjectStorageBlobStoreTest.java  |  8 +--
 .../org/apache/james/StartUpChecksPerformer.java   |  2 +-
 .../apache/james/GuiceLifecycleHeathCheckTest.java |  2 +-
 .../main/java/org/apache/james/util/Runnables.java |  4 +-
 .../org/apache/james/util/ReactorUtilsTest.java    |  8 +--
 .../api/projections/MessageFastViewProjection.java |  2 +-
 .../transport/mailets/delivery/MailDispatcher.java |  2 +-
 .../model/message/view/MessageFastViewFactory.java |  2 +-
 .../model/message/view/MessageFullViewFactory.java |  4 +-
 .../ComputeMessageFastViewProjectionListener.java  |  2 +-
 .../james/queue/api/DelayedMailQueueContract.java  |  6 +--
 .../api/DelayedPriorityMailQueueContract.java      |  4 +-
 .../apache/james/queue/api/MailQueueContract.java  | 16 +++---
 .../james/queue/api/PriorityMailQueueContract.java |  8 +--
 .../james/queue/memory/MemoryMailQueueFactory.java |  2 +-
 .../cassandra/CassandraMailQueueMailDelete.java    |  2 +-
 .../distributed/RabbitMQTerminationSubscriber.java |  4 +-
 .../distributed/RabbitMQWorkQueue.java             |  6 +--
 .../distributed/RabbitMQWorkQueueTest.java         |  2 +-
 .../org/apache/james/task/MemoryTaskManager.java   |  2 +-
 .../org/apache/james/task/MemoryWorkQueue.java     |  2 +-
 .../apache/james/task/SerialTaskManagerWorker.java |  2 +-
 .../eventsourcing/EventSourcingTaskManager.scala   |  2 +-
 .../TerminationSubscriberContract.java             |  6 +--
 57 files changed, 210 insertions(+), 167 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/03: Merge remote-tracking branch 'mbaechler/revert-boundedElasticScheduler-use'

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a2b3b2854ab5dec98da4580960a39b57d2be995b
Merge: 069f81f bf600b0
Author: Raphael Ouazana <ra...@linagora.com>
AuthorDate: Mon Dec 16 18:09:29 2019 +0100

    Merge remote-tracking branch 'mbaechler/revert-boundedElasticScheduler-use'

 .../backends/cassandra/init/CassandraTableManager.java   |  2 +-
 .../cassandra/init/ResilientClusterProvider.java         |  4 ++--
 .../backends/cassandra/utils/CassandraAsyncExecutor.java |  2 +-
 .../org/apache/james/backends/es/ClientProvider.java     |  4 ++--
 .../james/backends/es/ElasticSearchClusterExtension.java |  2 +-
 .../backends/rabbitmq/RabbitMQConnectionFactory.java     |  2 +-
 .../backends/rabbitmq/ReactorRabbitMQChannelPool.java    |  2 +-
 .../james/backends/rabbitmq/SimpleConnectionPool.java    |  2 +-
 .../james/mailbox/backup/DefaultMailboxBackup.java       |  2 +-
 .../mailbox/cassandra/mail/CassandraMessageDAO.java      |  2 +-
 .../mailbox/cassandra/mail/CassandraMessageIdMapper.java |  4 ++--
 .../mailbox/cassandra/mail/CassandraMessageMapper.java   |  2 +-
 .../mailbox/cassandra/mail/CassandraModSeqProvider.java  |  2 +-
 .../james/mailbox/events/delivery/EventDelivery.java     |  2 +-
 .../org/apache/james/mailbox/events/EventDispatcher.java |  2 +-
 .../apache/james/mailbox/events/GroupRegistration.java   |  2 +-
 .../james/mailbox/events/KeyRegistrationHandler.java     |  2 +-
 .../apache/james/mailbox/events/WaitDelayGenerator.java  |  2 +-
 .../james/mailbox/events/LocalListenerRegistryTest.java  |  2 +-
 .../james/vault/blob/BlobStoreDeletedMessageVault.java   |  2 +-
 .../vault/blob/BlobStoreVaultGarbageCollectionTask.java  |  2 +-
 .../org/apache/james/mailbox/store/PreDeletionHooks.java |  2 +-
 .../apache/james/mailbox/store/StoreMessageManager.java  |  2 +-
 .../src/main/java/org/apache/james/blob/api/Store.java   |  2 +-
 .../apache/james/blob/cassandra/CassandraBlobStore.java  |  6 +++---
 .../james/blob/objectstorage/ObjectStorageBlobStore.java |  6 +++---
 .../blob/objectstorage/StreamCompatibleBlobPutter.java   |  6 +++---
 .../james/blob/objectstorage/aws/AwsS3ObjectStorage.java |  4 ++--
 .../blob/objectstorage/ObjectStorageBlobStoreTest.java   |  8 ++++----
 .../java/org/apache/james/StartUpChecksPerformer.java    |  2 +-
 .../org/apache/james/GuiceLifecycleHeathCheckTest.java   |  2 +-
 .../src/main/java/org/apache/james/util/Runnables.java   |  4 ++--
 .../java/org/apache/james/util/ReactorUtilsTest.java     |  8 ++++----
 .../jmap/api/projections/MessageFastViewProjection.java  |  2 +-
 .../james/transport/mailets/delivery/MailDispatcher.java |  2 +-
 .../draft/model/message/view/MessageFastViewFactory.java |  2 +-
 .../draft/model/message/view/MessageFullViewFactory.java |  4 ++--
 .../event/ComputeMessageFastViewProjectionListener.java  |  2 +-
 .../apache/james/queue/api/DelayedMailQueueContract.java |  6 +++---
 .../queue/api/DelayedPriorityMailQueueContract.java      |  4 ++--
 .../org/apache/james/queue/api/MailQueueContract.java    | 16 ++++++++--------
 .../james/queue/api/PriorityMailQueueContract.java       |  8 ++++----
 .../james/queue/memory/MemoryMailQueueFactory.java       |  2 +-
 .../view/cassandra/CassandraMailQueueMailDelete.java     |  2 +-
 .../distributed/RabbitMQTerminationSubscriber.java       |  4 ++--
 .../eventsourcing/distributed/RabbitMQWorkQueue.java     |  6 +++---
 .../eventsourcing/distributed/RabbitMQWorkQueueTest.java |  2 +-
 .../java/org/apache/james/task/MemoryTaskManager.java    |  2 +-
 .../main/java/org/apache/james/task/MemoryWorkQueue.java |  2 +-
 .../org/apache/james/task/SerialTaskManagerWorker.java   |  2 +-
 .../task/eventsourcing/EventSourcingTaskManager.scala    |  2 +-
 .../eventsourcing/TerminationSubscriberContract.java     |  6 +++---
 52 files changed, 88 insertions(+), 88 deletions(-)



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/03: JAMES-3004 avoid starvation issues by using Schedulers.elastic instead of boundedElastic

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bf600b02c74846cc7670bc241a42fe5f65360dae
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Dec 10 14:45:03 2019 +0100

    JAMES-3004 avoid starvation issues by using Schedulers.elastic instead of boundedElastic
---
 .../backends/cassandra/init/CassandraTableManager.java   |  2 +-
 .../cassandra/init/ResilientClusterProvider.java         |  4 ++--
 .../backends/cassandra/utils/CassandraAsyncExecutor.java |  2 +-
 .../org/apache/james/backends/es/ClientProvider.java     |  4 ++--
 .../james/backends/es/ElasticSearchClusterExtension.java |  2 +-
 .../backends/rabbitmq/RabbitMQConnectionFactory.java     |  2 +-
 .../backends/rabbitmq/ReactorRabbitMQChannelPool.java    |  2 +-
 .../james/backends/rabbitmq/SimpleConnectionPool.java    |  2 +-
 .../james/mailbox/backup/DefaultMailboxBackup.java       |  2 +-
 .../mailbox/cassandra/mail/CassandraMessageDAO.java      |  2 +-
 .../mailbox/cassandra/mail/CassandraMessageIdMapper.java |  4 ++--
 .../mailbox/cassandra/mail/CassandraMessageMapper.java   |  2 +-
 .../mailbox/cassandra/mail/CassandraModSeqProvider.java  |  2 +-
 .../james/mailbox/events/delivery/EventDelivery.java     |  2 +-
 .../org/apache/james/mailbox/events/EventDispatcher.java |  2 +-
 .../apache/james/mailbox/events/GroupRegistration.java   |  2 +-
 .../james/mailbox/events/KeyRegistrationHandler.java     |  2 +-
 .../apache/james/mailbox/events/WaitDelayGenerator.java  |  2 +-
 .../james/mailbox/events/LocalListenerRegistryTest.java  |  2 +-
 .../james/vault/blob/BlobStoreDeletedMessageVault.java   |  2 +-
 .../vault/blob/BlobStoreVaultGarbageCollectionTask.java  |  2 +-
 .../org/apache/james/mailbox/store/PreDeletionHooks.java |  2 +-
 .../apache/james/mailbox/store/StoreMessageManager.java  |  2 +-
 .../src/main/java/org/apache/james/blob/api/Store.java   |  2 +-
 .../apache/james/blob/cassandra/CassandraBlobStore.java  |  6 +++---
 .../james/blob/objectstorage/ObjectStorageBlobStore.java |  6 +++---
 .../blob/objectstorage/StreamCompatibleBlobPutter.java   |  6 +++---
 .../james/blob/objectstorage/aws/AwsS3ObjectStorage.java |  4 ++--
 .../blob/objectstorage/ObjectStorageBlobStoreTest.java   |  8 ++++----
 .../java/org/apache/james/StartUpChecksPerformer.java    |  2 +-
 .../org/apache/james/GuiceLifecycleHeathCheckTest.java   |  2 +-
 .../src/main/java/org/apache/james/util/Runnables.java   |  4 ++--
 .../java/org/apache/james/util/ReactorUtilsTest.java     |  8 ++++----
 .../jmap/api/projections/MessageFastViewProjection.java  |  2 +-
 .../james/transport/mailets/delivery/MailDispatcher.java |  2 +-
 .../draft/model/message/view/MessageFastViewFactory.java |  2 +-
 .../draft/model/message/view/MessageFullViewFactory.java |  4 ++--
 .../event/ComputeMessageFastViewProjectionListener.java  |  2 +-
 .../apache/james/queue/api/DelayedMailQueueContract.java |  6 +++---
 .../queue/api/DelayedPriorityMailQueueContract.java      |  4 ++--
 .../org/apache/james/queue/api/MailQueueContract.java    | 16 ++++++++--------
 .../james/queue/api/PriorityMailQueueContract.java       |  8 ++++----
 .../james/queue/memory/MemoryMailQueueFactory.java       |  2 +-
 .../view/cassandra/CassandraMailQueueMailDelete.java     |  2 +-
 .../distributed/RabbitMQTerminationSubscriber.java       |  4 ++--
 .../eventsourcing/distributed/RabbitMQWorkQueue.java     |  6 +++---
 .../eventsourcing/distributed/RabbitMQWorkQueueTest.java |  2 +-
 .../java/org/apache/james/task/MemoryTaskManager.java    |  2 +-
 .../main/java/org/apache/james/task/MemoryWorkQueue.java |  2 +-
 .../org/apache/james/task/SerialTaskManagerWorker.java   |  2 +-
 .../task/eventsourcing/EventSourcingTaskManager.scala    |  2 +-
 .../eventsourcing/TerminationSubscriberContract.java     |  6 +++---
 52 files changed, 88 insertions(+), 88 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
index 0f16dc2..30980f3 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraTableManager.java
@@ -59,7 +59,7 @@ public class CassandraTableManager {
     public void clearAllTables() {
         CassandraAsyncExecutor executor = new CassandraAsyncExecutor(session);
         Flux.fromIterable(module.moduleTables())
-                .publishOn(Schedulers.boundedElastic())
+                .publishOn(Schedulers.elastic())
                 .map(CassandraTable::getName)
                 .flatMap(name -> truncate(executor, name))
                 .then()
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java
index 657f118..01040bb 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/ResilientClusterProvider.java
@@ -50,8 +50,8 @@ public class ResilientClusterProvider implements Provider<Cluster> {
         Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         cluster = Mono.fromCallable(getClusterRetryCallable(configuration))
             .doOnError(e -> LOGGER.warn("Error establishing Cassandra connection. Next retry scheduled in {} ms", waitDelay, e))
-            .retryBackoff(configuration.getMaxRetry(), waitDelay, forever, Schedulers.boundedElastic())
-            .publishOn(Schedulers.boundedElastic())
+            .retryBackoff(configuration.getMaxRetry(), waitDelay, forever, Schedulers.elastic())
+            .publishOn(Schedulers.elastic())
             .block();
     }
 
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 50508cf..1b6464c 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -44,7 +44,7 @@ public class CassandraAsyncExecutor {
     public Mono<ResultSet> execute(Statement statement) {
         return Mono.defer(() -> Mono.fromFuture(FutureConverter
                 .toCompletableFuture(session.executeAsync(statement)))
-                .publishOn(Schedulers.boundedElastic()));
+                .publishOn(Schedulers.elastic()));
     }
 
     public Mono<Boolean> executeReturnApplied(Statement statement) {
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
index 1ff14cd..b879a88 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
@@ -184,8 +184,8 @@ public class ClientProvider implements Provider<RestHighLevelClient> {
         return Mono.fromCallable(() -> connectToCluster(configuration))
             .doOnError(e -> LOGGER.warn("Error establishing ElasticSearch connection. Next retry scheduled in {}",
                 DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), suppressLeadingZeroElements, suppressTrailingZeroElements), e))
-            .retryBackoff(configuration.getMaxRetries(), waitDelay, forever, Schedulers.boundedElastic())
-            .publishOn(Schedulers.boundedElastic())
+            .retryBackoff(configuration.getMaxRetries(), waitDelay, forever, Schedulers.elastic())
+            .publishOn(Schedulers.elastic())
             .block();
     }
 
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchClusterExtension.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchClusterExtension.java
index 41660b3..d0d3a5d 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchClusterExtension.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchClusterExtension.java
@@ -75,7 +75,7 @@ class ElasticSearchClusterExtension implements AfterAllCallback, BeforeAllCallba
             Flux.fromStream(Stream.of(runnables)
                     .map(Mono::fromRunnable))
                 .parallel(runnables.length)
-                .runOn(Schedulers.boundedElastic())
+                .runOn(Schedulers.elastic())
                 .flatMap(Function.identity())
                 .then()
                 .block();
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
index ccbc28b..1f4d0fd 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConnectionFactory.java
@@ -60,6 +60,6 @@ public class RabbitMQConnectionFactory {
     Mono<Connection> connectionMono() {
         Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         return Mono.fromCallable(connectionFactory::newConnection)
-            .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs()), forever, Schedulers.boundedElastic());
+            .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs()), forever, Schedulers.elastic());
     }
 }
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index a5dbea6..5ccfbe1 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -79,7 +79,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
             return Mono.fromCallable(connection::openChannel)
                 .map(maybeChannel ->
                     maybeChannel.orElseThrow(() -> new RuntimeException("RabbitMQ reached to maximum opened channels, cannot get more channels")))
-                .retryBackoff(MAX_RETRIES, RETRY_FIRST_BACK_OFF, FOREVER, Schedulers.boundedElastic())
+                .retryBackoff(MAX_RETRIES, RETRY_FIRST_BACK_OFF, FOREVER, Schedulers.elastic())
                 .doOnError(throwable -> LOGGER.error("error when creating new channel", throwable));
         }
 
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
index d2e08e4..4ffebfd 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
@@ -57,7 +57,7 @@ public class SimpleConnectionPool implements AutoCloseable {
         Duration initialDelay = Duration.ofMillis(100);
         Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         return Mono.defer(this::getOpenConnection)
-            .retryBackoff(numRetries, initialDelay, forever, Schedulers.boundedElastic());
+            .retryBackoff(numRetries, initialDelay, forever, Schedulers.elastic());
     }
 
     private Mono<Connection> getOpenConnection() {
diff --git a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
index ab708f6..856c094 100644
--- a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
+++ b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
@@ -115,7 +115,7 @@ public class DefaultMailboxBackup implements MailboxBackup {
         }
 
         return Mono.fromRunnable(Throwing.runnable(() -> archiveRestorer.restore(username, source)).sneakyThrow())
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .doOnError(e -> LOGGER.error("Error during account restoration for user : " + username.asString(), e))
             .doOnTerminate(Throwing.runnable(source::close).sneakyThrow())
             .thenReturn(BackupStatus.DONE)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index b29e7e8..ac232a4 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -236,7 +236,7 @@ public class CassandraMessageDAO {
 
     public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
         return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()))
-            .publishOn(Schedulers.boundedElastic())
+            .publishOn(Schedulers.elastic())
             .flatMap(id -> retrieveRow(id, fetchType)
                 .flatMap(resultSet -> message(resultSet, id, fetchType)), configuration.getMessageReadChunkSize());
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index f317f6b..1b15350 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -88,7 +88,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     @Override
     public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType) {
         return Flux.fromStream(messageIds.stream())
-            .publishOn(Schedulers.boundedElastic())
+            .publishOn(Schedulers.elastic())
             .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize())
             .collectList()
             .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
@@ -174,7 +174,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     public void delete(Multimap<MessageId, MailboxId> ids) {
         Flux.fromIterable(ids.asMap()
             .entrySet())
-            .publishOn(Schedulers.boundedElastic())
+            .publishOn(Schedulers.elastic())
             .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue()), cassandraConfiguration.getExpungeChunkSize())
             .then()
             .block();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index c43afe7..ebc29a7 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -139,7 +139,7 @@ public class CassandraMessageMapper implements MessageMapper {
     @Override
     public List<MailboxCounters> getMailboxCounters(Collection<Mailbox> mailboxes) {
         return Flux.fromIterable(mailboxes)
-            .publishOn(Schedulers.boundedElastic())
+            .publishOn(Schedulers.elastic())
             .concatMap(this::getMailboxCountersAsMono)
             .toStream()
             .collect(Guavate.toImmutableList());
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index 7aeb45e..e5d7290 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -188,7 +188,7 @@ public class CassandraModSeqProvider implements ModSeqProvider {
         Duration forever = Duration.ofMillis(Long.MAX_VALUE);
         return tryFindThenUpdateOnce(mailboxId)
             .single()
-            .retryBackoff(maxModSeqRetries, Duration.ofMillis(2), forever, Schedulers.boundedElastic());
+            .retryBackoff(maxModSeqRetries, Duration.ofMillis(2), forever, Schedulers.elastic());
     }
 
     private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) {
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index 0f56a60..7b2fb67 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -89,7 +89,7 @@ public interface EventDelivery {
             @Override
             public Mono<Void> doRetry(Mono<Void> executionResult, Event event) {
                 return executionResult
-                    .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.boundedElastic())
+                    .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic())
                     .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}",
                         mailboxListener.getClass().getCanonicalName(),
                         retryBackoff.getMaxRetries(),
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 47844d7..85cc48c 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -80,7 +80,7 @@ class EventDispatcher {
             .concat(
                 dispatchToLocalListeners(event, keys),
                 dispatchToRemoteListeners(serializeEvent(event), keys))
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .doOnError(throwable -> LOGGER.error("error while dispatching event", throwable))
             .then()
             .subscribeWith(MonoProcessor.create());
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index dac4ae3..5622412 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -138,7 +138,7 @@ class GroupRegistration implements Registration {
         int currentRetryCount = getRetryCount(acknowledgableDelivery);
 
         return delayGenerator.delayIfHaveTo(currentRetryCount)
-            .publishOn(Schedulers.boundedElastic())
+            .publishOn(Schedulers.elastic())
             .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event))))
             .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
             .then(Mono.fromRunnable(acknowledgableDelivery::ack));
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
index e0f9001..41db3f8 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java
@@ -128,7 +128,7 @@ class KeyRegistrationHandler {
                     .log(logger -> logger.error("Exception happens when handling event", e)))
                 .onErrorResume(e -> Mono.empty())
                 .then())
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .then();
     }
 
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
index 922ae02..33a3586 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
@@ -59,7 +59,7 @@ class WaitDelayGenerator {
         }
 
         return countRetryMono
-            .delayElement(generateDelay(retryCount), Schedulers.boundedElastic());
+            .delayElement(generateDelay(retryCount), Schedulers.elastic());
     }
 
     @VisibleForTesting
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java
index 9bd70ec..8136239 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/LocalListenerRegistryTest.java
@@ -247,7 +247,7 @@ class LocalListenerRegistryTest {
             LocalListenerRegistry.LocalRegistration registration5 = testee.addListener(KEY_1, listener5);
 
             Mono<List<MailboxListener>> listeners = testee.getLocalMailboxListeners(KEY_1)
-                .publishOn(Schedulers.boundedElastic())
+                .publishOn(Schedulers.elastic())
                 .delayElements(Duration.ofMillis(100))
                 .collectList();
 
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
index 71115f8..7cc3905 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
@@ -154,7 +154,7 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
             .flatMap(storageInformation -> Mono.from(messageMetadataVault.remove(storageInformation.getBucketName(), username, messageId))
                 .thenReturn(storageInformation))
             .flatMap(storageInformation -> blobStore.delete(storageInformation.getBucketName(), storageInformation.getBlobId()))
-            .subscribeOn(Schedulers.boundedElastic());
+            .subscribeOn(Schedulers.elastic());
     }
 
     @Override
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreVaultGarbageCollectionTask.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreVaultGarbageCollectionTask.java
index f82fc6a..61b4e5d 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreVaultGarbageCollectionTask.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreVaultGarbageCollectionTask.java
@@ -97,7 +97,7 @@ public class BlobStoreVaultGarbageCollectionTask implements Task {
     public Result run() {
         retentionOperation
             .doOnNext(deletedBuckets::add)
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .then()
             .block();
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java
index 0429289..b868d62 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java
@@ -50,7 +50,7 @@ public class PreDeletionHooks {
 
     public Mono<Void> runHooks(PreDeletionHook.DeleteOperation deleteOperation) {
         return Flux.fromIterable(hooks)
-            .publishOn(Schedulers.boundedElastic())
+            .publishOn(Schedulers.elastic())
             .flatMap(hook -> metricFactory.runPublishingTimerMetric(PRE_DELETION_HOOK_METRIC_NAME,
                 Mono.from(hook.notifyDelete(deleteOperation))), CONCURRENCY)
             .then();
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index d985641..259cd7b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -732,7 +732,7 @@ public class StoreMessageManager implements MessageManager {
         MessageMapper messageMapper = mapperFactory.getMessageMapper(session);
 
         DeleteOperation deleteOperation = Flux.fromIterable(MessageRange.toRanges(uids))
-            .publishOn(Schedulers.boundedElastic())
+            .publishOn(Schedulers.elastic())
             .flatMap(range -> Mono.fromCallable(() -> messageMapper.findInMailbox(mailbox, range, FetchType.Metadata, UNLIMITED))
                 .flatMapMany(iterator -> Flux.fromStream(Iterators.toStream(iterator))))
             .map(mailboxMessage -> MetadataWithMailboxId.from(mailboxMessage.metaData(), mailboxMessage.getMailboxId()))
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
index 41910d5..5feef59 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
@@ -118,7 +118,7 @@ public interface Store<T, I> {
         @Override
         public Mono<T> read(I blobIds) {
             return Flux.fromIterable(blobIds.asMap().entrySet())
-                .publishOn(Schedulers.boundedElastic())
+                .publishOn(Schedulers.elastic())
                 .flatMapSequential(
                     entry -> blobStore.readBytes(blobStore.getDefaultBucketName(), entry.getValue())
                         .zipWith(Mono.just(entry.getKey())))
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index f725304..7f7efe3 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -92,7 +92,7 @@ public class CassandraBlobStore implements BlobStore {
     private Mono<Integer> saveBlobParts(BucketName bucketName, byte[] data, BlobId blobId) {
         Stream<Pair<Integer, ByteBuffer>> chunks = dataChunker.chunk(data, configuration.getBlobPartSize());
         return Flux.fromStream(chunks)
-            .publishOn(Schedulers.boundedElastic(), PREFETCH)
+            .publishOn(Schedulers.elastic(), PREFETCH)
             .flatMap(pair -> writePart(bucketName, blobId, pair.getKey(), pair.getValue())
                 .then(Mono.just(getChunkNum(pair))))
             .collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
@@ -129,13 +129,13 @@ public class CassandraBlobStore implements BlobStore {
 
     private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) {
         Integer rowCount = selectRowCount(bucketName, blobId)
-            .publishOn(Schedulers.boundedElastic())
+            .publishOn(Schedulers.elastic())
             .single()
             .onErrorResume(NoSuchElementException.class, e -> Mono.error(
                 new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId))))
             .block();
         return Flux.range(0, rowCount)
-            .publishOn(Schedulers.boundedElastic(), PREFETCH)
+            .publishOn(Schedulers.elastic(), PREFETCH)
             .flatMapSequential(partIndex -> readPart(bucketName, blobId, partIndex)
                 .single()
                 .onErrorResume(NoSuchElementException.class, e -> Mono.error(
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java
index 014b354..8a0628e 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStore.java
@@ -186,7 +186,7 @@ public class ObjectStorageBlobStore implements BlobStore {
     public Mono<Void> deleteBucket(BucketName bucketName) {
         ObjectStorageBucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
         return Mono.<Void>fromRunnable(() -> blobStore.deleteContainer(resolvedBucketName.asString()))
-            .subscribeOn(Schedulers.boundedElastic());
+            .subscribeOn(Schedulers.elastic());
     }
 
     public PayloadCodec getPayloadCodec() {
@@ -196,7 +196,7 @@ public class ObjectStorageBlobStore implements BlobStore {
     @VisibleForTesting
     Mono<Void> deleteAllBuckets() {
         return Flux.fromIterable(blobStore.list())
-            .publishOn(Schedulers.boundedElastic())
+            .publishOn(Schedulers.elastic())
             .filter(storageMetadata -> storageMetadata.getType().equals(StorageType.CONTAINER))
             .map(StorageMetadata::getName)
             .doOnNext(blobStore::deleteContainer)
@@ -207,6 +207,6 @@ public class ObjectStorageBlobStore implements BlobStore {
     public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
         ObjectStorageBucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
         return Mono.<Void>fromRunnable(() -> blobStore.removeBlob(resolvedBucketName.asString(), blobId.asString()))
-            .subscribeOn(Schedulers.boundedElastic());
+            .subscribeOn(Schedulers.elastic());
     }
 }
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java
index aa9e294..30ad89c 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/StreamCompatibleBlobPutter.java
@@ -52,14 +52,14 @@ public class StreamCompatibleBlobPutter implements BlobPutter {
     @Override
     public void putDirectly(ObjectStorageBucketName bucketName, Blob blob) {
         Mono.fromRunnable(() -> blobStore.putBlob(bucketName.asString(), blob))
-            .publishOn(Schedulers.boundedElastic())
+            .publishOn(Schedulers.elastic())
             .retryWhen(Retry.onlyIf(retryContext -> needToCreateBucket(retryContext.exception(), bucketName))
                 .exponentialBackoff(FIRST_BACK_OFF, FOREVER)
-                .withBackoffScheduler(Schedulers.boundedElastic())
+                .withBackoffScheduler(Schedulers.elastic())
                 .retryMax(MAX_RETRIES)
                 .doOnRetry(retryContext -> blobStore.createContainerInLocation(DEFAULT_LOCATION, bucketName.asString())))
             .retryWhen(Retry.onlyIf(RetryContext -> isPutMethod(RetryContext.exception()))
-                .withBackoffScheduler(Schedulers.boundedElastic())
+                .withBackoffScheduler(Schedulers.elastic())
                 .exponentialBackoff(FIRST_BACK_OFF, FOREVER)
                 .retryMax(RETRY_ONE_LAST_TIME_ON_CONCURRENT_SAVING))
             .block();
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
index 1ba0f67..db7cd67 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/aws/AwsS3ObjectStorage.java
@@ -177,11 +177,11 @@ public class AwsS3ObjectStorage {
 
         private Mono<Void> putWithRetry(ObjectStorageBucketName bucketName, AwsS3AuthConfiguration configuration, Blob blob, File file, int tried) {
             return Mono.<Void>fromRunnable(Throwing.runnable(() -> put(bucketName, configuration, blob, file)).sneakyThrow())
-                .publishOn(Schedulers.boundedElastic())
+                .publishOn(Schedulers.elastic())
                 .retryWhen(Retry
                     .<Void>onlyIf(retryContext -> needToCreateBucket(retryContext.exception()))
                     .exponentialBackoff(FIRST_BACK_OFF, FOREVER)
-                    .withBackoffScheduler(Schedulers.boundedElastic())
+                    .withBackoffScheduler(Schedulers.elastic())
                     .retryMax(MAX_RETRY_ON_EXCEPTION)
                     .doOnRetry(retryContext -> createBucket(bucketName, configuration)));
         }
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java
index 9863601..5b05311 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java
@@ -178,7 +178,7 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract {
         // String need to be big enough to get async thread busy hence could not return result instantly
         Mono<BlobId> blobIdFuture = testee
             .save(testee.getDefaultBucketName(), BIG_STRING.getBytes(StandardCharsets.UTF_8))
-            .subscribeOn(Schedulers.boundedElastic());
+            .subscribeOn(Schedulers.elastic());
         assertThat(blobIdFuture.toFuture()).isNotCompleted();
     }
 
@@ -186,7 +186,7 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract {
     void saveStringShouldNotCompleteWhenDoesNotAwait() {
         Mono<BlobId> blobIdFuture = testee
             .save(testee.getDefaultBucketName(), BIG_STRING)
-            .subscribeOn(Schedulers.boundedElastic());
+            .subscribeOn(Schedulers.elastic());
         assertThat(blobIdFuture.toFuture()).isNotCompleted();
     }
 
@@ -194,14 +194,14 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract {
     void saveInputStreamShouldNotCompleteWhenDoesNotAwait() {
         Mono<BlobId> blobIdFuture = testee
             .save(testee.getDefaultBucketName(), new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)))
-            .subscribeOn(Schedulers.boundedElastic());
+            .subscribeOn(Schedulers.elastic());
         assertThat(blobIdFuture.toFuture()).isNotCompleted();
     }
 
     @Test
     void readBytesShouldNotCompleteWhenDoesNotAwait() {
         BlobId blobId = testee().save(testee.getDefaultBucketName(), BIG_STRING).block();
-        Mono<byte[]> resultFuture = testee.readBytes(testee.getDefaultBucketName(), blobId).subscribeOn(Schedulers.boundedElastic());
+        Mono<byte[]> resultFuture = testee.readBytes(testee.getDefaultBucketName(), blobId).subscribeOn(Schedulers.elastic());
         assertThat(resultFuture.toFuture()).isNotCompleted();
     }
 }
diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/StartUpChecksPerformer.java b/server/container/guice/guice-common/src/main/java/org/apache/james/StartUpChecksPerformer.java
index e2d0f90..b04f2c3 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/StartUpChecksPerformer.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/StartUpChecksPerformer.java
@@ -84,7 +84,7 @@ public class StartUpChecksPerformer {
 
         public List<StartUpCheck.CheckResult> check() {
             return Flux.fromIterable(startUpChecks)
-                .publishOn(Schedulers.boundedElastic())
+                .publishOn(Schedulers.elastic())
                 .map(this::checkQuietly)
                 .collect(Guavate.toImmutableList())
                 .block();
diff --git a/server/container/guice/memory-guice/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java b/server/container/guice/memory-guice/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java
index d2af157..40d2e0f 100644
--- a/server/container/guice/memory-guice/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java
+++ b/server/container/guice/memory-guice/src/test/java/org/apache/james/GuiceLifecycleHeathCheckTest.java
@@ -122,7 +122,7 @@ class GuiceLifecycleHeathCheckTest {
 
                 stopMono = Mono.fromRunnable(server::stop);
                 stopMono
-                    .publishOn(Schedulers.boundedElastic())
+                    .publishOn(Schedulers.elastic())
                     .subscribeWith(MonoProcessor.create());
 
                 when()
diff --git a/server/container/util/src/main/java/org/apache/james/util/Runnables.java b/server/container/util/src/main/java/org/apache/james/util/Runnables.java
index 7ed8850..b0f707e 100644
--- a/server/container/util/src/main/java/org/apache/james/util/Runnables.java
+++ b/server/container/util/src/main/java/org/apache/james/util/Runnables.java
@@ -31,9 +31,9 @@ public class Runnables {
 
     public static void runParallel(Flux<Runnable> runnables) {
         runnables
-            .publishOn(Schedulers.boundedElastic())
+            .publishOn(Schedulers.elastic())
             .parallel()
-            .runOn(Schedulers.boundedElastic())
+            .runOn(Schedulers.elastic())
             .flatMap(runnable -> {
                 runnable.run();
                 return Mono.empty();
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index e900e05..7bdc678 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -85,7 +85,7 @@ class ReactorUtilsTest {
         void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
             Flux<ByteBuffer> source = Flux.range(0, 10)
-                .subscribeOn(Schedulers.boundedElastic())
+                .subscribeOn(Schedulers.elastic())
                 .limitRate(2)
                 .doOnRequest(request -> generateElements.getAndAdd((int) request))
                 .map(index -> new byte[] {(byte) (int) index})
@@ -104,7 +104,7 @@ class ReactorUtilsTest {
         void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
             Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8})
-                    .subscribeOn(Schedulers.boundedElastic())
+                    .subscribeOn(Schedulers.elastic())
                     .map(ByteBuffer::wrap)
                     .limitRate(2)
                     .doOnRequest(request -> generateElements.getAndAdd((int) request));
@@ -122,7 +122,7 @@ class ReactorUtilsTest {
         void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
             Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11})
-                    .subscribeOn(Schedulers.boundedElastic())
+                    .subscribeOn(Schedulers.elastic())
                     .map(ByteBuffer::wrap)
                     .limitRate(2)
                     .doOnRequest(request -> generateElements.getAndAdd((int) request));
@@ -140,7 +140,7 @@ class ReactorUtilsTest {
         void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
             Flux<ByteBuffer> source = Flux.<byte[]>empty()
-                    .subscribeOn(Schedulers.boundedElastic())
+                    .subscribeOn(Schedulers.elastic())
                     .map(ByteBuffer::wrap)
                     .limitRate(2)
                     .doOnRequest(request -> generateElements.getAndAdd((int) request));
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/MessageFastViewProjection.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/MessageFastViewProjection.java
index c9c2e49..6589318 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/MessageFastViewProjection.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/MessageFastViewProjection.java
@@ -51,6 +51,6 @@ public interface MessageFastViewProjection {
             .flatMap(messageId -> Mono.from(this.retrieve(messageId))
                 .map(preview -> Pair.of(messageId, preview)))
             .collectMap(Pair::getLeft, Pair::getRight)
-            .subscribeOn(Schedulers.boundedElastic());
+            .subscribeOn(Schedulers.elastic());
     }
 }
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
index 4737012..1b3c1c7 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
@@ -94,7 +94,7 @@ public class MailDispatcher {
         this.mailStore = mailStore;
         this.consume = consume;
         this.mailetContext = mailetContext;
-        this.scheduler = Schedulers.boundedElastic();
+        this.scheduler = Schedulers.elastic();
     }
 
     public void dispatch(Mail mail) throws MessagingException {
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java
index 6fd35d8..7d7334b 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java
@@ -123,7 +123,7 @@ public class MessageFastViewFactory implements MessageViewFactory<MessageFastVie
         return Mono.from(fastViewProjection.retrieve(messageIds))
             .flatMapMany(fastProjections -> gatherMessageViews(messageIdSet, mailboxSession, fastProjections))
             .collectList()
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .block();
     }
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java
index c806214..364d1be 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java
@@ -131,7 +131,7 @@ public class MessageFullViewFactory implements MessageViewFactory<MessageFullVie
         return Mono.from(fastViewProjection.retrieve(messageId))
             .onErrorResume(throwable -> fallBackToCompute(messageContent, hasAttachments, throwable))
             .switchIfEmpty(computeThenStoreAsync(messageContent, messageId, hasAttachments))
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .block();
     }
 
@@ -148,7 +148,7 @@ public class MessageFullViewFactory implements MessageViewFactory<MessageFullVie
         return computeProjection(messageContent, hasAttachments)
             .doOnNext(projection -> Mono.from(fastViewProjection.store(messageId, projection))
                 .doOnError(throwable -> LOGGER.error("Cannot store the projection to MessageFastViewProjection", throwable))
-                .subscribeOn(Schedulers.boundedElastic())
+                .subscribeOn(Schedulers.elastic())
                 .subscribe());
     }
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java
index 877f05b..87ce35f 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListener.java
@@ -87,7 +87,7 @@ public class ComputeMessageFastViewProjectionListener implements MailboxListener
             .flatMap(Throwing.function(messageResult -> Mono.fromCallable(
                 () -> Pair.of(messageResult.getMessageId(), computeFastViewPrecomputedProperties(messageResult)))
                     .subscribeOn(Schedulers.parallel())))
-            .publishOn(Schedulers.boundedElastic())
+            .publishOn(Schedulers.elastic())
             .flatMap(message -> messageFastViewProjection.store(message.getKey(), message.getValue()))
             .then()
             .block();
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java
index 588b9e7..e0b51a0 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedMailQueueContract.java
@@ -47,7 +47,7 @@ public interface DelayedMailQueueContract {
             5L,
             TimeUnit.SECONDS);
 
-        Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).next();
+        Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next();
         assertThatThrownBy(() -> next.block(Duration.ofSeconds(1)))
             .isInstanceOf(RuntimeException.class);
     }
@@ -72,7 +72,7 @@ public interface DelayedMailQueueContract {
             365 * 10,
             TimeUnit.DAYS);
 
-        Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).next();
+        Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next();
         assertThatThrownBy(() -> next.block(Duration.ofSeconds(1)))
             .isInstanceOf(RuntimeException.class);
     }
@@ -84,7 +84,7 @@ public interface DelayedMailQueueContract {
             .build(),
             ChronoUnit.FOREVER.getDuration());
 
-        Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).next();
+        Mono<MailQueue.MailQueueItem> next = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next();
         assertThatThrownBy(() -> next.block(Duration.ofSeconds(1)))
             .isInstanceOf(RuntimeException.class);
     }
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java
index def35bd..03d473e 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedPriorityMailQueueContract.java
@@ -53,7 +53,7 @@ public interface DelayedPriorityMailQueueContract extends DelayedMailQueueContra
 
         Thread.sleep(unit.toMillis(2 * delay));
 
-        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator();
+        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
         MailQueue.MailQueueItem item1 = mailQueueItems.next();
         item1.done(true);
         MailQueue.MailQueueItem item2 = mailQueueItems.next();
@@ -79,7 +79,7 @@ public interface DelayedPriorityMailQueueContract extends DelayedMailQueueContra
             delay,
             unit);
 
-        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator();
+        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
         MailQueue.MailQueueItem item1 = mailQueueItems.next();
         item1.done(true);
         MailQueue.MailQueueItem item2 = mailQueueItems.next();
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
index cec9783..1b21737 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
@@ -332,7 +332,7 @@ public interface MailQueueContract {
             .name(secondExpectedName)
             .build());
 
-        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator();
+        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
         MailQueue.MailQueueItem mailQueueItem1 = items.next();
         mailQueueItem1.done(true);
         MailQueue.MailQueueItem mailQueueItem2 = items.next();
@@ -351,7 +351,7 @@ public interface MailQueueContract {
             .build());
 
         Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue())
-            .subscribeOn(Schedulers.boundedElastic()).toIterable().iterator();
+            .subscribeOn(Schedulers.elastic()).toIterable().iterator();
         MailQueue.MailQueueItem mailQueueItem1 = items.next();
         MailQueue.MailQueueItem mailQueueItem2 = items.next();
         mailQueueItem1.done(true);
@@ -370,7 +370,7 @@ public interface MailQueueContract {
             .name("name2")
             .build());
 
-        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator();
+        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
         MailQueue.MailQueueItem mailQueueItem1 = items.next();
         MailQueue.MailQueueItem mailQueueItem2 = items.next();
         mailQueueItem2.done(true);
@@ -385,7 +385,7 @@ public interface MailQueueContract {
             .name("name1")
             .build());
 
-        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator();
+        Iterator<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
         MailQueue.MailQueueItem mailQueueItem1 = items.next();
         mailQueueItem1.done(false);
         MailQueue.MailQueueItem mailQueueItem2 = items.next();
@@ -401,7 +401,7 @@ public interface MailQueueContract {
             .build());
 
         LinkedBlockingQueue<MailQueue.MailQueueItem> queue = new LinkedBlockingQueue<>(1);
-        Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).subscribe(Throwing.consumer(queue::put));
+        Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).subscribe(Throwing.consumer(queue::put));
         queue.take();
 
         assertThat(queue.poll(2, TimeUnit.SECONDS)).isNull();
@@ -409,7 +409,7 @@ public interface MailQueueContract {
 
     @Test
     default void deQueueShouldBlockWhenNoMail() {
-        Mono<MailQueue.MailQueueItem> item = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).next();
+        Mono<MailQueue.MailQueueItem> item = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).next();
 
         assertThatThrownBy(() -> item.block(Duration.ofSeconds(2)))
             .isInstanceOf(RuntimeException.class);
@@ -440,7 +440,7 @@ public interface MailQueueContract {
         LinkedBlockingQueue<MailQueue.MailQueueItem> itemQueue = new LinkedBlockingQueue<>(1);
         Flux.from(testee
             .deQueue())
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .flatMap(e -> {
                 try {
                     itemQueue.put(e);
@@ -482,7 +482,7 @@ public interface MailQueueContract {
         int operationCount = 15;
         int totalDequeuedMessages = 50;
         LinkedBlockingDeque<MailQueue.MailQueueItem> deque = new LinkedBlockingDeque<>();
-        Flux.from(testee.deQueue()).subscribeOn(Schedulers.boundedElastic()).doOnNext(deque::addFirst).subscribe();
+        Flux.from(testee.deQueue()).subscribeOn(Schedulers.elastic()).doOnNext(deque::addFirst).subscribe();
         ConcurrentTestRunner.builder()
             .operation((threadNumber, step) -> {
                 if (step % 3 == 0) {
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java
index a971218..c8da0a6 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/PriorityMailQueueContract.java
@@ -117,7 +117,7 @@ public interface PriorityMailQueueContract {
             .attribute(mailPriority(1))
             .build());
 
-        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator();
+        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
         MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next();
         mailQueueItem1.done(true);
         MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next();
@@ -137,7 +137,7 @@ public interface PriorityMailQueueContract {
             .attribute(mailPriority(8))
             .build());
 
-        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator();
+        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
         MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next();
         mailQueueItem1.done(true);
         MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next();
@@ -161,7 +161,7 @@ public interface PriorityMailQueueContract {
             .attribute(mailPriority(6))
             .build());
 
-        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator();
+        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
         MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next();
         mailQueueItem1.done(true);
         MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next();
@@ -187,7 +187,7 @@ public interface PriorityMailQueueContract {
             .attribute(mailPriority(6))
             .build());
 
-        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.boundedElastic()).toIterable().iterator();
+        Iterator<MailQueue.MailQueueItem> mailQueueItems = Flux.from(getMailQueue().deQueue()).subscribeOn(Schedulers.elastic()).toIterable().iterator();
         MailQueue.MailQueueItem mailQueueItem1 = mailQueueItems.next();
         mailQueueItem1.done(true);
         MailQueue.MailQueueItem mailQueueItem2 = mailQueueItems.next();
diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 2066a7c..8a6186c 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -97,7 +97,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
             this.name = name;
             this.flux = Mono.fromCallable(mailItems::take)
                 .repeat()
-                .subscribeOn(Schedulers.boundedElastic())
+                .subscribeOn(Schedulers.elastic())
                 .flatMap(item ->
                     Mono.fromRunnable(() -> inProcessingMailItems.add(item)).thenReturn(item))
                 .map(mailQueueItemDecoratorFactory::decorate);
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
index ae273f7..a08318e 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
@@ -62,7 +62,7 @@ public class CassandraMailQueueMailDelete {
     void updateBrowseStart(MailQueueName mailQueueName) {
         findNewBrowseStart(mailQueueName)
             .flatMap(newBrowseStart -> updateNewBrowseStart(mailQueueName, newBrowseStart))
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .subscribe();
     }
 
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 44a17f5..b04ea22 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -86,14 +86,14 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
         sendQueue = UnicastProcessor.create();
         sendQueueHandle = sender
             .send(sendQueue)
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .subscribe();
 
         listenerReceiver = channelPool.createReceiver();
         listener = DirectProcessor.create();
         listenQueueHandle = listenerReceiver
             .consumeAutoAck(queueName)
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .concatMap(this::toEvent)
             .subscribe(listener::onNext);
     }
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 27196f8..eb2044d 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -103,7 +103,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
     private void consumeWorkqueue() {
         receiver = new Receiver(new ReceiverOptions().connectionMono(channelPool.getConnectionMono()));
         receiverHandle = receiver.consumeManualAck(QUEUE_NAME, new ConsumeOptions())
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .concatMap(this::executeTask)
             .subscribe();
     }
@@ -150,7 +150,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
         sendCancelRequestsQueue = UnicastProcessor.create();
         sendCancelRequestsQueueHandle = cancelRequestSender
             .send(sendCancelRequestsQueue.map(this::makeCancelRequestMessage))
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .subscribe();
     }
 
@@ -158,7 +158,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
         cancelRequestListener = channelPool.createReceiver();
         cancelRequestListenerHandle = cancelRequestListener
             .consumeAutoAck(queueName)
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .map(this::readCancelRequestMessage)
             .doOnNext(worker::cancelTask)
             .subscribe();
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 5b8bb43..ef6fd07 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -84,7 +84,7 @@ class RabbitMQWorkQueueTest {
             tasks.add(taskWithId);
             return Mono.fromCallable(() -> taskWithId.getTask().run())
                 .doOnNext(result -> results.add(result))
-                .subscribeOn(Schedulers.boundedElastic());
+                .subscribeOn(Schedulers.elastic());
         }
 
         @Override
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 752c1e6..afe9a12 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -152,7 +152,7 @@ public class MemoryTaskManager implements TaskManager {
     @Override
     public TaskExecutionDetails await(TaskId id, Duration timeout) throws TaskNotFoundException, ReachedTimeoutException {
         try {
-            return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.boundedElastic())
+            return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic())
                 .map(ignored -> getExecutionDetails(id))
                 .filter(details -> details.getStatus().isFinished())
                 .blockFirst(timeout);
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java
index a782f9b..4446bd4 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryWorkQueue.java
@@ -35,7 +35,7 @@ public class MemoryWorkQueue implements WorkQueue {
         this.worker = worker;
         this.tasks = UnicastProcessor.create();
         this.subscription = tasks
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .limitRate(1)
             .concatMap(this::dispatchTaskToWorker)
             .subscribe();
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 25005fd..ed35478 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -87,7 +87,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
 
     private Flux<TaskExecutionDetails.AdditionalInformation> pollAdditionalInformation(TaskWithId taskWithId) {
         return Mono.fromCallable(() -> taskWithId.getTask().details())
-            .delayElement(pollingInterval, Schedulers.boundedElastic())
+            .delayElement(pollingInterval, Schedulers.elastic())
             .repeat()
             .flatMap(Mono::justOrEmpty)
             .doOnNext(information -> listener.updated(taskWithId.getId(), information));
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 41eeb88..025d9ef 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -113,7 +113,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
         .then(details)
 
       Flux.merge(findEvent, details)
-        .subscribeOn(Schedulers.boundedElastic)
+        .subscribeOn(Schedulers.elastic)
         .blockFirst(timeout)
     } catch {
       case _: IllegalStateException => throw new ReachedTimeoutException
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
index 4ddf181..b63c170 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
@@ -127,7 +127,7 @@ public interface TerminationSubscriberContract {
 
         List<Event> listenedEvents = Mono.delay(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(3).dividedBy(2)))
             .then(Mono.defer(() -> collectEvents(subscriber.listenEvents())))
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .block();
         assertThat(listenedEvents).containsExactly(FAILED_EVENT, CANCELLED_EVENT);
     }
@@ -139,7 +139,7 @@ public interface TerminationSubscriberContract {
 
     default Mono<List<Event>> collectEvents(Publisher<Event> listener) {
         return Flux.from(listener)
-            .subscribeOn(Schedulers.boundedElastic())
+            .subscribeOn(Schedulers.elastic())
             .take(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(7)))
             .collectList();
     }
@@ -147,7 +147,7 @@ public interface TerminationSubscriberContract {
     default void sendEvents(TerminationSubscriber subscriber, Event... events) {
         Mono.delay(DELAY_BEFORE_PUBLISHING)
             .flatMapMany(ignored -> Flux.fromArray(events)
-                .subscribeOn(Schedulers.boundedElastic())
+                .subscribeOn(Schedulers.elastic())
                 .delayElements(DELAY_BETWEEN_EVENTS)
                 .doOnNext(subscriber::handle))
             .subscribe();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/03: JAMES-3003 Mailbox event delivery should run listeners concurrently

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 069f81f8a37d10694fdd77b52a4b3bbea6cbc1bc
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Mon Dec 9 16:16:49 2019 +0100

    JAMES-3003 Mailbox event delivery should run listeners concurrently
---
 .../apache/james/mailbox/events/GroupContract.java | 57 ++++++++++++++++++++++
 .../apache/james/mailbox/events/KeyContract.java   | 44 +++++++++++++++++
 .../apache/james/mailbox/events/InVMEventBus.java  | 20 ++++----
 .../mailbox/events/delivery/EventDelivery.java     | 42 +---------------
 .../mailbox/events/delivery/InVmEventDelivery.java | 11 ++---
 .../events/delivery/InVmEventDeliveryTest.java     | 21 ++------
 .../james/mailbox/events/EventDispatcher.java      |  6 +--
 7 files changed, 122 insertions(+), 79 deletions(-)

diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
index 2f1aca9..ebfd4ea 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/GroupContract.java
@@ -26,6 +26,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_UNSUPPOR
 import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
 import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_B;
+import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_C;
 import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.ONE_SECOND;
 import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION;
@@ -42,6 +43,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
@@ -54,6 +57,7 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
+import reactor.core.scheduler.Schedulers;
 
 public interface GroupContract {
 
@@ -97,6 +101,59 @@ public interface GroupContract {
         }
 
         @Test
+        default void groupNotificationShouldDeliverASingleEventToAllListenersAtTheSameTime() {
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            try {
+                ConcurrentLinkedQueue<String> threads = new ConcurrentLinkedQueue<>();
+                eventBus().register(new MailboxListener.GroupMailboxListener() {
+                    @Override
+                    public Group getDefaultGroup() {
+                        return new GenericGroup("groupA");
+                    }
+
+                    @Override
+                    public void event(Event event) throws Exception {
+                        threads.add(Thread.currentThread().getName());
+                        countDownLatch.await();
+                    }
+                }, GROUP_A);
+                eventBus().register(new MailboxListener.GroupMailboxListener() {
+                    @Override
+                    public Group getDefaultGroup() {
+                        return new GenericGroup("groupB");
+                    }
+
+                    @Override
+                    public void event(Event event) throws Exception {
+                        threads.add(Thread.currentThread().getName());
+                        countDownLatch.await();
+                    }
+                }, GROUP_B);
+                eventBus().register(new MailboxListener.GroupMailboxListener() {
+                    @Override
+                    public Group getDefaultGroup() {
+                        return new GenericGroup("groupC");
+                    }
+
+                    @Override
+                    public void event(Event event) throws Exception {
+                        threads.add(Thread.currentThread().getName());
+                        countDownLatch.await();
+                    }
+                }, GROUP_C);
+
+                eventBus().dispatch(EVENT, NO_KEYS).subscribeOn(Schedulers.elastic()).subscribe();
+
+
+                WAIT_CONDITION.atMost(org.awaitility.Duration.TEN_SECONDS)
+                    .untilAsserted(() -> assertThat(threads).hasSize(3));
+                assertThat(threads).doesNotHaveDuplicates();
+            } finally {
+                countDownLatch.countDown();
+            }
+        }
+
+        @Test
         default void listenersShouldBeAbleToDispatch() {
             AtomicBoolean successfulRetry = new AtomicBoolean(false);
             MailboxListener listener = event -> {
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
index f365ef3..2c8e581 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -56,6 +57,7 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
+import reactor.core.scheduler.Schedulers;
 
 public interface KeyContract extends EventBusContract {
 
@@ -86,6 +88,36 @@ public interface KeyContract extends EventBusContract {
         }
 
         @Test
+        default void notificationShouldDeliverASingleEventToAllListenersAtTheSameTime() {
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            try {
+                ConcurrentLinkedQueue<String> threads = new ConcurrentLinkedQueue<>();
+                eventBus().register(event -> {
+                    threads.add(Thread.currentThread().getName());
+                    countDownLatch.await();
+                }, KEY_1);
+                eventBus().register(event -> {
+                    threads.add(Thread.currentThread().getName());
+                    countDownLatch.await();
+                }, KEY_1);
+                eventBus().register(event -> {
+                    threads.add(Thread.currentThread().getName());
+                    countDownLatch.await();
+                }, KEY_1);
+
+                eventBus().dispatch(EVENT, KEY_1).subscribeOn(Schedulers.elastic()).subscribe();
+
+
+                WAIT_CONDITION.atMost(org.awaitility.Duration.TEN_SECONDS)
+                    .untilAsserted(() -> assertThat(threads).hasSize(3));
+                assertThat(threads).doesNotHaveDuplicates();
+            } finally {
+                countDownLatch.countDown();
+            }
+        }
+
+
+        @Test
         default void registeredListenersShouldNotReceiveNoopEvents() throws Exception {
             MailboxListener listener = newListener();
 
@@ -293,6 +325,18 @@ public interface KeyContract extends EventBusContract {
                 .event(any());
         }
 
+
+        @Test
+        default void dispatchShouldNotifyAsynchronousListener() throws Exception {
+            MailboxListener listener = newListener();
+            when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
+            eventBus().register(listener, KEY_1);
+
+            eventBus().dispatch(EVENT, KEY_1).block();
+
+            verify(listener, after(FIVE_HUNDRED_MS.toMillis())).event(EVENT);
+        }
+
         @Test
         default void dispatchShouldNotBlockAsynchronousListener() throws Exception {
             MailboxListener listener = newListener();
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
index f8bc8c9..bcd2376 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java
@@ -34,9 +34,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class InVMEventBus implements EventBus {
 
@@ -79,8 +79,6 @@ public class InVMEventBus implements EventBus {
     public Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
         if (!event.isNoop()) {
             return Flux.merge(groupDeliveries(event), keyDeliveries(event, keys))
-                .reduceWith(EventDelivery.ExecutionStages::empty, EventDelivery.ExecutionStages::combine)
-                .flatMap(EventDelivery.ExecutionStages::synchronousListenerFuture)
                 .then()
                 .onErrorResume(throwable -> Mono.empty());
         }
@@ -90,9 +88,7 @@ public class InVMEventBus implements EventBus {
     @Override
     public Mono<Void> reDeliver(Group group, Event event) {
         if (!event.isNoop()) {
-            return Mono.fromCallable(() -> groupDelivery(event, retrieveListenerFromGroup(group), group))
-                .flatMap(EventDelivery.ExecutionStages::synchronousListenerFuture)
-                .then();
+            return groupDelivery(event, retrieveListenerFromGroup(group), group);
         }
         return Mono.empty();
     }
@@ -102,17 +98,19 @@ public class InVMEventBus implements EventBus {
             .orElseThrow(() -> new GroupRegistrationNotFound(group));
     }
 
-    private Flux<EventDelivery.ExecutionStages> keyDeliveries(Event event, Set<RegistrationKey> keys) {
+    private Mono<Void> keyDeliveries(Event event, Set<RegistrationKey> keys) {
         return Flux.fromIterable(registeredListenersByKeys(keys))
-            .map(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()));
+            .flatMap(listener -> eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()).subscribeOn(Schedulers.elastic()))
+            .then();
     }
 
-    private Flux<EventDelivery.ExecutionStages> groupDeliveries(Event event) {
+    private Mono<Void> groupDeliveries(Event event) {
         return Flux.fromIterable(groups.entrySet())
-            .map(entry -> groupDelivery(event, entry.getValue(), entry.getKey()));
+            .flatMap(entry -> groupDelivery(event, entry.getValue(), entry.getKey()).subscribeOn(Schedulers.elastic()))
+            .then();
     }
 
-    private EventDelivery.ExecutionStages groupDelivery(Event event, MailboxListener mailboxListener, Group group) {
+    private Mono<Void> groupDelivery(Event event, MailboxListener mailboxListener, Group group) {
         return eventDelivery.deliver(
             mailboxListener,
             event,
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index 0f56a60..26c972d 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -32,7 +32,6 @@ import org.apache.james.mailbox.events.RetryBackoffConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -129,44 +128,5 @@ public interface EventDelivery {
         Mono<Void> handle(Event event);
     }
 
-    class ExecutionStages {
-
-        public static ExecutionStages empty() {
-            return new ExecutionStages(Mono.empty(), Mono.empty());
-        }
-
-        static ExecutionStages synchronous(Mono<Void> synchronousListenerFuture) {
-            return new ExecutionStages(synchronousListenerFuture, Mono.empty());
-        }
-
-        static ExecutionStages asynchronous(Mono<Void> asynchronousListenerFuture) {
-            return new ExecutionStages(Mono.empty(),asynchronousListenerFuture);
-        }
-
-        private final Mono<Void> synchronousListenerFuture;
-        private final Mono<Void> asynchronousListenerFuture;
-
-        private ExecutionStages(Mono<Void> synchronousListenerFuture, Mono<Void> asynchronousListenerFuture) {
-            this.synchronousListenerFuture = synchronousListenerFuture;
-            this.asynchronousListenerFuture = asynchronousListenerFuture;
-        }
-
-        public Mono<Void> synchronousListenerFuture() {
-            return synchronousListenerFuture;
-        }
-
-        public Mono<Void> allListenerFuture() {
-            return synchronousListenerFuture
-                .concatWith(asynchronousListenerFuture)
-                .then();
-        }
-
-        public ExecutionStages combine(ExecutionStages another) {
-            return new ExecutionStages(
-                Flux.concat(this.synchronousListenerFuture, another.synchronousListenerFuture).then(),
-                Flux.concat(this.asynchronousListenerFuture, another.asynchronousListenerFuture).then());
-        }
-    }
-
-    ExecutionStages deliver(MailboxListener listener, Event event, DeliveryOption option);
+    Mono<Void> deliver(MailboxListener listener, Event event, DeliveryOption option);
 }
diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
index 718d5f1..a36119f 100644
--- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
+++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java
@@ -51,18 +51,17 @@ public class InVmEventDelivery implements EventDelivery {
     }
 
     @Override
-    public ExecutionStages deliver(MailboxListener listener, Event event, DeliveryOption option) {
+    public Mono<Void> deliver(MailboxListener listener, Event event, DeliveryOption option) {
         Mono<Void> executionResult = deliverByOption(listener, event, option);
 
-        return toExecutionStages(listener.getExecutionMode(), executionResult);
+        return waitForResultIfNeeded(listener.getExecutionMode(), executionResult);
     }
 
-    private ExecutionStages toExecutionStages(MailboxListener.ExecutionMode executionMode, Mono<Void> executionResult) {
+    private Mono<Void> waitForResultIfNeeded(MailboxListener.ExecutionMode executionMode, Mono<Void> executionResult) {
         if (executionMode.equals(MailboxListener.ExecutionMode.SYNCHRONOUS)) {
-            return ExecutionStages.synchronous(executionResult);
+            return executionResult;
         }
-
-        return ExecutionStages.asynchronous(executionResult);
+        return executionResult.or(Mono.empty()).onErrorResume(throwable -> Mono.empty());
     }
 
     private Mono<Void> deliverByOption(MailboxListener listener, Event event, DeliveryOption deliveryOption) {
diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
index 547e192..dfea972 100644
--- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
+++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java
@@ -63,7 +63,6 @@ class InVmEventDeliveryTest {
         void deliverShouldDeliverEvent() {
             when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
             inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                .allListenerFuture()
                 .block();
 
             assertThat(listener.numberOfEventCalls())
@@ -74,7 +73,6 @@ class InVmEventDeliveryTest {
         void deliverShouldReturnSuccessSynchronousMono() {
             when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.SYNCHRONOUS);
             assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                    .synchronousListenerFuture()
                     .block())
                 .doesNotThrowAnyException();
         }
@@ -86,7 +84,6 @@ class InVmEventDeliveryTest {
                 .when(listener).event(EVENT);
 
             assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                .allListenerFuture()
                 .block())
             .isInstanceOf(RuntimeException.class);
 
@@ -101,7 +98,6 @@ class InVmEventDeliveryTest {
                 .when(listener).event(EVENT);
 
             assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                .synchronousListenerFuture()
                 .block())
             .isInstanceOf(RuntimeException.class);
         }
@@ -114,7 +110,6 @@ class InVmEventDeliveryTest {
         void deliverShouldDeliverEvent() {
             when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
             inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                .allListenerFuture()
                 .block();
 
             assertThat(listener.numberOfEventCalls())
@@ -125,24 +120,19 @@ class InVmEventDeliveryTest {
         void deliverShouldReturnSuccessSynchronousMono() {
             when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
             assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                    .synchronousListenerFuture()
                     .block())
                 .doesNotThrowAnyException();
         }
 
         @Test
-        void deliverShouldNotDeliverWhenListenerGetException() {
+        void deliverShouldNotFailWhenListenerGetException() {
             when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
             doThrow(new RuntimeException())
                 .when(listener).event(EVENT);
 
-            assertThatThrownBy(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                .allListenerFuture()
+            assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
                 .block())
-            .isInstanceOf(RuntimeException.class);
-
-            assertThat(listener.numberOfEventCalls())
-                .isEqualTo(0);
+            .doesNotThrowAnyException();
         }
 
         @Test
@@ -152,7 +142,6 @@ class InVmEventDeliveryTest {
                 .when(listener).event(EVENT);
 
             assertThatCode(() -> inVmEventDelivery.deliver(listener, EVENT, DeliveryOption.none())
-                .synchronousListenerFuture()
                 .block())
             .doesNotThrowAnyException();
         }
@@ -174,7 +163,6 @@ class InVmEventDeliveryTest {
                 DeliveryOption.of(
                     BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
                     PermanentFailureHandler.NO_HANDLER))
-                .allListenerFuture()
                 .block();
 
             assertThat(listener.numberOfEventCalls())
@@ -193,7 +181,6 @@ class InVmEventDeliveryTest {
                 DeliveryOption.of(
                     Retryer.NO_RETRYER,
                     PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
-                .allListenerFuture()
                 .block();
 
             assertThat(deadLetter.groupsWithFailedEvents().toStream())
@@ -214,7 +201,6 @@ class InVmEventDeliveryTest {
                 DeliveryOption.of(
                     BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
                     PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
-                .allListenerFuture()
                 .block();
 
             SoftAssertions.assertSoftly(softy -> {
@@ -242,7 +228,6 @@ class InVmEventDeliveryTest {
                 DeliveryOption.of(
                     BackoffRetryer.of(RetryBackoffConfiguration.DEFAULT, listener),
                     PermanentFailureHandler.StoreToDeadLetters.of(GROUP_A, deadLetter)))
-                .allListenerFuture()
                 .block();
 
             SoftAssertions.assertSoftly(softy -> {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 47844d7..6c1b586 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -90,13 +90,13 @@ class EventDispatcher {
         return Flux.fromIterable(keys)
             .flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key)
                 .map(listener -> Tuples.of(key, listener)))
-            .filter(pair -> pair.getT2().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
-            .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()))
+            .filter(pair -> pair.getT2().getExecutionMode() == MailboxListener.ExecutionMode.SYNCHRONOUS)
+            .flatMap(pair -> executeListener(event, pair.getT2(), pair.getT1()).subscribeOn(Schedulers.elastic()))
             .then();
     }
 
     private Mono<Void> executeListener(Event event, MailboxListener mailboxListener, RegistrationKey registrationKey) {
-        return Mono.from((sink) -> {
+        return Mono.from(sink -> {
             try {
                 mailboxListenerExecutor.execute(mailboxListener,
                     MDCBuilder.create()


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org