You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/06/14 06:59:27 UTC

[james-project] 05/28: [FIX] JPA: avoid leaks in entity manager

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bb9cebc6c5ee0f026a31ce65decf25633058940a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 2 17:07:37 2023 +0700

    [FIX] JPA: avoid leaks in entity manager
---
 .../listeners/SetCustomFlagOnBigMessages.java      |  1 +
 .../james/mailbox/backup/DefaultMailboxBackup.java | 16 +++++----
 .../mailbox/backup/ZipMailArchiveRestorer.java     |  1 +
 .../CassandraMailboxSessionMapperFactory.java      |  5 +++
 ...CassandraRecomputeCurrentQuotasServiceTest.java |  2 +-
 .../task/JPARecomputeCurrentQuotasServiceTest.java |  2 +-
 .../MemoryRecomputeCurrentQuotasServiceTest.java   |  4 ++-
 .../mailbox/store/SystemMailboxesProviderImpl.java |  3 +-
 .../store/event/MailboxAnnotationListener.java     |  3 +-
 .../store/quota/DefaultUserQuotaRootResolver.java  |  6 ++--
 .../store/search/ListeningMessageSearchIndex.java  |  6 ++--
 .../store/event/MailboxAnnotationListenerTest.java |  3 ++
 .../mailbox/tools/indexer/ReIndexerImpl.java       |  1 +
 .../mailbox/tools/indexer/ReIndexerPerformer.java  | 18 ++++++----
 .../quota/task/RecomputeCurrentQuotasService.java  |  9 +++--
 .../james/healthcheck/MailReceptionCheck.java      |  3 +-
 .../adapter/mailbox/ACLUsernameChangeTaskStep.java |  7 ++--
 .../mailbox/MailboxUsernameChangeTaskStep.java     | 16 ++++++---
 .../james/transport/mailets/RandomStoring.java     |  3 +-
 .../jmap/draft/send/PostDequeueDecorator.java      |  1 +
 .../james/jmap/mailet/filter/ActionApplier.java    |  1 +
 .../webadmin/vault/routes/RestoreService.java      |  3 +-
 .../webadmin/service/CreateMissingParentsTask.java | 15 ++++----
 .../webadmin/service/ExpireMailboxService.java     |  3 +-
 .../webadmin/service/UserMailboxesService.java     | 41 ++++++++++++++++------
 .../org/apache/james/rspamd/RspamdListener.java    | 10 ++++--
 .../rspamd/task/GetMailboxMessagesService.java     |  6 ++--
 .../james/spamassassin/SpamAssassinListener.java   |  2 ++
 28 files changed, 135 insertions(+), 56 deletions(-)

diff --git a/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java b/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java
index d7223c1d6b..192e6df1a0 100644
--- a/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java
+++ b/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java
@@ -86,6 +86,7 @@ class SetCustomFlagOnBigMessages implements EventListener.GroupEventListener {
                 FlagsUpdateMode.ADD,
                 MessageRange.one(messageUid),
                 session);
