You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/09/19 02:59:17 UTC
[james-project] branch master updated: [REFACTORING] List users the reactive way (#1187)
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 2c5ac0d567 [REFACTORING] List users the reactive way (#1187)
2c5ac0d567 is described below
commit 2c5ac0d567c62a6dc674bc03a87128bdd01688db
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Mon Sep 19 09:59:12 2022 +0700
[REFACTORING] List users the reactive way (#1187)
It is very error-prone to plug blocking components into a reactive pipeline.
Allowing implementations to pass a reactive version reduces this risk.
---
.../quota/task/RecomputeCurrentQuotasService.java | 23 ++++++------
.../org/apache/james/user/api/UsersRepository.java | 2 ++
.../james/user/cassandra/CassandraUsersDAO.java | 11 ++++--
.../java/org/apache/james/user/lib/UsersDAO.java | 12 +++++++
.../apache/james/user/lib/UsersRepositoryImpl.java | 5 +++
.../james/transport/mailets/RandomStoring.java | 6 ++--
.../data/jmap/EmailQueryViewPopulator.java | 16 +++------
.../jmap/MessageFastViewProjectionCorrector.java | 14 +++-----
.../webadmin/service/ExpireMailboxService.java | 41 ++++++++++------------
.../webadmin/service/ExpireMailboxServiceTest.java | 38 ++++++++++----------
.../james/rspamd/task/FeedHamToRspamdTask.java | 40 +++++++++------------
.../james/rspamd/task/FeedSpamToRspamdTask.java | 40 +++++++++------------
.../rspamd/task/GetMailboxMessagesService.java | 14 ++++----
13 files changed, 126 insertions(+), 136 deletions(-)
diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
index ef49c231f7..acd288dfe7 100644
--- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
+++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
@@ -39,7 +39,6 @@ import org.apache.james.task.Task;
import org.apache.james.user.api.UsersRepository;
import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.util.ReactorUtils;
-import org.apache.james.util.streams.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +46,7 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class RecomputeCurrentQuotasService {
@@ -162,17 +162,16 @@ public class RecomputeCurrentQuotasService {
}
public Mono<Task.Result> recomputeCurrentQuotas(Context context, RunningOptions runningOptions) {
- try {
- return Iterators.toFlux(usersRepository.list())
- .transform(ReactorUtils.<Username, Task.Result>throttle()
- .elements(runningOptions.getUsersPerSecond())
- .per(Duration.ofSeconds(1))
- .forOperation(username -> recomputeUserCurrentQuotas(context, username)))
- .reduce(Task.Result.COMPLETED, Task::combine);
- } catch (UsersRepositoryException e) {
- LOGGER.error("Error while accessing users from repository", e);
- return Mono.just(Task.Result.PARTIAL);
- }
+ return Flux.from(usersRepository.listReactive())
+ .transform(ReactorUtils.<Username, Task.Result>throttle()
+ .elements(runningOptions.getUsersPerSecond())
+ .per(Duration.ofSeconds(1))
+ .forOperation(username -> recomputeUserCurrentQuotas(context, username)))
+ .reduce(Task.Result.COMPLETED, Task::combine)
+ .onErrorResume(UsersRepositoryException.class, e -> {
+ LOGGER.error("Error while accessing users from repository", e);
+ return Mono.just(Task.Result.PARTIAL);
+ });
}
private Mono<Task.Result> recomputeUserCurrentQuotas(Context context, Username username) {
diff --git a/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java b/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
index 7d0b36b1ac..34c99decef 100644
--- a/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
+++ b/server/data/data-api/src/main/java/org/apache/james/user/api/UsersRepository.java
@@ -125,6 +125,8 @@ public interface UsersRepository {
*/
Iterator<Username> list() throws UsersRepositoryException;
+ Publisher<Username> listReactive();
+
/**
* Return true if virtualHosting support is enabled, otherwise false
*
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
index 5d71bde9d1..a64bbad2d0 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersDAO.java
@@ -230,13 +230,18 @@ public class CassandraUsersDAO implements UsersDAO {
@Override
public Iterator<Username> list() {
- return executor.executeRows(listStatement.bind())
- .mapNotNull(row -> row.getString(NAME))
- .map(Username::of)
+ return listReactive()
.toIterable()
.iterator();
}
+ @Override
+ public Flux<Username> listReactive() {
+ return executor.executeRows(listStatement.bind())
+ .mapNotNull(row -> row.getString(NAME))
+ .map(Username::of);
+ }
+
@Override
public void addUser(Username username, String password) throws UsersRepositoryException {
DefaultUser user = new DefaultUser(username, preferredAlgorithm, preferredAlgorithm);
diff --git a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
index 3e4590ebf4..d345406292 100644
--- a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
+++ b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersDAO.java
@@ -28,7 +28,9 @@ import org.apache.james.user.api.model.User;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public interface UsersDAO {
default boolean getDefaultVirtualHostingValue() {
@@ -52,5 +54,15 @@ public interface UsersDAO {
Iterator<Username> list() throws UsersRepositoryException;
+ default Publisher<Username> listReactive() {
+ return Flux.fromIterable(() -> {
+ try {
+ return list();
+ } catch (UsersRepositoryException e) {
+ throw new RuntimeException(e);
+ }
+ }).subscribeOn(Schedulers.boundedElastic());
+ }
+
void addUser(Username username, String password) throws UsersRepositoryException;
}
diff --git a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
index 564409ae8e..826371edba 100644
--- a/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
+++ b/server/data/data-library/src/main/java/org/apache/james/user/lib/UsersRepositoryImpl.java
@@ -199,6 +199,11 @@ public class UsersRepositoryImpl<T extends UsersDAO> implements UsersRepository,
return usersDAO.list();
}
+ @Override
+ public Publisher<Username> listReactive() {
+ return usersDAO.listReactive();
+ }
+
@Override
public boolean supportVirtualHosting() {
return virtualHosting;
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
index e0f24aed8c..0465e68c54 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
@@ -39,8 +39,6 @@ import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.transport.mailets.delivery.MailStore;
import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
-import org.apache.james.util.streams.Iterators;
import org.apache.mailet.Attribute;
import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMailet;
@@ -110,8 +108,8 @@ public class RandomStoring extends GenericMailet {
.collect(ImmutableSet.toImmutableSet());
}
- private Mono<List<ReroutingInfos>> retrieveReroutingInfos() throws UsersRepositoryException {
- return Iterators.toFlux(usersRepository.list())
+ private Mono<List<ReroutingInfos>> retrieveReroutingInfos() {
+ return Flux.from(usersRepository.listReactive())
.flatMap(this::buildReRoutingInfos)
.collect(ImmutableList.toImmutableList());
}
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
index 2d925f0428..fc1a4f800d 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
@@ -50,9 +50,7 @@ import org.apache.james.mime4j.stream.MimeConfig;
import org.apache.james.task.Task;
import org.apache.james.task.Task.Result;
import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.util.ReactorUtils;
-import org.apache.james.util.streams.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,15 +129,11 @@ public class EmailQueryViewPopulator {
}
private Flux<MessageResult> listAllMailboxMessages(Progress progress) {
- try {
- return Iterators.toFlux(usersRepository.list())
- .map(mailboxManager::createSystemSession)
- .doOnNext(any -> progress.incrementProcessedUserCount())
- .flatMap(session -> listUserMailboxMessages(progress, session), USER_CONCURRENCY)
- .filter(messageResult -> !messageResult.getFlags().contains(DELETED));
- } catch (UsersRepositoryException e) {
- return Flux.error(e);
- }
+ return Flux.from(usersRepository.listReactive())
+ .map(mailboxManager::createSystemSession)
+ .doOnNext(any -> progress.incrementProcessedUserCount())
+ .flatMap(session -> listUserMailboxMessages(progress, session), USER_CONCURRENCY)
+ .filter(messageResult -> !messageResult.getFlags().contains(DELETED));
}
private Flux<MessageResult> listUserMailboxMessages(Progress progress, MailboxSession session) {
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
index 7abf9092d2..f3d9e2ee0b 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
@@ -46,9 +46,7 @@ import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.task.Task;
import org.apache.james.task.Task.Result;
import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.util.ReactorUtils;
-import org.apache.james.util.streams.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -158,14 +156,10 @@ public class MessageFastViewProjectionCorrector {
}
private Flux<ProjectionEntry> listAllMailboxMessages(Progress progress) {
- try {
- return Iterators.toFlux(usersRepository.list())
- .map(mailboxManager::createSystemSession)
- .doOnNext(any -> progress.incrementProcessedUserCount())
- .flatMap(session -> listUserMailboxMessages(progress, session), USER_CONCURRENCY);
- } catch (UsersRepositoryException e) {
- return Flux.error(e);
- }
+ return Flux.from(usersRepository.listReactive())
+ .map(mailboxManager::createSystemSession)
+ .doOnNext(any -> progress.incrementProcessedUserCount())
+ .flatMap(session -> listUserMailboxMessages(progress, session), USER_CONCURRENCY);
}
private Flux<ProjectionEntry> listUserMailboxMessages(Progress progress, MailboxSession session) {
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
index b2bbedcab4..fc709d4bba 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
@@ -45,7 +45,6 @@ import org.apache.james.user.api.UsersRepository;
import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.util.DurationParser;
import org.apache.james.util.ReactorUtils;
-import org.apache.james.util.streams.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -177,28 +176,24 @@ public class ExpireMailboxService {
}
public Mono<Result> expireMailboxes(Context context, RunningOptions runningOptions, Date now) {
- try {
- SearchQuery expiration = SearchQuery.of(
- runningOptions.maxAgeDuration.map(maxAge -> {
- Date limit = Date.from(now.toInstant().minus(maxAge));
- return SearchQuery.internalDateBefore(limit, DateResolution.Second);
- })
- .orElse(
- SearchQuery.headerDateBefore("Expires", now, DateResolution.Second)
- )
- );
- // Note: user list may be a blocking iterable, must run on a scheduler that supports this.
- return Iterators.toFlux(usersRepository.list()).subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
- .transform(ReactorUtils.<Username, Task.Result>throttle()
- .elements(runningOptions.getUsersPerSecond())
- .per(Duration.ofSeconds(1))
- .forOperation(username ->
- expireUserMailbox(context, username, runningOptions.getMailbox(), expiration)))
- .reduce(Task.Result.COMPLETED, Task::combine);
- } catch (UsersRepositoryException e) {
- LOGGER.error("Error while accessing users from repository", e);
- return Mono.just(Task.Result.PARTIAL);
- }
+ SearchQuery expiration = SearchQuery.of(
+ runningOptions.maxAgeDuration.map(maxAge -> {
+ Date limit = Date.from(now.toInstant().minus(maxAge));
+ return SearchQuery.internalDateBefore(limit, DateResolution.Second);
+ }).orElse(SearchQuery.headerDateBefore("Expires", now, DateResolution.Second)));
+
+ return Flux.from(usersRepository.listReactive())
+ .transform(ReactorUtils.<Username, Task.Result>throttle()
+ .elements(runningOptions.getUsersPerSecond())
+ .per(Duration.ofSeconds(1))
+ .forOperation(username ->
+ expireUserMailbox(context, username, runningOptions.getMailbox(), expiration)))
+ .reduce(Task.Result.COMPLETED, Task::combine)
+
+ .onErrorResume(UsersRepositoryException.class, e -> {
+ LOGGER.error("Error while accessing users from repository", e);
+ return Mono.just(Task.Result.PARTIAL);
+ });
}
private Mono<Result> expireUserMailbox(Context context, Username username, String mailbox, SearchQuery expiration) {
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxServiceTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxServiceTest.java
index 1bf6a94066..2d1085ffdc 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxServiceTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxServiceTest.java
@@ -157,8 +157,8 @@ public class ExpireMailboxServiceTest {
@Test
- public void testIgnoresUserListFailure() throws Exception {
- when(usersRepository.list()).thenThrow(new UsersRepositoryException("it is broken"));
+ void testIgnoresUserListFailure() {
+ when(usersRepository.listReactive()).thenReturn(Flux.error(new UsersRepositoryException("it is broken")));
ExpireMailboxService.Context context = new ExpireMailboxService.Context();
Date now = new Date();
@@ -172,8 +172,8 @@ public class ExpireMailboxServiceTest {
}
@Test
- public void testIgnoresMissingMailbox() throws Exception {
- when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+ void testIgnoresMissingMailbox() {
+ when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
// intentionally no mailbox creation
@@ -189,8 +189,8 @@ public class ExpireMailboxServiceTest {
}
@Test
- public void testIgnoresEmptyMailbox() throws Exception {
- when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+ void testIgnoresEmptyMailbox() throws Exception {
+ when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
mailboxManager.createMailbox(aliceInbox, aliceSession);
@@ -207,8 +207,8 @@ public class ExpireMailboxServiceTest {
}
@Test
- public void testHandlesMailboxErrors() throws Exception {
- when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+ void testHandlesMailboxErrors() throws Exception {
+ when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
mailboxManager.createMailbox(aliceInbox, aliceSession);
MessageManager aliceManager = mailboxManager.getMailbox(aliceInbox, aliceSession);
@@ -228,8 +228,8 @@ public class ExpireMailboxServiceTest {
}
@Test
- public void testExpiresMailboxByAge() throws Exception {
- when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+ void testExpiresMailboxByAge() throws Exception {
+ when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
mailboxManager.createMailbox(aliceInbox, aliceSession);
MessageManager aliceManager = mailboxManager.getMailbox(aliceInbox, aliceSession);
@@ -249,8 +249,8 @@ public class ExpireMailboxServiceTest {
}
@Test
- public void testExpiresMailboxByHeader() throws Exception {
- when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+ void testExpiresMailboxByHeader() throws Exception {
+ when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
mailboxManager.createMailbox(aliceInbox, aliceSession);
MessageManager aliceManager = mailboxManager.getMailbox(aliceInbox, aliceSession);
@@ -281,8 +281,8 @@ public class ExpireMailboxServiceTest {
}
@Test
- public void testExpiresNamedMailbox() throws Exception {
- when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+ void testExpiresNamedMailbox() throws Exception {
+ when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
String mailboxName = "Archived";
MailboxPath mailboxPath = MailboxPath.forUser(alice, mailboxName);
@@ -305,8 +305,8 @@ public class ExpireMailboxServiceTest {
}
@Test
- public void testExpiresNamedMailbox2() throws Exception {
- when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+ void testExpiresNamedMailbox2() throws Exception {
+ when(usersRepository.listReactive()).thenReturn(Flux.just(alice));
ExpireMailboxService.Context context = new ExpireMailboxService.Context();
ExpireMailboxService.RunningOptions options = new ExpireMailboxService.RunningOptions(1, "NoSuchMailbox", false, Optional.of("1s"));
@@ -321,12 +321,12 @@ public class ExpireMailboxServiceTest {
}
@Test
- public void testContinuesAfterFailure() throws Exception {
+ void testContinuesAfterFailure() throws Exception {
Username bob = Username.of("bob@example.org");
MailboxPath bobInbox = MailboxPath.inbox(bob);
MailboxSession bobSession = mailboxManager.createSystemSession(bob);
-
- when(usersRepository.list()).thenReturn(List.of(alice, bob).iterator());
+
+ when(usersRepository.listReactive()).thenReturn(Flux.just(alice, bob));
mailboxManager.createMailbox(aliceInbox, aliceSession);
MessageManager aliceManager = mailboxManager.getMailbox(aliceInbox, aliceSession);
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
index f0b687e282..ab75e04c4d 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
@@ -36,7 +36,6 @@ import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskType;
import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.util.ReactorUtils;
import com.github.fge.lambdas.Throwing;
@@ -236,28 +235,23 @@ public class FeedHamToRspamdTask implements Task {
@Override
public Result run() {
Optional<Date> afterDate = runningOptions.getPeriodInSecond().map(periodInSecond -> Date.from(clock.instant().minusSeconds(periodInSecond)));
- try {
- return messagesService.getHamMessagesOfAllUser(afterDate, runningOptions, context)
- .transform(ReactorUtils.<MessageResult, Result>throttle()
- .elements(runningOptions.getMessagesPerSecond())
- .per(Duration.ofSeconds(1))
- .forOperation(messageResult -> Mono.fromSupplier(Throwing.supplier(() -> rspamdHttpClient.reportAsHam(messageResult.getFullContent().getInputStream())))
- .then(Mono.fromCallable(() -> {
- context.incrementReportedHamMessageCount(1);
- return Result.COMPLETED;
- }))
- .onErrorResume(error -> {
- LOGGER.error("Error when report ham message to Rspamd", error);
- context.incrementErrorCount();
- return Mono.just(Result.PARTIAL);
- })))
- .reduce(Task::combine)
- .switchIfEmpty(Mono.just(Result.COMPLETED))
- .block();
- } catch (UsersRepositoryException e) {
- LOGGER.error("Error while accessing users from repository", e);
- return Result.PARTIAL;
- }
+ return messagesService.getHamMessagesOfAllUser(afterDate, runningOptions, context)
+ .transform(ReactorUtils.<MessageResult, Result>throttle()
+ .elements(runningOptions.getMessagesPerSecond())
+ .per(Duration.ofSeconds(1))
+ .forOperation(messageResult -> Mono.fromSupplier(Throwing.supplier(() -> rspamdHttpClient.reportAsHam(messageResult.getFullContent().getInputStream())))
+ .then(Mono.fromCallable(() -> {
+ context.incrementReportedHamMessageCount(1);
+ return Result.COMPLETED;
+ }))
+ .onErrorResume(error -> {
+ LOGGER.error("Error when report ham message to Rspamd", error);
+ context.incrementErrorCount();
+ return Mono.just(Result.PARTIAL);
+ })))
+ .reduce(Task::combine)
+ .switchIfEmpty(Mono.just(Result.COMPLETED))
+ .block();
}
@Override
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
index aae57cb5f5..4b1f0068be 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
@@ -36,7 +36,6 @@ import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskType;
import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.util.ReactorUtils;
import com.github.fge.lambdas.Throwing;
@@ -237,28 +236,23 @@ public class FeedSpamToRspamdTask implements Task {
@Override
public Result run() {
Optional<Date> afterDate = runningOptions.getPeriodInSecond().map(periodInSecond -> Date.from(clock.instant().minusSeconds(periodInSecond)));
- try {
- return messagesService.getMailboxMessagesOfAllUser(SPAM_MAILBOX_NAME, afterDate, runningOptions, context)
- .transform(ReactorUtils.<MessageResult, Task.Result>throttle()
- .elements(runningOptions.getMessagesPerSecond())
- .per(Duration.ofSeconds(1))
- .forOperation(messageResult -> Mono.fromSupplier(Throwing.supplier(() -> rspamdHttpClient.reportAsSpam(messageResult.getFullContent().getInputStream())))
- .then(Mono.fromCallable(() -> {
- context.incrementReportedSpamMessageCount(1);
- return Result.COMPLETED;
- }))
- .onErrorResume(error -> {
- LOGGER.error("Error when report spam message to Rspamd", error);
- context.incrementErrorCount();
- return Mono.just(Result.PARTIAL);
- })))
- .reduce(Task::combine)
- .switchIfEmpty(Mono.just(Result.COMPLETED))
- .block();
- } catch (UsersRepositoryException e) {
- LOGGER.error("Error while accessing users from repository", e);
- return Task.Result.PARTIAL;
- }
+ return messagesService.getMailboxMessagesOfAllUser(SPAM_MAILBOX_NAME, afterDate, runningOptions, context)
+ .transform(ReactorUtils.<MessageResult, Task.Result>throttle()
+ .elements(runningOptions.getMessagesPerSecond())
+ .per(Duration.ofSeconds(1))
+ .forOperation(messageResult -> Mono.fromSupplier(Throwing.supplier(() -> rspamdHttpClient.reportAsSpam(messageResult.getFullContent().getInputStream())))
+ .then(Mono.fromCallable(() -> {
+ context.incrementReportedSpamMessageCount(1);
+ return Result.COMPLETED;
+ }))
+ .onErrorResume(error -> {
+ LOGGER.error("Error when report spam message to Rspamd", error);
+ context.incrementErrorCount();
+ return Mono.just(Result.PARTIAL);
+ })))
+ .reduce(Task::combine)
+ .switchIfEmpty(Mono.just(Result.COMPLETED))
+ .block();
}
@Override
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
index 1e00b58434..11c5375403 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
@@ -39,9 +39,7 @@ import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.Message;
import org.apache.james.user.api.UsersRepository;
-import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.util.ReactorUtils;
-import org.apache.james.util.streams.Iterators;
import com.github.fge.lambdas.Throwing;
@@ -65,19 +63,19 @@ public class GetMailboxMessagesService {
}
public Flux<MessageResult> getMailboxMessagesOfAllUser(String mailboxName, Optional<Date> afterDate, RunningOptions runningOptions,
- FeedSpamToRspamdTask.Context context) throws UsersRepositoryException {
- return Iterators.toFlux(userRepository.list())
+ FeedSpamToRspamdTask.Context context) {
+ return Flux.from(userRepository.listReactive())
.flatMap(username -> getMailboxMessagesOfAUser(username, mailboxName, afterDate, runningOptions, context), ReactorUtils.DEFAULT_CONCURRENCY);
}
public Flux<MessageResult> getHamMessagesOfAllUser(Optional<Date> afterDate, RunningOptions runningOptions,
- FeedHamToRspamdTask.Context context) throws UsersRepositoryException {
- return Iterators.toFlux(userRepository.list())
+ FeedHamToRspamdTask.Context context) {
+ return Flux.from(userRepository.listReactive())
.flatMap(Throwing.function(username ->
Flux.from(mailboxManager.search(MailboxQuery.privateMailboxesBuilder(mailboxManager.createSystemSession(username)).build(),
mailboxManager.createSystemSession(username)))
- .filter(mbxMetadata -> hamMailboxesPredicate(mbxMetadata.getPath()))
- .flatMap(mbxMetadata -> getMailboxMessagesOfAUser(username, mbxMetadata, afterDate, runningOptions, context), 2)), ReactorUtils.DEFAULT_CONCURRENCY);
+ .filter(mbxMetadata -> hamMailboxesPredicate(mbxMetadata.getPath()))
+ .flatMap(mbxMetadata -> getMailboxMessagesOfAUser(username, mbxMetadata, afterDate, runningOptions, context), 2)), ReactorUtils.DEFAULT_CONCURRENCY);
}
private Flux<MessageResult> getMailboxMessagesOfAUser(Username username, String mailboxName, Optional<Date> afterDate,
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org