You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/06/14 06:59:27 UTC
[james-project] 05/28: [FIX] JPA: avoid leaks in entity manager
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit bb9cebc6c5ee0f026a31ce65decf25633058940a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 2 17:07:37 2023 +0700
[FIX] JPA: avoid leaks in entity manager
---
.../listeners/SetCustomFlagOnBigMessages.java | 1 +
.../james/mailbox/backup/DefaultMailboxBackup.java | 16 +++++----
.../mailbox/backup/ZipMailArchiveRestorer.java | 1 +
.../CassandraMailboxSessionMapperFactory.java | 5 +++
...CassandraRecomputeCurrentQuotasServiceTest.java | 2 +-
.../task/JPARecomputeCurrentQuotasServiceTest.java | 2 +-
.../MemoryRecomputeCurrentQuotasServiceTest.java | 4 ++-
.../mailbox/store/SystemMailboxesProviderImpl.java | 3 +-
.../store/event/MailboxAnnotationListener.java | 3 +-
.../store/quota/DefaultUserQuotaRootResolver.java | 6 ++--
.../store/search/ListeningMessageSearchIndex.java | 6 ++--
.../store/event/MailboxAnnotationListenerTest.java | 3 ++
.../mailbox/tools/indexer/ReIndexerImpl.java | 1 +
.../mailbox/tools/indexer/ReIndexerPerformer.java | 18 ++++++----
.../quota/task/RecomputeCurrentQuotasService.java | 9 +++--
.../james/healthcheck/MailReceptionCheck.java | 3 +-
.../adapter/mailbox/ACLUsernameChangeTaskStep.java | 7 ++--
.../mailbox/MailboxUsernameChangeTaskStep.java | 16 ++++++---
.../james/transport/mailets/RandomStoring.java | 3 +-
.../jmap/draft/send/PostDequeueDecorator.java | 1 +
.../james/jmap/mailet/filter/ActionApplier.java | 1 +
.../webadmin/vault/routes/RestoreService.java | 3 +-
.../webadmin/service/CreateMissingParentsTask.java | 15 ++++----
.../webadmin/service/ExpireMailboxService.java | 3 +-
.../webadmin/service/UserMailboxesService.java | 41 ++++++++++++++++------
.../org/apache/james/rspamd/RspamdListener.java | 10 ++++--
.../rspamd/task/GetMailboxMessagesService.java | 6 ++--
.../james/spamassassin/SpamAssassinListener.java | 2 ++
28 files changed, 135 insertions(+), 56 deletions(-)
diff --git a/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java b/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java
index d7223c1d6b..192e6df1a0 100644
--- a/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java
+++ b/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java
@@ -86,6 +86,7 @@ class SetCustomFlagOnBigMessages implements EventListener.GroupEventListener {
FlagsUpdateMode.ADD,
MessageRange.one(messageUid),
session);
+ mailboxManager.endProcessingRequest(session);
} catch (MailboxException e) {
LOGGER.error("error happens when adding '{}' flag to the message with uid {} in mailbox {} of user {}",
BIG_MESSAGE, messageUid.asLong(), addedEvent.getMailboxId(), addedEvent.getUsername().asString(), e);
diff --git a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
index 0c9fd32bcd..bc6697a88a 100644
--- a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
+++ b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
@@ -32,7 +32,6 @@ import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageManager;
-import org.apache.james.mailbox.exception.BadCredentialsException;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.FetchGroup;
import org.apache.james.mailbox.model.Mailbox;
@@ -98,14 +97,19 @@ public class DefaultMailboxBackup implements MailboxBackup {
Stream<MessageResult> messages = allMessagesForUser(accountContents);
archive(mailboxes, messages, destination);
+ mailboxManager.endProcessingRequest(session);
}
- private boolean isAccountNonEmpty(Username username) throws BadCredentialsException, MailboxException, IOException {
+ private boolean isAccountNonEmpty(Username username) throws MailboxException {
MailboxSession session = mailboxManager.createSystemSession(username);
- return getAccountContentForUser(session)
- .stream()
- .findFirst()
- .isPresent();
+ try {
+ return getAccountContentForUser(session)
+ .stream()
+ .findFirst()
+ .isPresent();
+ } finally {
+ mailboxManager.endProcessingRequest(session);
+ }
}
@Override
diff --git a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java
index 5830c69019..215f7b19bf 100644
--- a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java
+++ b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java
@@ -58,6 +58,7 @@ public class ZipMailArchiveRestorer implements MailArchiveRestorer {
public void restore(Username username, InputStream source) throws MailboxException, IOException {
MailboxSession session = mailboxManager.createSystemSession(username);
restoreEntries(source, session);
+ mailboxManager.endProcessingRequest(session);
}
private void restoreEntries(InputStream source, MailboxSession session) throws IOException {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 84d86b1882..fb3069869b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -231,6 +231,11 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
return cassandraSubscriptionMapper;
}
+ @Override
+ public void endProcessingRequest(MailboxSession session) {
+
+ }
+
public DeleteMessageListener deleteMessageListener() {
return new DeleteMessageListener(threadDAO, threadLookupDAO, imapUidDAO, messageIdDAO, messageDAO, messageDAOV3, attachmentDAOV2,
attachmentMessageIdDAO, aclMapper, userMailboxRightsDAO, applicableFlagDAO, firstUnseenDAO, deletedMessageDAO,
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
index 113b741ed4..9b5d3b628a 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
@@ -86,7 +86,7 @@ public class CassandraRecomputeCurrentQuotasServiceTest implements RecomputeCurr
userQuotaRootResolver = new DefaultUserQuotaRootResolver(sessionProvider, mapperFactory);
CurrentQuotaCalculator currentQuotaCalculator = new CurrentQuotaCalculator(mapperFactory, userQuotaRootResolver);
- testee = new RecomputeCurrentQuotasService(usersRepository, currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, sessionProvider);
+ testee = new RecomputeCurrentQuotasService(usersRepository, currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, sessionProvider, mailboxManager);
}
@Override
diff --git a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java
index 5d5dfbf696..43925070a6 100644
--- a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java
+++ b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java
@@ -87,7 +87,7 @@ class JPARecomputeCurrentQuotasServiceTest implements RecomputeCurrentQuotasServ
CurrentQuotaCalculator currentQuotaCalculator = new CurrentQuotaCalculator(mapperFactory, userQuotaRootResolver);
- testee = new RecomputeCurrentQuotasService(usersRepository, currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, sessionProvider);
+ testee = new RecomputeCurrentQuotasService(usersRepository, currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, sessionProvider, mailboxManager);
}
@AfterEach
diff --git a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java
index 328fccb798..2fa06bde62 100644
--- a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java
+++ b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java
@@ -26,6 +26,7 @@ import org.apache.james.domainlist.lib.DomainListConfiguration;
import org.apache.james.domainlist.memory.MemoryDomainList;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.SessionProvider;
+import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
import org.apache.james.mailbox.quota.CurrentQuotaManager;
import org.apache.james.mailbox.quota.UserQuotaRootResolver;
@@ -52,7 +53,8 @@ class MemoryRecomputeCurrentQuotasServiceTest implements RecomputeCurrentQuotasS
usersRepository = MemoryUsersRepository.withoutVirtualHosting(memoryDomainList);
resources = InMemoryIntegrationResources.defaultResources();
- testee = new RecomputeCurrentQuotasService(usersRepository, resources.getCurrentQuotaManager(), resources.getCurrentQuotaCalculator(), resources.getDefaultUserQuotaRootResolver(), resources.getMailboxManager().getSessionProvider());
+ InMemoryMailboxManager mailboxManager = resources.getMailboxManager();
+ testee = new RecomputeCurrentQuotasService(usersRepository, resources.getCurrentQuotaManager(), resources.getCurrentQuotaCalculator(), resources.getDefaultUserQuotaRootResolver(), mailboxManager.getSessionProvider(), mailboxManager);
}
@Override
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java
index 3b6a2fbd7c..a1518902b1 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java
@@ -57,7 +57,8 @@ public class SystemMailboxesProviderImpl implements SystemMailboxesProvider {
return Mono.from(mailboxManager.getMailboxReactive(mailboxPath, session))
.flux()
- .onErrorResume(MailboxNotFoundException.class, e -> searchMessageManagerByMailboxRole(aRole, username));
+ .onErrorResume(MailboxNotFoundException.class, e -> searchMessageManagerByMailboxRole(aRole, username))
+ .doFinally(any -> mailboxManager.endProcessingRequest(session));
}
private boolean hasRole(Role aRole, MailboxPath mailBoxPath) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
index 6385f9caf2..8d11cb6ec9 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
@@ -69,7 +69,8 @@ public class MailboxAnnotationListener implements EventListener.ReactiveGroupEve
return Flux.from(annotationMapper.getAllAnnotationsReactive(mailboxId))
.flatMap(annotation -> Mono.from(annotationMapper.deleteAnnotationReactive(mailboxId, annotation.getKey())))
- .then();
+ .then()
+ .doFinally(any -> mailboxSessionMapperFactory.endProcessingRequest(mailboxSession));
}
return Mono.empty();
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
index 00506ca094..db58bf198f 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
@@ -121,7 +121,8 @@ public class DefaultUserQuotaRootResolver implements UserQuotaRootResolver {
return factory.getMailboxMapper(session)
.findMailboxById(mailboxId)
.map(Mailbox::generateAssociatedPath)
- .flatMap(path -> Mono.from(getQuotaRootReactive(path)));
+ .flatMap(path -> Mono.from(getQuotaRootReactive(path)))
+ .doFinally(any -> factory.endProcessingRequest(session));
}
@Override
@@ -138,7 +139,8 @@ public class DefaultUserQuotaRootResolver implements UserQuotaRootResolver {
.flatMap(this::getQuotaRootReactive, ReactorUtils.DEFAULT_CONCURRENCY)
.distinct();
- return Flux.concat(quotaRootListFromDelegatedMailboxes, Flux.just(forUser(username)));
+ return Flux.concat(quotaRootListFromDelegatedMailboxes, Flux.just(forUser(username)))
+ .doFinally(any -> factory.endProcessingRequest(session));
}
@Override
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
index 3eb17a3661..5fb5019d56 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
@@ -95,9 +95,9 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
*/
@Override
public Mono<Void> reactiveEvent(Event event) {
- return handleMailboxEvent(event,
- sessionProvider.createSystemSession(event.getUsername()),
- (MailboxEvent) event);
+ MailboxSession systemSession = sessionProvider.createSystemSession(event.getUsername());
+ return handleMailboxEvent(event, systemSession, (MailboxEvent) event)
+ .then(Mono.fromRunnable(() -> factory.endProcessingRequest(systemSession)));
}
private Mono<Void> handleMailboxEvent(Event event, MailboxSession session, MailboxEvent mailboxEvent) {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
index 1c310a5447..19739fdac7 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
@@ -125,6 +125,7 @@ class MailboxAnnotationListenerTest {
listener.event(deleteEvent);
verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession));
+ verify(mailboxSessionMapperFactory).endProcessingRequest(eq(mailboxSession));
verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId));
verifyNoMoreInteractions(mailboxSessionMapperFactory);
@@ -140,6 +141,7 @@ class MailboxAnnotationListenerTest {
listener.event(deleteEvent);
verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession));
+ verify(mailboxSessionMapperFactory).endProcessingRequest(eq(mailboxSession));
verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId));
verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), eq(PRIVATE_KEY));
verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), eq(SHARED_KEY));
@@ -158,6 +160,7 @@ class MailboxAnnotationListenerTest {
assertThatThrownBy(() -> listener.event(deleteEvent)).isInstanceOf(RuntimeException.class);
verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession));
+ verify(mailboxSessionMapperFactory).endProcessingRequest(eq(mailboxSession));
verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId));
verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), eq(PRIVATE_KEY));
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
index f557602cd1..d4adbfa34f 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
@@ -106,5 +106,6 @@ public class ReIndexerImpl implements ReIndexer {
private void validateIdExists(MailboxId mailboxId) throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of("ReIndexingImap"));
MailboxReactorUtils.block(mapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId));
+ mailboxManager.endProcessingRequest(mailboxSession);
}
}
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index eafd50fe2a..9a1d779bc1 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -155,7 +155,8 @@ public class ReIndexerPerformer {
.flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, runningOptions), MAILBOX_CONCURRENCY);
return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext)
- .doFinally(any -> LOGGER.info("Full reindex finished"));
+ .doFinally(any -> LOGGER.info("Full reindex finished"))
+ .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
}
Mono<Result> reIndexSingleMailbox(MailboxId mailboxId, ReIndexingContext reIndexingContext, RunningOptions runningOptions) {
@@ -165,7 +166,8 @@ public class ReIndexerPerformer {
.findMailboxById(mailboxId)
.flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, runningOptions));
- return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext);
+ return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext)
+ .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
}
Mono<Result> reIndexUserMailboxes(Username username, ReIndexingContext reIndexingContext, RunningOptions runningOptions) {
@@ -180,7 +182,8 @@ public class ReIndexerPerformer {
.flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, runningOptions), MAILBOX_CONCURRENCY);
return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext)
- .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()));
+ .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString()))
+ .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
} catch (Exception e) {
LOGGER.error("Error fetching mailboxes for user: {}", username.asString());
return Mono.just(Result.PARTIAL);
@@ -195,7 +198,8 @@ public class ReIndexerPerformer {
.map(mailbox -> new ReIndexingEntry(mailbox, mailboxSession, uid))
.flatMap(this::fullyReadMessage)
.flatMap(message -> reIndex(message, mailboxSession))
- .switchIfEmpty(Mono.just(Result.COMPLETED));
+ .switchIfEmpty(Mono.just(Result.COMPLETED))
+ .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
}
Mono<Result> reIndexMessageId(MessageId messageId) {
@@ -209,7 +213,8 @@ public class ReIndexerPerformer {
.onErrorResume(e -> {
LOGGER.warn("Failed to re-index {}", messageId, e);
return Mono.just(Result.PARTIAL);
- });
+ })
+ .doFinally(any -> mailboxManager.endProcessingRequest(session));
}
Mono<Result> reIndexErrors(ReIndexingContext reIndexingContext, ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions runningOptions) {
@@ -227,7 +232,8 @@ public class ReIndexerPerformer {
return Mono.just(Either.left(new MailboxFailure(mailboxId)));
}), MAILBOX_CONCURRENCY));
- return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext);
+ return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext)
+ .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
}
private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) {
diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
index acd288dfe7..f2d1a9c431 100644
--- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
+++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import org.apache.james.core.Username;
+import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.SessionProvider;
import org.apache.james.mailbox.model.QuotaOperation;
@@ -147,18 +148,21 @@ public class RecomputeCurrentQuotasService {
private final CurrentQuotaCalculator currentQuotaCalculator;
private final UserQuotaRootResolver userQuotaRootResolver;
private final SessionProvider sessionProvider;
+ private final MailboxManager mailboxManager;
@Inject
public RecomputeCurrentQuotasService(UsersRepository usersRepository,
CurrentQuotaManager storeCurrentQuotaManager,
CurrentQuotaCalculator currentQuotaCalculator,
UserQuotaRootResolver userQuotaRootResolver,
- SessionProvider sessionProvider) {
+ SessionProvider sessionProvider,
+ MailboxManager mailboxManager) {
this.usersRepository = usersRepository;
this.storeCurrentQuotaManager = storeCurrentQuotaManager;
this.currentQuotaCalculator = currentQuotaCalculator;
this.userQuotaRootResolver = userQuotaRootResolver;
this.sessionProvider = sessionProvider;
+ this.mailboxManager = mailboxManager;
}
public Mono<Task.Result> recomputeCurrentQuotas(Context context, RunningOptions runningOptions) {
@@ -190,6 +194,7 @@ public class RecomputeCurrentQuotasService {
LOGGER.error("Error while recomputing current quotas for {}", quotaRoot, e);
context.addToFailedMailboxes(quotaRoot);
return Mono.just(Task.Result.PARTIAL);
- });
+ })
+ .doFinally(any -> mailboxManager.endProcessingRequest(session));
}
}
diff --git a/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java b/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
index c8f31d0844..8d865ee097 100644
--- a/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
+++ b/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
@@ -223,7 +223,8 @@ public class MailReceptionCheck implements HealthCheck {
.onErrorResume(e -> {
LOGGER.error("Mail reception check failed", e);
return Mono.just(Result.unhealthy(componentName(), e.getMessage()));
- });
+ })
+ .doFinally(any -> mailboxManager.endProcessingRequest(session));
}
private Mono<MessageManager> retrieveInbox(Username username, MailboxSession session) {
diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java
index 813dc8a043..7027645eb0 100644
--- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java
+++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java
@@ -67,7 +67,9 @@ public class ACLUsernameChangeTaskStep implements UsernameChangeTaskStep {
return mailboxManager.search(MailboxQuery.builder().matchesAllMailboxNames().build(), oldSession)
.filter(mailbox -> !mailbox.getPath().getUser().equals(oldUsername))
.concatMap(mailbox -> migrateACLs(oldUsername, newUsername, mailbox))
- .then(updateSubscriptionsOnDeletedMailboxes(oldUsername, oldSession, newSession));
+ .then(updateSubscriptionsOnDeletedMailboxes(oldUsername, oldSession, newSession))
+ .doFinally(any -> mailboxManager.endProcessingRequest(oldSession))
+ .doFinally(any -> mailboxManager.endProcessingRequest(newSession));
}
private Mono<Void> updateSubscriptionsOnDeletedMailboxes(Username oldUsername, MailboxSession oldSession, MailboxSession newSession) {
@@ -90,6 +92,7 @@ public class ACLUsernameChangeTaskStep implements UsernameChangeTaskStep {
return Mono.fromRunnable(Throwing.runnable(() -> mailboxManager.applyRightsCommand(mailbox.getId(), MailboxACL.command().rights(rights).forUser(newUsername).asAddition(), ownerSession)))
.then(Mono.fromRunnable(Throwing.runnable(() -> mailboxManager.applyRightsCommand(mailbox.getId(), MailboxACL.command().rights(rights).forUser(oldUsername).asRemoval(), ownerSession))))
.subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
- .then();
+ .then()
+ .doFinally(any -> mailboxManager.endProcessingRequest(ownerSession));
}
}
diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java
index cdf02d268a..a28d9544ad 100644
--- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java
+++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java
@@ -72,7 +72,9 @@ public class MailboxUsernameChangeTaskStep implements UsernameChangeTaskStep {
return mailboxManager.search(queryUser, MailboxManager.MailboxSearchFetchType.Minimal, fromSession)
// Only keep top level, rename takes care of sub mailboxes
.filter(mailbox -> mailbox.getPath().getHierarchyLevels(fromSession.getPathDelimiter()).size() == 1)
- .concatMap(mailbox -> migrateMailbox(fromSession, toSession, mailbox));
+ .concatMap(mailbox -> migrateMailbox(fromSession, toSession, mailbox))
+ .doFinally(any -> mailboxManager.endProcessingRequest(fromSession))
+ .doFinally(any -> mailboxManager.endProcessingRequest(toSession));
}
private Mono<Void> migrateMailbox(MailboxSession fromSession, MailboxSession toSession, org.apache.james.mailbox.model.MailboxMetaData mailbox) {
@@ -115,16 +117,20 @@ public class MailboxUsernameChangeTaskStep implements UsernameChangeTaskStep {
return Flux.fromIterable(mailbox.getResolvedAcls().getEntries().entrySet())
.filter(entry -> entry.getKey().getNameType() == MailboxACL.NameType.user && !entry.getKey().isNegative())
.map(entry -> Username.of(entry.getKey().getName()))
- .concatMap(Throwing.function(userWithAccess ->
- Flux.from(subscriptionManager.subscriptionsReactive(mailboxManager.createSystemSession(userWithAccess)))
+ .concatMap(Throwing.function(userWithAccess -> {
+ MailboxSession session = mailboxManager.createSystemSession(userWithAccess);
+ return Flux.from(subscriptionManager.subscriptionsReactive(session))
.filter(subscribedMailbox -> subscribedMailbox.equals(mailbox.getPath()))
- .concatMap(any -> renameSubscription(mailbox, renamedPath, userWithAccess))))
+ .concatMap(any -> renameSubscription(mailbox, renamedPath, userWithAccess))
+ .doFinally(any -> mailboxManager.endProcessingRequest(session));
+ }))
.then();
}
private Mono<Void> renameSubscription(MailboxMetaData mailbox, MailboxPath renamedPath, Username user) {
MailboxSession session = mailboxManager.createSystemSession(user);
return Mono.from(subscriptionManager.subscribeReactive(renamedPath, session))
- .then(Mono.from(subscriptionManager.unsubscribeReactive(mailbox.getPath(), session)));
+ .then(Mono.from(subscriptionManager.unsubscribeReactive(mailbox.getPath(), session)))
+ .doFinally(any -> mailboxManager.endProcessingRequest(session));
}
}
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
index 6b817023d3..c1eb916848 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
@@ -122,7 +122,8 @@ public class RandomStoring extends GenericMailet {
MailboxSession session = mailboxManager.createSystemSession(username);
return mailboxManager
.search(MailboxQuery.privateMailboxesBuilder(session).build(), Minimal, session)
- .map(metaData -> new ReroutingInfos(metaData.getPath().getName(), username));
+ .map(metaData -> new ReroutingInfos(metaData.getPath().getName(), username))
+ .doFinally(any -> mailboxManager.endProcessingRequest(session));
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
index 44daed400f..be51824591 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
@@ -88,6 +88,7 @@ public class PostDequeueDecorator extends MailQueueItemDecorator {
MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(username.get()));
moveFromOutboxToSentWithSeenFlag(messageId, mailboxSession);
getMail().setAttribute(IS_DELIVERED);
+ mailboxManager.endProcessingRequest(mailboxSession);
} catch (MailShouldBeInOutboxException e) {
LOG.info("Message does not exist on Outbox anymore, it could have already been sent", e);
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java
index e9ce7b9954..6bb65926a9 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java
@@ -96,6 +96,7 @@ public class ActionApplier {
try {
MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
MessageManager messageManager = mailboxManager.getMailbox(mailboxId, mailboxSession);
+ mailboxManager.endProcessingRequest(mailboxSession);
String mailboxName = messageManager.getMailboxPath().getName();
diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java
index e2fcd4890f..caf0913810 100644
--- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java
+++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java
@@ -79,7 +79,8 @@ class RestoreService {
MessageManager restoreMessageManager = restoreMailboxManager(session);
return Flux.from(deletedMessageVault.search(usernameToRestore, searchQuery))
- .flatMap(deletedMessage -> appendToMailbox(restoreMessageManager, deletedMessage, session), DEFAULT_CONCURRENCY);
+ .flatMap(deletedMessage -> appendToMailbox(restoreMessageManager, deletedMessage, session), DEFAULT_CONCURRENCY)
+ .doFinally(any -> mailboxManager.endProcessingRequest(session));
}
private Mono<RestoreResult> appendToMailbox(MessageManager restoreMailboxManager, DeletedMessage deletedMessage, MailboxSession session) {
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java
index d8da1c6eba..c92b172c24 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java
@@ -153,6 +153,8 @@ public class CreateMissingParentsTask implements Task {
} catch (MailboxException e) {
LOGGER.error("Error fetching mailbox paths", e);
return Result.PARTIAL;
+ } finally {
+ mailboxManager.endProcessingRequest(session);
}
}
@@ -160,12 +162,13 @@ public class CreateMissingParentsTask implements Task {
MailboxSession ownerSession = mailboxManager.createSystemSession(path.getUser());
return Mono.from(mailboxManager.createMailboxReactive(path, ownerSession))
.doOnNext(this::recordSuccess)
- .then(Mono.just(Result.COMPLETED))
- .onErrorResume(e -> {
- LOGGER.error("Error creating missing parent mailbox: {}", path.getName(), e);
- recordFailure(path);
- return Mono.just(Result.PARTIAL);
- });
+ .then(Mono.just(Result.COMPLETED))
+ .onErrorResume(e -> {
+ LOGGER.error("Error creating missing parent mailbox: {}", path.getName(), e);
+ recordFailure(path);
+ return Mono.just(Result.PARTIAL);
+ })
+ .doFinally(any -> mailboxManager.endProcessingRequest(ownerSession));
}
@Override
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
index fc709d4bba..526cbdae45 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
@@ -217,7 +217,8 @@ public class ExpireMailboxService {
context.incrementFailedCount();
context.incrementProcessedCount();
return Mono.just(Task.Result.PARTIAL);
- });
+ })
+ .doFinally(any -> mailboxManager.endProcessingRequest(session));
}
private Mono<List<MessageUid>> searchMessagesReactive(MessageManager mgr, MailboxSession session, SearchQuery expiration) {
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java
index a7bf0d7291..e57048f5ac 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java
@@ -74,6 +74,7 @@ public class UserMailboxesService {
MailboxPath mailboxPath = MailboxPath.forUser(username, mailboxName.asString())
.assertAcceptable(mailboxSession.getPathDelimiter());
mailboxManager.createMailbox(mailboxPath, mailboxSession);
+ mailboxManager.endProcessingRequest(mailboxSession);
} catch (MailboxExistsException e) {
LOGGER.info("Attempt to create mailbox {} for user {} that already exists", mailboxName, username);
}
@@ -85,14 +86,19 @@ public class UserMailboxesService {
listUserMailboxes(mailboxSession)
.map(MailboxMetaData::getPath)
.forEach(Throwing.consumer(mailboxPath -> deleteMailbox(mailboxSession, mailboxPath)));
+ mailboxManager.endProcessingRequest(mailboxSession);
}
public List<MailboxResponse> listMailboxes(Username username) throws UsersRepositoryException {
usernamePreconditions(username);
MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
- return listUserMailboxes(mailboxSession)
- .map(mailboxMetaData -> new MailboxResponse(mailboxMetaData.getPath().getName(), mailboxMetaData.getId()))
- .collect(ImmutableList.toImmutableList());
+ try {
+ return listUserMailboxes(mailboxSession)
+ .map(mailboxMetaData -> new MailboxResponse(mailboxMetaData.getPath().getName(), mailboxMetaData.getId()))
+ .collect(ImmutableList.toImmutableList());
+ } finally {
+ mailboxManager.endProcessingRequest(mailboxSession);
+ }
}
public boolean testMailboxExists(Username username, MailboxName mailboxName) throws MailboxException, UsersRepositoryException {
@@ -100,8 +106,12 @@ public class UserMailboxesService {
MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
MailboxPath mailboxPath = MailboxPath.forUser(username, mailboxName.asString())
.assertAcceptable(mailboxSession.getPathDelimiter());
- return Mono.from(mailboxManager.mailboxExists(mailboxPath, mailboxSession))
- .block();
+ try {
+ return Mono.from(mailboxManager.mailboxExists(mailboxPath, mailboxSession))
+ .block();
+ } finally {
+ mailboxManager.endProcessingRequest(mailboxSession);
+ }
}
@@ -118,7 +128,8 @@ public class UserMailboxesService {
return Mono.just(Result.PARTIAL);
})
.reduce(Task::combine)
- .switchIfEmpty(Mono.just(Result.COMPLETED));
+ .switchIfEmpty(Mono.just(Result.COMPLETED))
+ .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
}
private Mono<Result> deleteMessage(MessageManager messageManager, MessageUid messageUid, MailboxSession mailboxSession, ClearMailboxContentTask.Context context) {
@@ -138,20 +149,29 @@ public class UserMailboxesService {
.assertAcceptable(mailboxSession.getPathDelimiter());
listChildren(mailboxPath, mailboxSession)
.forEach(Throwing.consumer(path -> deleteMailbox(mailboxSession, path)));
+ mailboxManager.endProcessingRequest(mailboxSession);
}
public long messageCount(Username username, MailboxName mailboxName) throws UsersRepositoryException, MailboxException {
usernamePreconditions(username);
MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
- return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession).getMessageCount(mailboxSession);
+ try {
+ return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession).getMessageCount(mailboxSession);
+ } finally {
+ mailboxManager.endProcessingRequest(mailboxSession);
+ }
}
public long unseenMessageCount(Username username, MailboxName mailboxName) throws UsersRepositoryException, MailboxException {
usernamePreconditions(username);
MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
- return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession)
- .getMailboxCounters(mailboxSession)
- .getUnseen();
+ try {
+ return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession)
+ .getMailboxCounters(mailboxSession)
+ .getUnseen();
+ } finally {
+ mailboxManager.endProcessingRequest(mailboxSession);
+ }
}
private Stream<MailboxPath> listChildren(MailboxPath mailboxPath, MailboxSession mailboxSession) {
@@ -178,6 +198,7 @@ public class UserMailboxesService {
.assertAcceptable(mailboxSession.getPathDelimiter());
Preconditions.checkState(Boolean.TRUE.equals(Mono.from(mailboxManager.mailboxExists(mailboxPath, mailboxSession)).block()),
"Mailbox does not exist. " + mailboxPath.asString());
+ mailboxManager.endProcessingRequest(mailboxSession);
}
private Stream<MailboxMetaData> listUserMailboxes(MailboxSession mailboxSession) {
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
index 6af803e4c2..6d9b1c5613 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
@@ -101,10 +101,12 @@ public class RspamdListener implements SpamEventListener, EventListener.Reactive
}
private Mono<Void> handleMessageAdded(MailboxEvents.Added addedEvent) {
+ MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()));
return isAppendedToInbox(addedEvent)
.filter(FunctionalUtils.identityPredicate())
.doOnNext(isHam -> LOGGER.debug("Ham event detected, EventId = {}", addedEvent.getEventId().getId()))
- .flatMap(any -> reportHamWhenAdded(addedEvent, mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))));
+ .flatMap(any -> reportHamWhenAdded(addedEvent, mailboxSession))
+ .then(Mono.fromRunnable(() -> mailboxManager.endProcessingRequest(mailboxSession)));
}
private Mono<Void> handleMessageMoved(MessageMoveEvent messageMoveEvent) {
@@ -123,9 +125,11 @@ public class RspamdListener implements SpamEventListener, EventListener.Reactive
}
private Flux<ByteBuffer> mailboxMessagePublisher(MessageMoveEvent messageMoveEvent) {
- return Mono.fromCallable(() -> mapperFactory.getMessageIdMapper(mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))))
+ MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()));
+ return Mono.fromCallable(() -> mapperFactory.getMessageIdMapper(mailboxSession))
.flatMapMany(messageIdMapper -> messageIdMapper.findReactive(messageMoveEvent.getMessageIds(), MessageMapper.FetchType.FULL))
- .flatMap(MailboxMessage::getFullContentReactive);
+ .flatMap(MailboxMessage::getFullContentReactive)
+ .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
}
private Mono<Void> handleMessageMoved(Flux<ByteBuffer> mailboxMessagesPublisher, MessageMoveEvent messageMoveEvent) {
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
index 55bbb1b089..447fd17687 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
@@ -102,7 +102,8 @@ public class GetMailboxMessagesService {
.doOnNext(mailboxMessageMetaData -> context.incrementSpamMessageCount())
.filter(message -> randomBooleanWithProbability(runningOptions))
.flatMap(message -> messageIdManager.getMessagesReactive(List.of(message.getMessageId()), FetchGroup.FULL_CONTENT, mailboxSession), ReactorUtils.DEFAULT_CONCURRENCY)
- .filter(runningOptions.correspondingClassificationFilter());
+ .filter(runningOptions.correspondingClassificationFilter())
+ .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
}
private Flux<MessageResult> getMailboxMessagesOfAUser(Username username, MailboxMetaData mailboxMetaData, Optional<Date> afterDate,
@@ -119,7 +120,8 @@ public class GetMailboxMessagesService {
.doOnNext(mailboxMessageMetaData -> context.incrementHamMessageCount())
.filter(message -> randomBooleanWithProbability(runningOptions))
.flatMap(message -> messageIdManager.getMessagesReactive(List.of(message.getMessageId()), FetchGroup.FULL_CONTENT, mailboxSession), ReactorUtils.DEFAULT_CONCURRENCY)
- .filter(runningOptions.correspondingClassificationFilter());
+ .filter(runningOptions.correspondingClassificationFilter())
+ .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
}
public static boolean randomBooleanWithProbability(RunningOptions runningOptions) {
diff --git a/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java b/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java
index 19a86262df..cace69aef6 100644
--- a/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java
+++ b/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java
@@ -97,10 +97,12 @@ public class SpamAssassinListener implements SpamEventListener {
if (event instanceof MessageMoveEvent) {
MailboxSession session = mailboxManager.createSystemSession(username);
handleMessageMove(event, session, (MessageMoveEvent) event);
+ mailboxManager.endProcessingRequest(session);
}
if (event instanceof Added) {
MailboxSession session = mailboxManager.createSystemSession(username);
handleAdded(event, session, (Added) event);
+ mailboxManager.endProcessingRequest(session);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org