+            mailboxManager.endProcessingRequest(session);
         } catch (MailboxException e) {
             LOGGER.error("error happens when adding '{}' flag to the message with uid {} in mailbox {} of user {}",
                 BIG_MESSAGE, messageUid.asLong(), addedEvent.getMailboxId(), addedEvent.getUsername().asString(), e);
diff --git a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
index 0c9fd32bcd..bc6697a88a 100644
--- a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
+++ b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
@@ -32,7 +32,6 @@ import org.apache.james.core.Username;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageManager;
-import org.apache.james.mailbox.exception.BadCredentialsException;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.Mailbox;
@@ -98,14 +97,19 @@ public class DefaultMailboxBackup implements MailboxBackup {
 
         Stream<MessageResult> messages = allMessagesForUser(accountContents);
         archive(mailboxes, messages, destination);
+        mailboxManager.endProcessingRequest(session);
     }
 
-    private boolean isAccountNonEmpty(Username username) throws BadCredentialsException, MailboxException, IOException {
+    private boolean isAccountNonEmpty(Username username) throws MailboxException {
         MailboxSession session = mailboxManager.createSystemSession(username);
-        return getAccountContentForUser(session)
-            .stream()
-            .findFirst()
-            .isPresent();
+        try {
+            return getAccountContentForUser(session)
+                .stream()
+                .findFirst()
+                .isPresent();
+        } finally {
+            mailboxManager.endProcessingRequest(session);
+        }
     }
 
     @Override
diff --git a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java
index 5830c69019..215f7b19bf 100644
--- a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java
+++ b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java
@@ -58,6 +58,7 @@ public class ZipMailArchiveRestorer implements MailArchiveRestorer {
     public void restore(Username username, InputStream source) throws MailboxException, IOException {
         MailboxSession session = mailboxManager.createSystemSession(username);
         restoreEntries(source, session);
+        mailboxManager.endProcessingRequest(session);
     }
 
     private void restoreEntries(InputStream source, MailboxSession session) throws IOException {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 84d86b1882..fb3069869b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -231,6 +231,11 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
         return cassandraSubscriptionMapper;
     }
 
+    @Override
+    public void endProcessingRequest(MailboxSession session) {
+
+    }
+
     public DeleteMessageListener deleteMessageListener() {
         return new DeleteMessageListener(threadDAO, threadLookupDAO, imapUidDAO, messageIdDAO, messageDAO, messageDAOV3, attachmentDAOV2,
             attachmentMessageIdDAO, aclMapper, userMailboxRightsDAO, applicableFlagDAO, firstUnseenDAO, deletedMessageDAO,
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
index 113b741ed4..9b5d3b628a 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
@@ -86,7 +86,7 @@ public class CassandraRecomputeCurrentQuotasServiceTest implements RecomputeCurr
         userQuotaRootResolver = new DefaultUserQuotaRootResolver(sessionProvider, mapperFactory);
         CurrentQuotaCalculator currentQuotaCalculator = new CurrentQuotaCalculator(mapperFactory, userQuotaRootResolver);
 
-        testee = new RecomputeCurrentQuotasService(usersRepository, currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, sessionProvider);
+        testee = new RecomputeCurrentQuotasService(usersRepository, currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, sessionProvider, mailboxManager);
     }
 
     @Override
diff --git a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java
index 5d5dfbf696..43925070a6 100644
--- a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java
+++ b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java
@@ -87,7 +87,7 @@ class JPARecomputeCurrentQuotasServiceTest implements RecomputeCurrentQuotasServ
 
         CurrentQuotaCalculator currentQuotaCalculator = new CurrentQuotaCalculator(mapperFactory, userQuotaRootResolver);
 
-        testee = new RecomputeCurrentQuotasService(usersRepository, currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, sessionProvider);
+        testee = new RecomputeCurrentQuotasService(usersRepository, currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, sessionProvider, mailboxManager);
     }
 
     @AfterEach
diff --git a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java
index 328fccb798..2fa06bde62 100644
--- a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java
+++ b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java
@@ -26,6 +26,7 @@ import org.apache.james.domainlist.lib.DomainListConfiguration;
 import org.apache.james.domainlist.memory.MemoryDomainList;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.SessionProvider;
+import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
 import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
 import org.apache.james.mailbox.quota.CurrentQuotaManager;
 import org.apache.james.mailbox.quota.UserQuotaRootResolver;
@@ -52,7 +53,8 @@ class MemoryRecomputeCurrentQuotasServiceTest implements RecomputeCurrentQuotasS
         usersRepository = MemoryUsersRepository.withoutVirtualHosting(memoryDomainList);
 
         resources = InMemoryIntegrationResources.defaultResources();
-        testee = new RecomputeCurrentQuotasService(usersRepository, resources.getCurrentQuotaManager(), resources.getCurrentQuotaCalculator(), resources.getDefaultUserQuotaRootResolver(), resources.getMailboxManager().getSessionProvider());
+        InMemoryMailboxManager mailboxManager = resources.getMailboxManager();
+        testee = new RecomputeCurrentQuotasService(usersRepository, resources.getCurrentQuotaManager(), resources.getCurrentQuotaCalculator(), resources.getDefaultUserQuotaRootResolver(), mailboxManager.getSessionProvider(), mailboxManager);
     }
 
     @Override
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java
index 3b6a2fbd7c..a1518902b1 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java
@@ -57,7 +57,8 @@ public class SystemMailboxesProviderImpl implements SystemMailboxesProvider {
 
         return Mono.from(mailboxManager.getMailboxReactive(mailboxPath, session))
             .flux()
-            .onErrorResume(MailboxNotFoundException.class, e -> searchMessageManagerByMailboxRole(aRole, username));
+            .onErrorResume(MailboxNotFoundException.class, e -> searchMessageManagerByMailboxRole(aRole, username))
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 
     private boolean hasRole(Role aRole, MailboxPath mailBoxPath) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
index 6385f9caf2..8d11cb6ec9 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
@@ -69,7 +69,8 @@ public class MailboxAnnotationListener implements EventListener.ReactiveGroupEve
 
             return Flux.from(annotationMapper.getAllAnnotationsReactive(mailboxId))
                 .flatMap(annotation -> Mono.from(annotationMapper.deleteAnnotationReactive(mailboxId, annotation.getKey())))
-                .then();
+                .then()
+                .doFinally(any -> mailboxSessionMapperFactory.endProcessingRequest(mailboxSession));
         }
         return Mono.empty();
     }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
index 00506ca094..db58bf198f 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
@@ -121,7 +121,8 @@ public class DefaultUserQuotaRootResolver implements UserQuotaRootResolver {
         return factory.getMailboxMapper(session)
             .findMailboxById(mailboxId)
             .map(Mailbox::generateAssociatedPath)
-            .flatMap(path -> Mono.from(getQuotaRootReactive(path)));
+            .flatMap(path -> Mono.from(getQuotaRootReactive(path)))
+            .doFinally(any -> factory.endProcessingRequest(session));
     }
 
     @Override
@@ -138,7 +139,8 @@ public class DefaultUserQuotaRootResolver implements UserQuotaRootResolver {
             .flatMap(this::getQuotaRootReactive, ReactorUtils.DEFAULT_CONCURRENCY)
             .distinct();
 
-        return Flux.concat(quotaRootListFromDelegatedMailboxes, Flux.just(forUser(username)));
+        return Flux.concat(quotaRootListFromDelegatedMailboxes, Flux.just(forUser(username)))
+            .doFinally(any -> factory.endProcessingRequest(session));
     }
 
     @Override
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
index 3eb17a3661..5fb5019d56 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
@@ -95,9 +95,9 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
      */
     @Override
     public Mono<Void> reactiveEvent(Event event) {
-        return handleMailboxEvent(event,
-            sessionProvider.createSystemSession(event.getUsername()),
-            (MailboxEvent) event);
+        MailboxSession systemSession = sessionProvider.createSystemSession(event.getUsername());
+        return handleMailboxEvent(event, systemSession, (MailboxEvent) event)
+            .then(Mono.fromRunnable(() -> factory.endProcessingRequest(systemSession)));
     }
 
     private Mono<Void> handleMailboxEvent(Event event, MailboxSession session, MailboxEvent mailboxEvent) {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
index 1c310a5447..19739fdac7 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
@@ -125,6 +125,7 @@ class MailboxAnnotationListenerTest {
         listener.event(deleteEvent);
 
         verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession));
+        verify(mailboxSessionMapperFactory).endProcessingRequest(eq(mailboxSession));
         verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId));
 
         verifyNoMoreInteractions(mailboxSessionMapperFactory);
