You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/09/19 02:59:17 UTC

[james-project] branch master updated: [REFACTORING] List users the reactive way (#1187)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c5ac0d567 [REFACTORING] List users the reactive way (#1187)
2c5ac0d567 is described below

commit 2c5ac0d567c62a6dc674bc03a87128bdd01688db
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Mon Sep 19 09:59:12 2022 +0700

    [REFACTORING] List users the reactive way (#1187)
    
    It is very error-prone to plug blocking components into a reactive pipeline.
    Allowing implementations to pass a reactive version reduces this risk.
---
 .../quota/task/RecomputeCurrentQuotasService.java  | 23 ++++++------
 .../org/apache/james/user/api/UsersRepository.java |  2 ++
 .../james/user/cassandra/CassandraUsersDAO.java    | 11 ++++--
 .../java/org/apache/james/user/lib/UsersDAO.java   | 12 +++++++
 .../apache/james/user/lib/UsersRepositoryImpl.java |  5 +++
 .../james/transport/mailets/RandomStoring.java     |  6 ++--
 .../data/jmap/EmailQueryViewPopulator.java         | 16 +++------
 .../jmap/MessageFastViewProjectionCorrector.java   | 14 +++-----
 .../webadmin/service/ExpireMailboxService.java     | 41 ++++++++++------------
 .../webadmin/service/ExpireMailboxServiceTest.java | 38 ++++++++++----------
 .../james/rspamd/task/FeedHamToRspamdTask.java     | 40 +++++++++------------
 .../james/rspamd/task/FeedSpamToRspamdTask.java    | 40 +++++++++------------
 .../rspamd/task/GetMailboxMessagesService.java     | 14 ++++----
 13 files changed, 126 insertions(+), 136 deletions(-)

diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
index ef49c231f7..acd288dfe7 100644
--- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
+++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
@@ -39,7 +39,6 @@ import org.apache.james.task.Task;
 import org.apache.james.user.api.UsersRepository;
 import org.apache.james.user.api.UsersRepositoryException;
 import org.apache.james.util.ReactorUtils;
-import org.apache.james.util.streams.Iterators;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +46,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class RecomputeCurrentQuotasService {
@@ -162,17 +162,16 @@ public class RecomputeCurrentQuotasService {
     }
 
     public Mono<Task.Result> recomputeCurrentQuotas(Context context, RunningOptions runningOptions) {
-        try {
-            return Iterators.toFlux(usersRepository.list())
-                .transform(ReactorUtils.<Username, Task.Result>throttle()
-                    .elements(runningOptions.getUsersPerSecond())
-                    .per(Duration.ofSeconds(1))
-                    .forOperation(username -> recomputeUserCurrentQuotas(context, username)))
-                .reduce(Task.Result.COMPLETED, Task::combine);
-        } catch (UsersRepositoryException e) {
-            LOGGER.error("Error while accessing users from repository", e);
-            return Mono.just(Task.Result.PARTIAL);
-        }
+        return Flux.from(usersRepository.listReactive())
+            .transform(ReactorUtils.<Username, Task.Result>throttle()
+                .elements(runningOptions.getUsersPerSecond())
+                .per(Duration.ofSeconds(1))
+                .forOperation(username -> recomputeUserCurrentQuotas(context, username)))
+            .reduce(Task.Result.COMPLETED, Task::combine)
+            .onErrorResume(UsersRepositoryException.class, e -> {
+                LOGGER.error("Error while accessing users from repository", e);
+                return Mono.just(Task.Result.PARTIAL);
+            });
     }
 
     private Mono<Task.Result> recomputeUserCurrentQuotas(Context context, Username username) {
diff --git a/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java b/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
index 7d0b36b1ac..34c99decef 100644
--- a/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
+++ b/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
@@ -125,6 +125,8 @@ public interface UsersRepository {
      */
     Iterator<Username> list() throws UsersRepositoryException;
 
+    Publisher<Username> listReactive();
+
     /**
      * Return true if virtualHosting support is enabled, otherwise false
      * 
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
index 5d71bde9d1..a64bbad2d0 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
@@ -230,13 +230,18 @@ public class CassandraUsersDAO implements UsersDAO {
 
     @Override
     public Iterator<Username> list() {
-        return executor.executeRows(listStatement.bind())
-            .mapNotNull(row -> row.getString(NAME))
-            .map(Username::of)
+        return listReactive()
             .toIterable()
             .iterator();
     }
 
+    @Override
+    public Flux<Username> listReactive() {
+        return executor.executeRows(listStatement.bind())
+            .mapNotNull(row -> row.getString(NAME))
+            .map(Username::of);
+    }
+
     @Override
     public void addUser(Username username, String password) throws UsersRepositoryException {
         DefaultUser user = new DefaultUser(username, preferredAlgorithm, preferredAlgorithm);
diff --git a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
index 3e4590ebf4..d345406292 100644
--- a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
+++ b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
@@ -28,7 +28,9 @@ import org.apache.james.user.api.model.User;
 import org.apache.james.util.ReactorUtils;
 import org.reactivestreams.Publisher;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public interface UsersDAO {
     default boolean getDefaultVirtualHostingValue() {
@@ -52,5 +54,15 @@ public interface UsersDAO {
 
     Iterator<Username> list() throws UsersRepositoryException;
 
+    default Publisher<Username> listReactive() {
+        return Flux.fromIterable(() -> {
+            try {
+                return list();
+            } catch (UsersRepositoryException e) {
+                throw new RuntimeException(e);
+            }
+        }).subscribeOn(Schedulers.boundedElastic());
+    }
+
     void addUser(Username username, String password) throws UsersRepositoryException;
 }
diff --git a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
index 564409ae8e..826371edba 100644
--- a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
+++ b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
@@ -199,6 +199,11 @@ public class UsersRepositoryImpl<T extends UsersDAO> implements UsersRepository,
         return usersDAO.list();
     }
 
+    @Override
+    public Publisher<Username> listReactive() {
+        return usersDAO.listReactive();
+    }
+
     @Override
     public boolean supportVirtualHosting() {
         return virtualHosting;
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
index e0f24aed8c..0465e68c54 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
@@ -39,8 +39,6 @@ import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.transport.mailets.delivery.MailStore;
 import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
-import org.apache.james.util.streams.Iterators;
 import org.apache.mailet.Attribute;
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.GenericMailet;
@@ -110,8 +108,8 @@ public class RandomStoring extends GenericMailet {
             .collect(ImmutableSet.toImmutableSet());
     }
 
-    private Mono<List<ReroutingInfos>> retrieveReroutingInfos() throws UsersRepositoryException {
-        return Iterators.toFlux(usersRepository.list())
+    private Mono<List<ReroutingInfos>> retrieveReroutingInfos() {
+        return Flux.from(usersRepository.listReactive())
             .flatMap(this::buildReRoutingInfos)
             .collect(ImmutableList.toImmutableList());
     }
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
index 2d925f0428..fc1a4f800d 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
@@ -50,9 +50,7 @@ import org.apache.james.mime4j.stream.MimeConfig;
 import org.apache.james.task.Task;
 import org.apache.james.task.Task.Result;
 import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
 import org.apache.james.util.ReactorUtils;
-import org.apache.james.util.streams.Iterators;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,15 +129,11 @@ public class EmailQueryViewPopulator {
     }
 
     private Flux<MessageResult> listAllMailboxMessages(Progress progress) {
-        try {
-            return Iterators.toFlux(usersRepository.list())
-                .map(mailboxManager::createSystemSession)
-                .doOnNext(any -> progress.incrementProcessedUserCount())
-                .flatMap(session -> listUserMailboxMessages(progress, session), USER_CONCURRENCY)
-                .filter(messageResult -> !messageResult.getFlags().contains(DELETED));
-        } catch (UsersRepositoryException e) {
-            return Flux.error(e);
-        }
+        return Flux.from(usersRepository.listReactive())
+            .map(mailboxManager::createSystemSession)
+            .doOnNext(any -> progress.incrementProcessedUserCount())
+            .flatMap(session -> listUserMailboxMessages(progress, session), USER_CONCURRENCY)
+            .filter(messageResult -> !messageResult.getFlags().contains(DELETED));
     }
 
     private Flux<MessageResult> listUserMailboxMessages(Progress progress, MailboxSession session) {
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
index 7abf9092d2..f3d9e2ee0b 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
@@ -46,9 +46,7 @@ import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.task.Task;
 import org.apache.james.task.Task.Result;
 import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
 import org.apache.james.util.ReactorUtils;
-import org.apache.james.util.streams.Iterators;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -158,14 +156,10 @@ public class MessageFastViewProjectionCorrector {
     }
 
     private Flux<ProjectionEntry> listAllMailboxMessages(Progress progress) {
-        try {
-            return Iterators.toFlux(usersRepository.list())
-                .map(mailboxManager::createSystemSession)
-                .doOnNext(any -> progress.incrementProcessedUserCount())
-                .flatMap(session -> listUserMailboxMessages(progress, session), USER_CONCURRENCY);
-        } catch (UsersRepositoryException e) {
-            return Flux.error(e);
-        }
+        return Flux.from(usersRepository.listReactive())
+            .map(mailboxManager::createSystemSession)
+            .doOnNext(any -> progress.incrementProcessedUserCount())
+            .flatMap(session -> listUserMailboxMessages(progress, session), USER_CONCURRENCY);
     }
 
     private Flux<ProjectionEntry> listUserMailboxMessages(Progress progress, MailboxSession session) {
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
index b2bbedcab4..fc709d4bba 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
@@ -45,7 +45,6 @@ import org.apache.james.user.api.UsersRepository;
 import org.apache.james.user.api.UsersRepositoryException;
 import org.apache.james.util.DurationParser;
 import org.apache.james.util.ReactorUtils;
-import org.apache.james.util.streams.Iterators;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -177,28 +176,24 @@ public class ExpireMailboxService {
     }
 
     public Mono<Result> expireMailboxes(Context context, RunningOptions runningOptions, Date now) {
-        try {
-            SearchQuery expiration = SearchQuery.of(
-                runningOptions.maxAgeDuration.map(maxAge -> {
-                        Date limit = Date.from(now.toInstant().minus(maxAge));
-                        return SearchQuery.internalDateBefore(limit, DateResolution.Second);
-                    })
-                    .orElse(
-                        SearchQuery.headerDateBefore("Expires", now, DateResolution.Second)
-                    )
-            );
-            // Note: user list may be a blocking iterable, must run on a scheduler that supports this.
-            return Iterators.toFlux(usersRepository.list()).subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
-                .transform(ReactorUtils.<Username, Task.Result>throttle()
-                    .elements(runningOptions.getUsersPerSecond())
-                    .per(Duration.ofSeconds(1))
-                    .forOperation(username -> 
-                        expireUserMailbox(context, username, runningOptions.getMailbox(), expiration)))
-                .reduce(Task.Result.COMPLETED, Task::combine);
-        } catch (UsersRepositoryException e) {
-            LOGGER.error("Error while accessing users from repository", e);
-            return Mono.just(Task.Result.PARTIAL);
-        }
+        SearchQuery expiration = SearchQuery.of(
+            runningOptions.maxAgeDuration.map(maxAge -> {
+                Date limit = Date.from(now.toInstant().minus(maxAge));
+                return SearchQuery.internalDateBefore(limit, DateResolution.Second);
+            }).orElse(SearchQuery.headerDateBefore("Expires", now, DateResolution.Second)));
+
+        return Flux.from(usersRepository.listReactive())
+            .transform(ReactorUtils.<Username, Task.Result>throttle()
+                .elements(runningOptions.getUsersPerSecond())
+                .per(Duration.ofSeconds(1))
+                .forOperation(username ->
+                    expireUserMailbox(context, username, runningOptions.getMailbox(), expiration)))
+            .reduce(Task.Result.COMPLETED, Task::combine)
+
+            .onErrorResume(UsersRepositoryException.class, e -> {
+                LOGGER.error("Error while accessing users from repository", e);
+                return Mono.just(Task.Result.PARTIAL);
+            });
     }
 
     private Mono<Result> expireUserMailbox(Context context, Username username, String mailbox, SearchQuery expiration) {
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxServiceTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxServiceTest.java
index 1bf6a94066..2d1085ffdc 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxServiceTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxServiceTest.java
@@ -157,8 +157,8 @@ public class ExpireMailboxServiceTest {
 
 
     @Test
-    public void testIgnoresUserListFailure() throws Exception {
-        when(usersRepository.list()).thenThrow(new UsersRepositoryException("it is broken"));
+    void testIgnoresUserListFailure() {
+        when(usersRepository.listReactive()).thenReturn(Flux.error(new UsersRepositoryException("it is broken")));
 
         ExpireMailboxService.Context context = new ExpireMailboxService.Context();
         Date now = new Date();
@@ -172,8 +172,8 @@ public class ExpireMailboxServiceTest {
     }
 
     @Test
-    public void testIgnoresMissingMailbox() throws Exception {
-        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+    void testIgnoresMissingMailbox() {
+        when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
         
         // intentionally no mailbox creation
         
@@ -189,8 +189,8 @@ public class ExpireMailboxServiceTest {
     }
     
     @Test
-    public void testIgnoresEmptyMailbox() throws Exception {
-        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+    void testIgnoresEmptyMailbox() throws Exception {
+        when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
        
         mailboxManager.createMailbox(aliceInbox, aliceSession);
 
@@ -207,8 +207,8 @@ public class ExpireMailboxServiceTest {
     }
 
     @Test
-    public void testHandlesMailboxErrors() throws Exception {
-        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+    void testHandlesMailboxErrors() throws Exception {
+        when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
 
         mailboxManager.createMailbox(aliceInbox, aliceSession);
         MessageManager aliceManager = mailboxManager.getMailbox(aliceInbox, aliceSession);
@@ -228,8 +228,8 @@ public class ExpireMailboxServiceTest {
     }
 
     @Test
-    public void testExpiresMailboxByAge() throws Exception {
-        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+    void testExpiresMailboxByAge() throws Exception {
+        when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
 
         mailboxManager.createMailbox(aliceInbox, aliceSession);
         MessageManager aliceManager = mailboxManager.getMailbox(aliceInbox, aliceSession);
@@ -249,8 +249,8 @@ public class ExpireMailboxServiceTest {
     }
 
     @Test
-    public void testExpiresMailboxByHeader() throws Exception {
-        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+    void testExpiresMailboxByHeader() throws Exception {
+        when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
 
         mailboxManager.createMailbox(aliceInbox, aliceSession);
         MessageManager aliceManager = mailboxManager.getMailbox(aliceInbox, aliceSession);
@@ -281,8 +281,8 @@ public class ExpireMailboxServiceTest {
     }
 
     @Test
-    public void testExpiresNamedMailbox() throws Exception {
-        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+    void testExpiresNamedMailbox() throws Exception {
+        when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
 
         String mailboxName = "Archived";
         MailboxPath mailboxPath = MailboxPath.forUser(alice, mailboxName);
@@ -305,8 +305,8 @@ public class ExpireMailboxServiceTest {
     }
 
     @Test
-    public void testExpiresNamedMailbox2() throws Exception {
-        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+    void testExpiresNamedMailbox2() throws Exception {
+        when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
 
         ExpireMailboxService.Context context = new ExpireMailboxService.Context();
         ExpireMailboxService.RunningOptions options = new ExpireMailboxService.RunningOptions(1, "NoSuchMailbox", false, Optional.of("1s"));
@@ -321,12 +321,12 @@ public class ExpireMailboxServiceTest {
     }
 
     @Test
-    public void testContinuesAfterFailure() throws Exception {
+    void testContinuesAfterFailure() throws Exception {
         Username bob = Username.of("bob@example.org");
         MailboxPath bobInbox = MailboxPath.inbox(bob);
         MailboxSession bobSession = mailboxManager.createSystemSession(bob);
-        
-        when(usersRepository.list()).thenReturn(List.of(alice, bob).iterator());
+
+        when(usersRepository.listReactive()).thenReturn(Flux.just(alice, bob));
 
         mailboxManager.createMailbox(aliceInbox, aliceSession);
         MessageManager aliceManager = mailboxManager.getMailbox(aliceInbox, aliceSession);
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
index f0b687e282..ab75e04c4d 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
@@ -36,7 +36,6 @@ import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskType;
 import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
 import org.apache.james.util.ReactorUtils;
 
 import com.github.fge.lambdas.Throwing;
@@ -236,28 +235,23 @@ public class FeedHamToRspamdTask implements Task {
     @Override
     public Result run() {
         Optional<Date> afterDate = runningOptions.getPeriodInSecond().map(periodInSecond -> Date.from(clock.instant().minusSeconds(periodInSecond)));
-        try {
-            return messagesService.getHamMessagesOfAllUser(afterDate, runningOptions, context)
-                .transform(ReactorUtils.<MessageResult, Result>throttle()
-                    .elements(runningOptions.getMessagesPerSecond())
-                    .per(Duration.ofSeconds(1))
-                    .forOperation(messageResult -> Mono.fromSupplier(Throwing.supplier(() -> rspamdHttpClient.reportAsHam(messageResult.getFullContent().getInputStream())))
-                        .then(Mono.fromCallable(() -> {
-                            context.incrementReportedHamMessageCount(1);
-                            return Result.COMPLETED;
-                        }))
-                        .onErrorResume(error -> {
-                            LOGGER.error("Error when report ham message to Rspamd", error);
-                            context.incrementErrorCount();
-                            return Mono.just(Result.PARTIAL);
-                        })))
-                .reduce(Task::combine)
-                .switchIfEmpty(Mono.just(Result.COMPLETED))
-                .block();
-        } catch (UsersRepositoryException e) {
-            LOGGER.error("Error while accessing users from repository", e);
-            return Result.PARTIAL;
-        }
+        return messagesService.getHamMessagesOfAllUser(afterDate, runningOptions, context)
+            .transform(ReactorUtils.<MessageResult, Result>throttle()
+                .elements(runningOptions.getMessagesPerSecond())
+                .per(Duration.ofSeconds(1))
+                .forOperation(messageResult -> Mono.fromSupplier(Throwing.supplier(() -> rspamdHttpClient.reportAsHam(messageResult.getFullContent().getInputStream())))
+                    .then(Mono.fromCallable(() -> {
+                        context.incrementReportedHamMessageCount(1);
+                        return Result.COMPLETED;
+                    }))
+                    .onErrorResume(error -> {
+                        LOGGER.error("Error when report ham message to Rspamd", error);
+                        context.incrementErrorCount();
+                        return Mono.just(Result.PARTIAL);
+                    })))
+            .reduce(Task::combine)
+            .switchIfEmpty(Mono.just(Result.COMPLETED))
+            .block();
     }
 
     @Override
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
index aae57cb5f5..4b1f0068be 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
@@ -36,7 +36,6 @@ import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskType;
 import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
 import org.apache.james.util.ReactorUtils;
 
 import com.github.fge.lambdas.Throwing;
@@ -237,28 +236,23 @@ public class FeedSpamToRspamdTask implements Task {
     @Override
     public Result run() {
         Optional<Date> afterDate = runningOptions.getPeriodInSecond().map(periodInSecond -> Date.from(clock.instant().minusSeconds(periodInSecond)));
-        try {
-            return messagesService.getMailboxMessagesOfAllUser(SPAM_MAILBOX_NAME, afterDate, runningOptions, context)
-                .transform(ReactorUtils.<MessageResult, Task.Result>throttle()
-                    .elements(runningOptions.getMessagesPerSecond())
-                    .per(Duration.ofSeconds(1))
-                    .forOperation(messageResult -> Mono.fromSupplier(Throwing.supplier(() -> rspamdHttpClient.reportAsSpam(messageResult.getFullContent().getInputStream())))
-                        .then(Mono.fromCallable(() -> {
-                            context.incrementReportedSpamMessageCount(1);
-                            return Result.COMPLETED;
-                        }))
-                        .onErrorResume(error -> {
-                            LOGGER.error("Error when report spam message to Rspamd", error);
-                            context.incrementErrorCount();
-                            return Mono.just(Result.PARTIAL);
-                        })))
-                .reduce(Task::combine)
-                .switchIfEmpty(Mono.just(Result.COMPLETED))
-                .block();
-        } catch (UsersRepositoryException e) {
-            LOGGER.error("Error while accessing users from repository", e);
-            return Task.Result.PARTIAL;
-        }
+        return messagesService.getMailboxMessagesOfAllUser(SPAM_MAILBOX_NAME, afterDate, runningOptions, context)
+            .transform(ReactorUtils.<MessageResult, Task.Result>throttle()
+                .elements(runningOptions.getMessagesPerSecond())
+                .per(Duration.ofSeconds(1))
+                .forOperation(messageResult -> Mono.fromSupplier(Throwing.supplier(() -> rspamdHttpClient.reportAsSpam(messageResult.getFullContent().getInputStream())))
+                    .then(Mono.fromCallable(() -> {
+                        context.incrementReportedSpamMessageCount(1);
+                        return Result.COMPLETED;
+                    }))
+                    .onErrorResume(error -> {
+                        LOGGER.error("Error when report spam message to Rspamd", error);
+                        context.incrementErrorCount();
+                        return Mono.just(Result.PARTIAL);
+                    })))
+            .reduce(Task::combine)
+            .switchIfEmpty(Mono.just(Result.COMPLETED))
+            .block();
     }
 
     @Override
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
index 1e00b58434..11c5375403 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
@@ -39,9 +39,7 @@ import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.Message;
 import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
 import org.apache.james.util.ReactorUtils;
-import org.apache.james.util.streams.Iterators;
 
 import com.github.fge.lambdas.Throwing;
 
@@ -65,19 +63,19 @@ public class GetMailboxMessagesService {
     }
 
     public Flux<MessageResult> getMailboxMessagesOfAllUser(String mailboxName, Optional<Date> afterDate, RunningOptions runningOptions,
-                                                           FeedSpamToRspamdTask.Context context) throws UsersRepositoryException {
-        return Iterators.toFlux(userRepository.list())
+                                                           FeedSpamToRspamdTask.Context context) {
+        return Flux.from(userRepository.listReactive())
             .flatMap(username -> getMailboxMessagesOfAUser(username, mailboxName, afterDate, runningOptions, context), ReactorUtils.DEFAULT_CONCURRENCY);
     }
 
     public Flux<MessageResult> getHamMessagesOfAllUser(Optional<Date> afterDate, RunningOptions runningOptions,
-                                                       FeedHamToRspamdTask.Context context) throws UsersRepositoryException {
-        return Iterators.toFlux(userRepository.list())
+                                                       FeedHamToRspamdTask.Context context) {
+        return Flux.from(userRepository.listReactive())
             .flatMap(Throwing.function(username ->
                 Flux.from(mailboxManager.search(MailboxQuery.privateMailboxesBuilder(mailboxManager.createSystemSession(username)).build(),
                     mailboxManager.createSystemSession(username)))
-                .filter(mbxMetadata -> hamMailboxesPredicate(mbxMetadata.getPath()))
-                .flatMap(mbxMetadata -> getMailboxMessagesOfAUser(username, mbxMetadata, afterDate, runningOptions, context), 2)), ReactorUtils.DEFAULT_CONCURRENCY);
+                    .filter(mbxMetadata -> hamMailboxesPredicate(mbxMetadata.getPath()))
+                    .flatMap(mbxMetadata -> getMailboxMessagesOfAUser(username, mbxMetadata, afterDate, runningOptions, context), 2)), ReactorUtils.DEFAULT_CONCURRENCY);
     }
 
     private Flux<MessageResult> getMailboxMessagesOfAUser(Username username, String mailboxName, Optional<Date> afterDate,


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org