@@ -140,6 +141,7 @@ class MailboxAnnotationListenerTest {
         listener.event(deleteEvent);
 
         verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession));
+        verify(mailboxSessionMapperFactory).endProcessingRequest(eq(mailboxSession));
         verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId));
         verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), eq(PRIVATE_KEY));
         verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), eq(SHARED_KEY));
@@ -158,6 +160,7 @@ class MailboxAnnotationListenerTest {
         assertThatThrownBy(() -> listener.event(deleteEvent)).isInstanceOf(RuntimeException.class);
 
         verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession));
+        verify(mailboxSessionMapperFactory).endProcessingRequest(eq(mailboxSession));
         verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId));
         verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), eq(PRIVATE_KEY));
 
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
index f557602cd1..d4adbfa34f 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
@@ -106,5 +106,6 @@ public class ReIndexerImpl implements ReIndexer {
     private void validateIdExists(MailboxId mailboxId) throws MailboxException {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of("ReIndexingImap"));
         MailboxReactorUtils.block(mapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId));
+        mailboxManager.endProcessingRequest(mailboxSession);
     }
 }
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index eafd50fe2a..9a1d779bc1 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -155,7 +155,8 @@ public class ReIndexerPerformer {
             .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, runningOptions), MAILBOX_CONCURRENCY);
 
         return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext)
-            .doFinally(any -> LOGGER.info("Full reindex finished"));
+            .doFinally(any -> LOGGER.info("Full reindex finished"))
+            .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     Mono<Result> reIndexSingleMailbox(MailboxId mailboxId, ReIndexingContext reIndexingContext, RunningOptions runningOptions) {
@@ -165,7 +166,8 @@ public class ReIndexerPerformer {
             .findMailboxById(mailboxId)
             .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, runningOptions));
 
-        return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext);
+        return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext)
+            .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     Mono<Result> reIndexUserMailboxes(Username username, ReIndexingContext reIndexingContext, RunningOptions runningOptions) {
@@ -180,7 +182,8 @@ public class ReIndexerPerformer {
                 .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, runningOptions), MAILBOX_CONCURRENCY);
 
             return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext)
-                .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()));
+                .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()))
+                .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
         } catch (Exception e) {
             LOGGER.error("Error fetching mailboxes for user: {}", username.asString());
             return Mono.just(Result.PARTIAL);
@@ -195,7 +198,8 @@ public class ReIndexerPerformer {
             .map(mailbox -> new ReIndexingEntry(mailbox, mailboxSession, uid))
             .flatMap(this::fullyReadMessage)
             .flatMap(message -> reIndex(message, mailboxSession))
-            .switchIfEmpty(Mono.just(Result.COMPLETED));
+            .switchIfEmpty(Mono.just(Result.COMPLETED))
+            .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     Mono<Result> reIndexMessageId(MessageId messageId) {
@@ -209,7 +213,8 @@ public class ReIndexerPerformer {
             .onErrorResume(e -> {
                 LOGGER.warn("Failed to re-index {}", messageId, e);
                 return Mono.just(Result.PARTIAL);
-            });
+            })
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 
     Mono<Result> reIndexErrors(ReIndexingContext reIndexingContext, ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions runningOptions) {
@@ -227,7 +232,8 @@ public class ReIndexerPerformer {
                         return Mono.just(Either.left(new MailboxFailure(mailboxId)));
                     }), MAILBOX_CONCURRENCY));
 
-        return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext);
+        return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext)
+            .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) {
diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
index acd288dfe7..f2d1a9c431 100644
--- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
+++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 
 import org.apache.james.core.Username;
+import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.SessionProvider;
 import org.apache.james.mailbox.model.QuotaOperation;
@@ -147,18 +148,21 @@ public class RecomputeCurrentQuotasService {
     private final CurrentQuotaCalculator currentQuotaCalculator;
     private final UserQuotaRootResolver userQuotaRootResolver;
     private final SessionProvider sessionProvider;
+    private final MailboxManager mailboxManager;
 
     @Inject
     public RecomputeCurrentQuotasService(UsersRepository usersRepository,
                                          CurrentQuotaManager storeCurrentQuotaManager,
                                          CurrentQuotaCalculator currentQuotaCalculator,
                                          UserQuotaRootResolver userQuotaRootResolver,
-                                         SessionProvider sessionProvider) {
+                                         SessionProvider sessionProvider,
+                                         MailboxManager mailboxManager) {
         this.usersRepository = usersRepository;
         this.storeCurrentQuotaManager = storeCurrentQuotaManager;
         this.currentQuotaCalculator = currentQuotaCalculator;
         this.userQuotaRootResolver = userQuotaRootResolver;
         this.sessionProvider = sessionProvider;
+        this.mailboxManager = mailboxManager;
     }
 
     public Mono<Task.Result> recomputeCurrentQuotas(Context context, RunningOptions runningOptions) {
@@ -190,6 +194,7 @@ public class RecomputeCurrentQuotasService {
                 LOGGER.error("Error while recomputing current quotas for {}", quotaRoot, e);
                 context.addToFailedMailboxes(quotaRoot);
                 return Mono.just(Task.Result.PARTIAL);
-            });
+            })
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 }
diff --git a/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java b/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
index c8f31d0844..8d865ee097 100644
--- a/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
+++ b/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
@@ -223,7 +223,8 @@ public class MailReceptionCheck implements HealthCheck {
             .onErrorResume(e -> {
                 LOGGER.error("Mail reception check failed", e);
                 return Mono.just(Result.unhealthy(componentName(), e.getMessage()));
-            });
+            })
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 
     private Mono<MessageManager> retrieveInbox(Username username, MailboxSession session) {
diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java
index 813dc8a043..7027645eb0 100644
--- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java
+++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java
@@ -67,7 +67,9 @@ public class ACLUsernameChangeTaskStep implements UsernameChangeTaskStep {
         return mailboxManager.search(MailboxQuery.builder().matchesAllMailboxNames().build(), oldSession)
             .filter(mailbox -> !mailbox.getPath().getUser().equals(oldUsername))
             .concatMap(mailbox -> migrateACLs(oldUsername, newUsername, mailbox))
-            .then(updateSubscriptionsOnDeletedMailboxes(oldUsername, oldSession, newSession));
+            .then(updateSubscriptionsOnDeletedMailboxes(oldUsername, oldSession, newSession))
+            .doFinally(any -> mailboxManager.endProcessingRequest(oldSession))
+            .doFinally(any -> mailboxManager.endProcessingRequest(newSession));
     }
 
     private Mono<Void> updateSubscriptionsOnDeletedMailboxes(Username oldUsername, MailboxSession oldSession, MailboxSession newSession) {
@@ -90,6 +92,7 @@ public class ACLUsernameChangeTaskStep implements UsernameChangeTaskStep {
         return Mono.fromRunnable(Throwing.runnable(() -> mailboxManager.applyRightsCommand(mailbox.getId(), MailboxACL.command().rights(rights).forUser(newUsername).asAddition(), ownerSession)))
             .then(Mono.fromRunnable(Throwing.runnable(() -> mailboxManager.applyRightsCommand(mailbox.getId(), MailboxACL.command().rights(rights).forUser(oldUsername).asRemoval(), ownerSession))))
             .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
-            .then();
+            .then()
+            .doFinally(any -> mailboxManager.endProcessingRequest(ownerSession));
     }
 }
diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java
index cdf02d268a..a28d9544ad 100644
--- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java
+++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java
@@ -72,7 +72,9 @@ public class MailboxUsernameChangeTaskStep implements UsernameChangeTaskStep {
         return mailboxManager.search(queryUser, MailboxManager.MailboxSearchFetchType.Minimal, fromSession)
             // Only keep top level, rename takes care of sub mailboxes
             .filter(mailbox -> mailbox.getPath().getHierarchyLevels(fromSession.getPathDelimiter()).size() == 1)
-            .concatMap(mailbox -> migrateMailbox(fromSession, toSession, mailbox));
+            .concatMap(mailbox -> migrateMailbox(fromSession, toSession, mailbox))
+            .doFinally(any -> mailboxManager.endProcessingRequest(fromSession))
+            .doFinally(any -> mailboxManager.endProcessingRequest(toSession));
     }
 
     private Mono<Void> migrateMailbox(MailboxSession fromSession, MailboxSession toSession, org.apache.james.mailbox.model.MailboxMetaData mailbox) {
@@ -115,16 +117,20 @@ public class MailboxUsernameChangeTaskStep implements UsernameChangeTaskStep {
         return Flux.fromIterable(mailbox.getResolvedAcls().getEntries().entrySet())
             .filter(entry -> entry.getKey().getNameType() == MailboxACL.NameType.user && !entry.getKey().isNegative())
             .map(entry -> Username.of(entry.getKey().getName()))
-            .concatMap(Throwing.function(userWithAccess ->
-                Flux.from(subscriptionManager.subscriptionsReactive(mailboxManager.createSystemSession(userWithAccess)))
+            .concatMap(Throwing.function(userWithAccess -> {
+                MailboxSession session = mailboxManager.createSystemSession(userWithAccess);
+                return Flux.from(subscriptionManager.subscriptionsReactive(session))
                     .filter(subscribedMailbox -> subscribedMailbox.equals(mailbox.getPath()))
-                    .concatMap(any -> renameSubscription(mailbox, renamedPath, userWithAccess))))
+                    .concatMap(any -> renameSubscription(mailbox, renamedPath, userWithAccess))
+                    .doFinally(any -> mailboxManager.endProcessingRequest(session));
+            }))
             .then();
     }
 
     private Mono<Void> renameSubscription(MailboxMetaData mailbox, MailboxPath renamedPath, Username user) {
         MailboxSession session = mailboxManager.createSystemSession(user);
         return Mono.from(subscriptionManager.subscribeReactive(renamedPath, session))
-            .then(Mono.from(subscriptionManager.unsubscribeReactive(mailbox.getPath(), session)));
+            .then(Mono.from(subscriptionManager.unsubscribeReactive(mailbox.getPath(), session)))
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 }
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
index 6b817023d3..c1eb916848 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
@@ -122,7 +122,8 @@ public class RandomStoring extends GenericMailet {
             MailboxSession session = mailboxManager.createSystemSession(username);
             return mailboxManager
                 .search(MailboxQuery.privateMailboxesBuilder(session).build(), Minimal, session)
-                .map(metaData -> new ReroutingInfos(metaData.getPath().getName(), username));
+                .map(metaData -> new ReroutingInfos(metaData.getPath().getName(), username))
+                .doFinally(any -> mailboxManager.endProcessingRequest(session));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
index 44daed400f..be51824591 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
@@ -88,6 +88,7 @@ public class PostDequeueDecorator extends MailQueueItemDecorator {
                     MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(username.get()));
                     moveFromOutboxToSentWithSeenFlag(messageId, mailboxSession);
                     getMail().setAttribute(IS_DELIVERED);
+                    mailboxManager.endProcessingRequest(mailboxSession);
                 } catch (MailShouldBeInOutboxException e) {
                     LOG.info("Message does not exist on Outbox anymore, it could have already been sent", e);
                 }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java
index e9ce7b9954..6bb65926a9 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java
@@ -96,6 +96,7 @@ public class ActionApplier {
         try {
             MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
             MessageManager messageManager = mailboxManager.getMailbox(mailboxId, mailboxSession);
+            mailboxManager.endProcessingRequest(mailboxSession);
 
             String mailboxName = messageManager.getMailboxPath().getName();
 
diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java
index e2fcd4890f..caf0913810 100644
--- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java
+++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java
@@ -79,7 +79,8 @@ class RestoreService {
         MessageManager restoreMessageManager = restoreMailboxManager(session);
 
         return Flux.from(deletedMessageVault.search(usernameToRestore, searchQuery))
-            .flatMap(deletedMessage -> appendToMailbox(restoreMessageManager, deletedMessage, session), DEFAULT_CONCURRENCY);
+            .flatMap(deletedMessage -> appendToMailbox(restoreMessageManager, deletedMessage, session), DEFAULT_CONCURRENCY)
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 
     private Mono<RestoreResult> appendToMailbox(MessageManager restoreMailboxManager, DeletedMessage deletedMessage, MailboxSession session) {
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java
index d8da1c6eba..c92b172c24 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java
@@ -153,6 +153,8 @@ public class CreateMissingParentsTask implements Task {
         } catch (MailboxException e) {
             LOGGER.error("Error fetching mailbox paths", e);
             return Result.PARTIAL;
+        } finally {
+            mailboxManager.endProcessingRequest(session);
         }
     }
 
@@ -160,12 +162,13 @@ public class CreateMissingParentsTask implements Task {
         MailboxSession ownerSession = mailboxManager.createSystemSession(path.getUser());
         return Mono.from(mailboxManager.createMailboxReactive(path, ownerSession))
             .doOnNext(this::recordSuccess)
-        .then(Mono.just(Result.COMPLETED))
-        .onErrorResume(e -> {
-            LOGGER.error("Error creating missing parent mailbox: {}", path.getName(), e);
-            recordFailure(path);
-            return Mono.just(Result.PARTIAL);
-        });
+            .then(Mono.just(Result.COMPLETED))
+            .onErrorResume(e -> {
+                LOGGER.error("Error creating missing parent mailbox: {}", path.getName(), e);
+                recordFailure(path);
+                return Mono.just(Result.PARTIAL);
+            })
+            .doFinally(any -> mailboxManager.endProcessingRequest(ownerSession));
     }
 
     @Override
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
index fc709d4bba..526cbdae45 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
@@ -217,7 +217,8 @@ public class ExpireMailboxService {
                 context.incrementFailedCount();
                 context.incrementProcessedCount();
                 return Mono.just(Task.Result.PARTIAL);
-            });
+            })
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 
     private Mono<List<MessageUid>> searchMessagesReactive(MessageManager mgr, MailboxSession session, SearchQuery expiration) {
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java
index a7bf0d7291..e57048f5ac 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java
@@ -74,6 +74,7 @@ public class UserMailboxesService {
             MailboxPath mailboxPath = MailboxPath.forUser(username, mailboxName.asString())
                 .assertAcceptable(mailboxSession.getPathDelimiter());
             mailboxManager.createMailbox(mailboxPath, mailboxSession);
+            mailboxManager.endProcessingRequest(mailboxSession);
         } catch (MailboxExistsException e) {
             LOGGER.info("Attempt to create mailbox {} for user {} that already exists", mailboxName, username);
         }
@@ -85,14 +86,19 @@ public class UserMailboxesService {
         listUserMailboxes(mailboxSession)
             .map(MailboxMetaData::getPath)
             .forEach(Throwing.consumer(mailboxPath -> deleteMailbox(mailboxSession, mailboxPath)));
+        mailboxManager.endProcessingRequest(mailboxSession);
     }
 
     public List<MailboxResponse> listMailboxes(Username username) throws UsersRepositoryException {
         usernamePreconditions(username);
         MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
-        return listUserMailboxes(mailboxSession)
-            .map(mailboxMetaData -> new MailboxResponse(mailboxMetaData.getPath().getName(), mailboxMetaData.getId()))
-            .collect(ImmutableList.toImmutableList());
+        try {
+            return listUserMailboxes(mailboxSession)
+                .map(mailboxMetaData -> new MailboxResponse(mailboxMetaData.getPath().getName(), mailboxMetaData.getId()))
+                .collect(ImmutableList.toImmutableList());
+        } finally {
+            mailboxManager.endProcessingRequest(mailboxSession);
+        }
     }
 
     public boolean testMailboxExists(Username username, MailboxName mailboxName) throws MailboxException, UsersRepositoryException {
@@ -100,8 +106,12 @@ public class UserMailboxesService {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
         MailboxPath mailboxPath = MailboxPath.forUser(username, mailboxName.asString())
             .assertAcceptable(mailboxSession.getPathDelimiter());
-        return Mono.from(mailboxManager.mailboxExists(mailboxPath, mailboxSession))
-            .block();
+        try {
+            return Mono.from(mailboxManager.mailboxExists(mailboxPath, mailboxSession))
+                .block();
+        } finally {
+            mailboxManager.endProcessingRequest(mailboxSession);
+        }
     }
 
 
@@ -118,7 +128,8 @@ public class UserMailboxesService {
                 return Mono.just(Result.PARTIAL);
             })
             .reduce(Task::combine)
-            .switchIfEmpty(Mono.just(Result.COMPLETED));
+            .switchIfEmpty(Mono.just(Result.COMPLETED))
+            .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     private Mono<Result> deleteMessage(MessageManager messageManager, MessageUid messageUid, MailboxSession mailboxSession, ClearMailboxContentTask.Context context) {
@@ -138,20 +149,29 @@ public class UserMailboxesService {
             .assertAcceptable(mailboxSession.getPathDelimiter());
         listChildren(mailboxPath, mailboxSession)
             .forEach(Throwing.consumer(path -> deleteMailbox(mailboxSession, path)));
+        mailboxManager.endProcessingRequest(mailboxSession);
     }
 
     public long messageCount(Username username, MailboxName mailboxName) throws UsersRepositoryException, MailboxException {
         usernamePreconditions(username);
         MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
-        return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession).getMessageCount(mailboxSession);
+        try {
+            return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession).getMessageCount(mailboxSession);
+        } finally {
+            mailboxManager.endProcessingRequest(mailboxSession);
+        }
     }
 
     public long unseenMessageCount(Username username, MailboxName mailboxName) throws UsersRepositoryException, MailboxException {
         usernamePreconditions(username);
         MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
-        return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession)
-            .getMailboxCounters(mailboxSession)
-            .getUnseen();
+        try {
+            return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession)
+                .getMailboxCounters(mailboxSession)
+                .getUnseen();
+        } finally {
+            mailboxManager.endProcessingRequest(mailboxSession);
+        }
     }
 
     private Stream<MailboxPath> listChildren(MailboxPath mailboxPath, MailboxSession mailboxSession) {
@@ -178,6 +198,7 @@ public class UserMailboxesService {
             .assertAcceptable(mailboxSession.getPathDelimiter());
         Preconditions.checkState(Boolean.TRUE.equals(Mono.from(mailboxManager.mailboxExists(mailboxPath, mailboxSession)).block()),
             "Mailbox does not exist. " + mailboxPath.asString());
+        mailboxManager.endProcessingRequest(mailboxSession);
     }
 
     private Stream<MailboxMetaData> listUserMailboxes(MailboxSession mailboxSession) {
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
index 6af803e4c2..6d9b1c5613 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
@@ -101,10 +101,12 @@ public class RspamdListener implements SpamEventListener, EventListener.Reactive
     }
 
     private Mono<Void> handleMessageAdded(MailboxEvents.Added addedEvent) {
+        MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()));
         return isAppendedToInbox(addedEvent)
             .filter(FunctionalUtils.identityPredicate())
             .doOnNext(isHam -> LOGGER.debug("Ham event detected, EventId = {}", addedEvent.getEventId().getId()))
-            .flatMap(any -> reportHamWhenAdded(addedEvent, mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))));
+            .flatMap(any -> reportHamWhenAdded(addedEvent, mailboxSession))
+            .then(Mono.fromRunnable(() -> mailboxManager.endProcessingRequest(mailboxSession)));
     }
 
     private Mono<Void> handleMessageMoved(MessageMoveEvent messageMoveEvent) {
@@ -123,9 +125,11 @@ public class RspamdListener implements SpamEventListener, EventListener.Reactive
     }
 
     private Flux<ByteBuffer> mailboxMessagePublisher(MessageMoveEvent messageMoveEvent) {
-        return Mono.fromCallable(() -> mapperFactory.getMessageIdMapper(mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))))
+        MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()));
+        return Mono.fromCallable(() -> mapperFactory.getMessageIdMapper(mailboxSession))
             .flatMapMany(messageIdMapper -> messageIdMapper.findReactive(messageMoveEvent.getMessageIds(), MessageMapper.FetchType.FULL))
-            .flatMap(MailboxMessage::getFullContentReactive);
+            .flatMap(MailboxMessage::getFullContentReactive)
+            .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     private Mono<Void> handleMessageMoved(Flux<ByteBuffer> mailboxMessagesPublisher, MessageMoveEvent messageMoveEvent) {
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
index 55bbb1b089..447fd17687 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
@@ -102,7 +102,8 @@ public class GetMailboxMessagesService {
             .doOnNext(mailboxMessageMetaData -> context.incrementSpamMessageCount())
             .filter(message -> randomBooleanWithProbability(runningOptions))
             .flatMap(message -> messageIdManager.getMessagesReactive(List.of(message.getMessageId()), FetchGroup.FULL_CONTENT, mailboxSession), ReactorUtils.DEFAULT_CONCURRENCY)
-            .filter(runningOptions.correspondingClassificationFilter());
+            .filter(runningOptions.correspondingClassificationFilter())
+            .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     private Flux<MessageResult> getMailboxMessagesOfAUser(Username username, MailboxMetaData mailboxMetaData, Optional<Date> afterDate,
@@ -119,7 +120,8 @@ public class GetMailboxMessagesService {
             .doOnNext(mailboxMessageMetaData -> context.incrementHamMessageCount())
             .filter(message -> randomBooleanWithProbability(runningOptions))
             .flatMap(message -> messageIdManager.getMessagesReactive(List.of(message.getMessageId()), FetchGroup.FULL_CONTENT, mailboxSession), ReactorUtils.DEFAULT_CONCURRENCY)
-            .filter(runningOptions.correspondingClassificationFilter());
+            .filter(runningOptions.correspondingClassificationFilter())
+            .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     public static boolean randomBooleanWithProbability(RunningOptions runningOptions) {
diff --git a/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java b/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java
index 19a86262df..cace69aef6 100644
--- a/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java
+++ b/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java
@@ -97,10 +97,12 @@ public class SpamAssassinListener implements SpamEventListener {
         if (event instanceof MessageMoveEvent) {
             MailboxSession session = mailboxManager.createSystemSession(username);
             handleMessageMove(event, session, (MessageMoveEvent) event);
+            mailboxManager.endProcessingRequest(session);
         }
         if (event instanceof Added) {
             MailboxSession session = mailboxManager.createSystemSession(username);
             handleAdded(event, session, (Added) event);
+            mailboxManager.endProcessingRequest(session);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org