You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/09 01:51:08 UTC
[james-project] branch master updated: JAMES-3775 Write a task and webadmin route to feed spam to RSpamD (#1092)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 6da4b1ba44 JAMES-3775 Write a task and webadmin route to feed spam to RSpamD (#1092)
6da4b1ba44 is described below
commit 6da4b1ba44b6007d54328b333d118de4231404d2
Author: Trần Hồng Quân <55...@users.noreply.github.com>
AuthorDate: Tue Aug 9 08:51:02 2022 +0700
JAMES-3775 Write a task and webadmin route to feed spam to RSpamD (#1092)
---
third-party/rspamd/README.md | 47 +++
third-party/rspamd/pom.xml | 61 +++-
.../james/rspamd/client/RSpamDHttpClient.java | 3 +
.../apache/james/rspamd/module/RSpamDModule.java | 52 +++
.../james/rspamd/route/FeedMessageRoute.java | 142 ++++++++
.../james/rspamd/task/FeedSpamToRSpamDTask.java | 383 ++++++++++++++++++++
...edSpamToRSpamDTaskAdditionalInformationDTO.java | 106 ++++++
.../rspamd/task/GetMailboxMessagesService.java | 89 +++++
.../james/rspamd/route/FeedMessageRouteTest.java | 393 +++++++++++++++++++++
...amToRSpamDTaskAdditionalInformationDTOTest.java | 63 ++++
.../rspamd/task/FeedSpamToRSpamDTaskTest.java | 317 +++++++++++++++++
.../feedSpamEmptyPeriod.additionalInformation.json | 11 +
...edSpamNonEmptyPeriod.additionalInformation.json | 12 +
13 files changed, 1678 insertions(+), 1 deletion(-)
diff --git a/third-party/rspamd/README.md b/third-party/rspamd/README.md
new file mode 100644
index 0000000000..2262693b86
--- /dev/null
+++ b/third-party/rspamd/README.md
@@ -0,0 +1,47 @@
+## Additional webadmin endpoints
+
+### Report spam messages to RSpamD
+One can use this route to schedule a task that reports spam messages to RSpamD for its spam classify learning.
+
+```bash
+curl -XPOST 'http://ip:port/rspamd?action=reportSpam
+```
+
+This endpoint has the following param:
+- `action` (required): need to be `reportSpam`
+- `messagesPerSecond` (optional): Concurrent learns performed for RSpamD, default to 10
+- `period` (optional): duration (support many time units, default in seconds), only messages between `now` and `now - duration` are reported. By default,
+all messages are reported.
+ These inputs represent the same duration: `1d`, `1day`, `86400 seconds`, `86400`...
+- `samplingProbability` (optional): float between 0 and 1, represent the chance to report each given message to RSpamD.
+By default, all messages are reported.
+
+Will return the task id. E.g:
+```
+{
+ "taskId": "70c12761-ab86-4321-bb6f-fde99e2f74b0"
+}
+```
+
+Response codes:
+- 201: Task generation succeeded. Corresponding task id is returned.
+- 400: Invalid arguments supplied in the user request.
+
+[More details about endpoints returning a task](https://james.apache.org/server/manage-webadmin.html#Endpoints_returning_a_task).
+
+The scheduled task will have the following type `FeedSpamToRSpamDTask` and the following additionalInformation:
+
+```json
+{
+ "errorCount": 1,
+ "reportedSpamMessageCount": 2,
+ "runningOptions": {
+ "messagesPerSecond": 10,
+ "periodInSecond": 3600,
+ "samplingProbability": 1.0
+ },
+ "spamMessageCount": 4,
+ "timestamp": "2007-12-03T10:15:30Z",
+ "type": "FeedSpamToRSpamDTask"
+}
+```
\ No newline at end of file
diff --git a/third-party/rspamd/pom.xml b/third-party/rspamd/pom.xml
index 75f9635d44..a8a3beb13a 100644
--- a/third-party/rspamd/pom.xml
+++ b/third-party/rspamd/pom.xml
@@ -33,10 +33,52 @@
<description>RSpamD Java client (HTTP) and testing utilities</description>
<dependencies>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>apache-james-mailbox-api</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>apache-james-mailbox-memory</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>apache-james-mailbox-memory</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>apache-james-mailbox-store</artifactId>
+ </dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>apache-mailet-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>event-bus-api</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>event-bus-in-vm</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-json</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-data-api</artifactId>
+ </dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-guice-common</artifactId>
@@ -49,6 +91,10 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-task-json</artifactId>
+ </dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-testing</artifactId>
@@ -57,7 +103,6 @@
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-webadmin-core</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
@@ -74,10 +119,24 @@
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ </dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
+ <dependency>
+ <groupId>net.javacrumbs.json-unit</groupId>
+ <artifactId>json-unit-assertj</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RSpamDHttpClient.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RSpamDHttpClient.java
index 57f78e5362..6399765b3d 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RSpamDHttpClient.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RSpamDHttpClient.java
@@ -25,6 +25,8 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import javax.inject.Inject;
+
import org.apache.james.rspamd.exception.RSpamDUnexpectedException;
import org.apache.james.rspamd.exception.UnauthorizedException;
import org.apache.james.rspamd.model.AnalysisResult;
@@ -52,6 +54,7 @@ public class RSpamDHttpClient {
private final HttpClient httpClient;
private final ObjectMapper objectMapper;
+ @Inject
public RSpamDHttpClient(RSpamDClientConfiguration configuration) {
httpClient = buildReactorNettyHttpClient(configuration);
this.objectMapper = new ObjectMapper().registerModule(new Jdk8Module());
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/module/RSpamDModule.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/module/RSpamDModule.java
new file mode 100644
index 0000000000..c3865bf7a1
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/module/RSpamDModule.java
@@ -0,0 +1,52 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.rspamd.module;
+
+import org.apache.james.rspamd.route.FeedMessageRoute;
+import org.apache.james.rspamd.task.FeedSpamToRSpamDTaskAdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.dto.DTOModuleInjections;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.multibindings.Multibinder;
+import com.google.inject.multibindings.ProvidesIntoSet;
+import com.google.inject.name.Named;
+
+public class RSpamDModule extends AbstractModule {
+ @Override
+ protected void configure() {
+ Multibinder<Routes> routesMultiBinder = Multibinder.newSetBinder(binder(), Routes.class);
+ routesMultiBinder.addBinding().to(FeedMessageRoute.class);
+ }
+
+ @ProvidesIntoSet
+ public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> feedSpamAdditionalInformation() {
+ return FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE;
+ }
+
+ @Named(DTOModuleInjections.WEBADMIN_DTO)
+ @ProvidesIntoSet
+ public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> webAdminFeedSpamAdditionalInformation() {
+ return FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE;
+ }
+}
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/route/FeedMessageRoute.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/route/FeedMessageRoute.java
new file mode 100644
index 0000000000..b1c631018b
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/route/FeedMessageRoute.java
@@ -0,0 +1,142 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.rspamd.route;
+
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY;
+
+import java.time.Clock;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import javax.inject.Inject;
+
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.rspamd.task.FeedSpamToRSpamDTask;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskManager;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.util.DurationParser;
+import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.tasks.TaskFromRequest;
+import org.apache.james.webadmin.utils.JsonTransformer;
+
+import com.google.common.base.Preconditions;
+
+import spark.Request;
+import spark.Service;
+
+public class FeedMessageRoute implements Routes {
+ public static final String BASE_PATH = "/rspamd";
+
+ private final TaskManager taskManager;
+ private final MailboxManager mailboxManager;
+ private final MessageIdManager messageIdManager;
+ private final MailboxSessionMapperFactory mapperFactory;
+ private final UsersRepository usersRepository;
+ private final RSpamDHttpClient rSpamDHttpClient;
+ private final JsonTransformer jsonTransformer;
+ private final Clock clock;
+
+ @Inject
+ public FeedMessageRoute(TaskManager taskManager, MailboxManager mailboxManager, UsersRepository usersRepository, RSpamDHttpClient rSpamDHttpClient,
+ JsonTransformer jsonTransformer, Clock clock, MessageIdManager messageIdManager, MailboxSessionMapperFactory mapperFactory) {
+ this.taskManager = taskManager;
+ this.mailboxManager = mailboxManager;
+ this.usersRepository = usersRepository;
+ this.rSpamDHttpClient = rSpamDHttpClient;
+ this.jsonTransformer = jsonTransformer;
+ this.clock = clock;
+ this.messageIdManager = messageIdManager;
+ this.mapperFactory = mapperFactory;
+ }
+
+ @Override
+ public String getBasePath() {
+ return BASE_PATH;
+ }
+
+ @Override
+ public void define(Service service) {
+ TaskFromRequest feedMessageTaskRequest = this::feedMessageTaskFromRequest;
+ service.post(BASE_PATH, feedMessageTaskRequest.asRoute(taskManager), jsonTransformer);
+ }
+
+ public Task feedMessageTaskFromRequest(Request request) {
+ Preconditions.checkArgument(Optional.ofNullable(request.queryParams("action"))
+ .filter(action -> action.equals("reportSpam") || action.equals("reportHam"))
+ .isPresent(),
+ "'action' is missing or must be 'reportSpam' or 'reportHam'");
+
+ return new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, rSpamDHttpClient,
+ getFeedSpamTaskRunningOptions(request), clock);
+ }
+
+ private FeedSpamToRSpamDTask.RunningOptions getFeedSpamTaskRunningOptions(Request request) {
+ Optional<Long> periodInSecond = getPeriod(request);
+ int messagesPerSecond = getMessagesPerSecond(request).orElse(DEFAULT_MESSAGES_PER_SECOND);
+ double samplingProbability = getSamplingProbability(request).orElse(DEFAULT_SAMPLING_PROBABILITY);
+ return new FeedSpamToRSpamDTask.RunningOptions(periodInSecond, messagesPerSecond, samplingProbability);
+ }
+
+ private Optional<Long> getPeriod(Request req) {
+ return Optional.ofNullable(req.queryParams("period"))
+ .filter(Predicate.not(String::isEmpty))
+ .map(rawString -> DurationParser.parse(rawString, ChronoUnit.SECONDS).toSeconds())
+ .map(period -> {
+ Preconditions.checkArgument(period > 0,
+ "'period' must be strictly positive");
+ return period;
+ });
+ }
+
+ private Optional<Integer> getMessagesPerSecond(Request req) {
+ try {
+ return Optional.ofNullable(req.queryParams("messagesPerSecond"))
+ .map(Integer::parseInt)
+ .map(messagesPerSecond -> {
+ Preconditions.checkArgument(messagesPerSecond > 0,
+ "'messagesPerSecond' must be strictly positive");
+ return messagesPerSecond;
+ });
+ } catch (NumberFormatException ex) {
+ throw new IllegalArgumentException("'messagesPerSecond' must be numeric");
+ }
+ }
+
+ private Optional<Double> getSamplingProbability(Request req) {
+ try {
+ return Optional.ofNullable(req.queryParams("samplingProbability"))
+ .map(Double::parseDouble)
+ .map(samplingProbability -> {
+ Preconditions.checkArgument(samplingProbability >= 0 && samplingProbability <= 1,
+ "'samplingProbability' must be greater than or equal to 0.0 and smaller than or equal to 1.0");
+ return samplingProbability;
+ });
+ } catch (NumberFormatException ex) {
+ throw new IllegalArgumentException("'samplingProbability' must be numeric");
+ }
+ }
+}
+
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTask.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTask.java
new file mode 100644
index 0000000000..d546c307c0
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTask.java
@@ -0,0 +1,383 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Date;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskType;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.ReactorUtils;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+
+import reactor.core.publisher.Mono;
+
+public class FeedSpamToRSpamDTask implements Task {
+ public static final String SPAM_MAILBOX_NAME = "Spam";
+ public static final TaskType TASK_TYPE = TaskType.of("FeedSpamToRSpamDTask");
+
+ public static class RunningOptions {
+ public static final Optional<Long> DEFAULT_PERIOD = Optional.empty();
+ public static final int DEFAULT_MESSAGES_PER_SECOND = 10;
+ public static final double DEFAULT_SAMPLING_PROBABILITY = 1;
+ public static final RunningOptions DEFAULT = new RunningOptions(DEFAULT_PERIOD, DEFAULT_MESSAGES_PER_SECOND,
+ DEFAULT_SAMPLING_PROBABILITY);
+
+ private final Optional<Long> periodInSecond;
+ private final int messagesPerSecond;
+ private final double samplingProbability;
+
+ public RunningOptions(@JsonProperty("periodInSecond") Optional<Long> periodInSecond,
+ @JsonProperty("messagesPerSecond") int messagesPerSecond,
+ @JsonProperty("samplingProbability") double samplingProbability) {
+ this.periodInSecond = periodInSecond;
+ this.messagesPerSecond = messagesPerSecond;
+ this.samplingProbability = samplingProbability;
+ }
+
+ public Optional<Long> getPeriodInSecond() {
+ return periodInSecond;
+ }
+
+ public int getMessagesPerSecond() {
+ return messagesPerSecond;
+ }
+
+ public double getSamplingProbability() {
+ return samplingProbability;
+ }
+ }
+
+ public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+
+ private static AdditionalInformation from(Context context) {
+ Context.Snapshot snapshot = context.snapshot();
+ return new AdditionalInformation(
+ Clock.systemUTC().instant(),
+ snapshot.getSpamMessageCount(),
+ snapshot.getReportedSpamMessageCount(),
+ snapshot.getErrorCount(),
+ snapshot.getMessagesPerSecond(),
+ snapshot.getPeriod(),
+ snapshot.getSamplingProbability());
+ }
+
+ private final Instant timestamp;
+ private final long spamMessageCount;
+ private final long reportedSpamMessageCount;
+ private final long errorCount;
+ private final int messagesPerSecond;
+ private final Optional<Long> period;
+ private final double samplingProbability;
+
+ public AdditionalInformation(Instant timestamp, long spamMessageCount, long reportedSpamMessageCount, long errorCount, int messagesPerSecond, Optional<Long> period, double samplingProbability) {
+ this.timestamp = timestamp;
+ this.spamMessageCount = spamMessageCount;
+ this.reportedSpamMessageCount = reportedSpamMessageCount;
+ this.errorCount = errorCount;
+ this.messagesPerSecond = messagesPerSecond;
+ this.period = period;
+ this.samplingProbability = samplingProbability;
+ }
+
+ public long getSpamMessageCount() {
+ return spamMessageCount;
+ }
+
+ public long getReportedSpamMessageCount() {
+ return reportedSpamMessageCount;
+ }
+
+ public long getErrorCount() {
+ return errorCount;
+ }
+
+ public int getMessagesPerSecond() {
+ return messagesPerSecond;
+ }
+
+ public Optional<Long> getPeriod() {
+ return period;
+ }
+
+ public double getSamplingProbability() {
+ return samplingProbability;
+ }
+
+ @Override
+ public Instant timestamp() {
+ return timestamp;
+ }
+ }
+
+ public static class Context {
+
+ public static class Snapshot {
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ static class Builder {
+ private Optional<Long> spamMessageCount;
+ private Optional<Long> reportedSpamMessageCount;
+ private Optional<Long> errorCount;
+ private Optional<Integer> messagesPerSecond;
+ private Optional<Long> period;
+ private Optional<Double> samplingProbability;
+
+ Builder() {
+ spamMessageCount = Optional.empty();
+ reportedSpamMessageCount = Optional.empty();
+ errorCount = Optional.empty();
+ messagesPerSecond = Optional.empty();
+ period = Optional.empty();
+ samplingProbability = Optional.empty();
+ }
+
+ public Snapshot build() {
+ return new Snapshot(
+ spamMessageCount.orElse(0L),
+ reportedSpamMessageCount.orElse(0L),
+ errorCount.orElse(0L),
+ messagesPerSecond.orElse(0),
+ period,
+ samplingProbability.orElse(1D));
+ }
+
+ public Builder spamMessageCount(long spamMessageCount) {
+ this.spamMessageCount = Optional.of(spamMessageCount);
+ return this;
+ }
+
+ public Builder reportedSpamMessageCount(long reportedSpamMessageCount) {
+ this.reportedSpamMessageCount = Optional.of(reportedSpamMessageCount);
+ return this;
+ }
+
+ public Builder errorCount(long errorCount) {
+ this.errorCount = Optional.of(errorCount);
+ return this;
+ }
+
+ public Builder messagesPerSecond(int messagesPerSecond) {
+ this.messagesPerSecond = Optional.of(messagesPerSecond);
+ return this;
+ }
+
+ public Builder period(Optional<Long> period) {
+ this.period = period;
+ return this;
+ }
+
+ public Builder samplingProbability(double samplingProbability) {
+ this.samplingProbability = Optional.of(samplingProbability);
+ return this;
+ }
+ }
+
+ private final long spamMessageCount;
+ private final long reportedSpamMessageCount;
+ private final long errorCount;
+ private final int messagesPerSecond;
+ private final Optional<Long> period;
+ private final double samplingProbability;
+
+ public Snapshot(long spamMessageCount, long reportedSpamMessageCount, long errorCount, int messagesPerSecond, Optional<Long> period,
+ double samplingProbability) {
+ this.spamMessageCount = spamMessageCount;
+ this.reportedSpamMessageCount = reportedSpamMessageCount;
+ this.errorCount = errorCount;
+ this.messagesPerSecond = messagesPerSecond;
+ this.period = period;
+ this.samplingProbability = samplingProbability;
+ }
+
+ public long getSpamMessageCount() {
+ return spamMessageCount;
+ }
+
+ public long getReportedSpamMessageCount() {
+ return reportedSpamMessageCount;
+ }
+
+ public long getErrorCount() {
+ return errorCount;
+ }
+
+ public int getMessagesPerSecond() {
+ return messagesPerSecond;
+ }
+
+ public Optional<Long> getPeriod() {
+ return period;
+ }
+
+ public double getSamplingProbability() {
+ return samplingProbability;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (o instanceof Snapshot) {
+ Snapshot snapshot = (Snapshot) o;
+
+ return Objects.equals(this.spamMessageCount, snapshot.spamMessageCount)
+ && Objects.equals(this.reportedSpamMessageCount, snapshot.reportedSpamMessageCount)
+ && Objects.equals(this.errorCount, snapshot.errorCount)
+ && Objects.equals(this.messagesPerSecond, snapshot.messagesPerSecond)
+ && Objects.equals(this.samplingProbability, snapshot.samplingProbability)
+ && Objects.equals(this.period, snapshot.period);
+ }
+ return false;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(spamMessageCount, reportedSpamMessageCount, errorCount, messagesPerSecond, period, samplingProbability);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("spamMessageCount", spamMessageCount)
+ .add("reportedSpamMessageCount", reportedSpamMessageCount)
+ .add("errorCount", errorCount)
+ .add("messagesPerSecond", messagesPerSecond)
+ .add("period", period)
+ .add("samplingProbability", samplingProbability)
+ .toString();
+ }
+ }
+
+ private final AtomicLong spamMessageCount;
+ private final AtomicLong reportedSpamMessageCount;
+ private final AtomicLong errorCount;
+ private final Integer messagesPerSecond;
+ private final Optional<Long> period;
+ private final Double samplingProbability;
+
+ public Context(RunningOptions runningOptions) {
+ this.spamMessageCount = new AtomicLong();
+ this.reportedSpamMessageCount = new AtomicLong();
+ this.errorCount = new AtomicLong();
+ this.messagesPerSecond = runningOptions.messagesPerSecond;
+ this.period = runningOptions.periodInSecond;
+ this.samplingProbability = runningOptions.samplingProbability;
+ }
+
+ public void incrementSpamMessageCount() {
+ spamMessageCount.incrementAndGet();
+ }
+
+ public void incrementReportedSpamMessageCount(int count) {
+ reportedSpamMessageCount.addAndGet(count);
+ }
+
+ public void incrementErrorCount() {
+ errorCount.incrementAndGet();
+ }
+
+ public Snapshot snapshot() {
+ return Snapshot.builder()
+ .spamMessageCount(spamMessageCount.get())
+ .reportedSpamMessageCount(reportedSpamMessageCount.get())
+ .errorCount(errorCount.get())
+ .messagesPerSecond(messagesPerSecond)
+ .period(period)
+ .samplingProbability(samplingProbability)
+ .build();
+ }
+ }
+
+ private final GetMailboxMessagesService messagesService;
+ private final RSpamDHttpClient rSpamDHttpClient;
+ private final RunningOptions runningOptions;
+ private final Context context;
+ private final Clock clock;
+
+ public FeedSpamToRSpamDTask(MailboxManager mailboxManager, UsersRepository usersRepository, MessageIdManager messageIdManager, MailboxSessionMapperFactory mapperFactory,
+ RSpamDHttpClient rSpamDHttpClient, RunningOptions runningOptions, Clock clock) {
+ this.runningOptions = runningOptions;
+ this.messagesService = new GetMailboxMessagesService(mailboxManager, usersRepository, mapperFactory, messageIdManager);
+ this.rSpamDHttpClient = rSpamDHttpClient;
+ this.context = new Context(runningOptions);
+ this.clock = clock;
+ }
+
+ @Override
+ public Result run() {
+ Optional<Date> afterDate = runningOptions.periodInSecond.map(periodInSecond -> Date.from(clock.instant().minusSeconds(periodInSecond)));
+ try {
+ return messagesService.getMailboxMessagesOfAllUser(SPAM_MAILBOX_NAME, afterDate, runningOptions.getSamplingProbability(), context)
+ .transform(ReactorUtils.<MessageResult, Task.Result>throttle()
+ .elements(runningOptions.messagesPerSecond)
+ .per(Duration.ofSeconds(1))
+ .forOperation(messageResult -> Mono.fromSupplier(Throwing.supplier(() -> rSpamDHttpClient.reportAsSpam(messageResult.getFullContent().getInputStream())))
+ .then(Mono.fromCallable(() -> {
+ context.incrementReportedSpamMessageCount(1);
+ return Result.COMPLETED;
+ }))
+ .onErrorResume(error -> {
+ LOGGER.error("Error when report spam message to RSpamD", error);
+ context.incrementErrorCount();
+ return Mono.just(Result.PARTIAL);
+ })))
+ .reduce(Task::combine)
+ .switchIfEmpty(Mono.just(Result.COMPLETED))
+ .block();
+ } catch (UsersRepositoryException e) {
+ LOGGER.error("Error while accessing users from repository", e);
+ return Task.Result.PARTIAL;
+ }
+ }
+
+ @Override
+ public TaskType type() {
+ return TASK_TYPE;
+ }
+
+ @Override
+ public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+ return Optional.of(AdditionalInformation.from(context));
+ }
+
+ @VisibleForTesting
+ public Context.Snapshot snapshot() {
+ return context.snapshot();
+ }
+}
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskAdditionalInformationDTO.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskAdditionalInformationDTO.java
new file mode 100644
index 0000000000..518fa6c79f
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskAdditionalInformationDTO.java
@@ -0,0 +1,106 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class FeedSpamToRSpamDTaskAdditionalInformationDTO implements AdditionalInformationDTO {
+ public static final AdditionalInformationDTOModule<FeedSpamToRSpamDTask.AdditionalInformation, FeedSpamToRSpamDTaskAdditionalInformationDTO> SERIALIZATION_MODULE =
+ DTOModule.forDomainObject(FeedSpamToRSpamDTask.AdditionalInformation.class)
+ .convertToDTO(FeedSpamToRSpamDTaskAdditionalInformationDTO.class)
+ .toDomainObjectConverter(FeedSpamToRSpamDTaskAdditionalInformationDTO::toDomainObject)
+ .toDTOConverter(FeedSpamToRSpamDTaskAdditionalInformationDTO::toDto)
+ .typeName(FeedSpamToRSpamDTask.TASK_TYPE.asString())
+ .withFactory(AdditionalInformationDTOModule::new);
+
+ private static FeedSpamToRSpamDTask.AdditionalInformation toDomainObject(FeedSpamToRSpamDTaskAdditionalInformationDTO dto) {
+ return new FeedSpamToRSpamDTask.AdditionalInformation(
+ dto.timestamp,
+ dto.spamMessageCount,
+ dto.reportedSpamMessageCount,
+ dto.errorCount,
+ dto.runningOptions.getMessagesPerSecond(),
+ dto.runningOptions.getPeriodInSecond(),
+ dto.runningOptions.getSamplingProbability());
+ }
+
+ private static FeedSpamToRSpamDTaskAdditionalInformationDTO toDto(FeedSpamToRSpamDTask.AdditionalInformation domainObject, String type) {
+ return new FeedSpamToRSpamDTaskAdditionalInformationDTO(
+ type,
+ domainObject.timestamp(),
+ domainObject.getSpamMessageCount(),
+ domainObject.getReportedSpamMessageCount(),
+ domainObject.getErrorCount(),
+ new FeedSpamToRSpamDTask.RunningOptions(domainObject.getPeriod(), domainObject.getMessagesPerSecond(), domainObject.getSamplingProbability()));
+ }
+
+ private final String type;
+ private final Instant timestamp;
+ private final long spamMessageCount;
+ private final long reportedSpamMessageCount;
+ private final long errorCount;
+ private final FeedSpamToRSpamDTask.RunningOptions runningOptions;
+
+ public FeedSpamToRSpamDTaskAdditionalInformationDTO(@JsonProperty("type") String type,
+ @JsonProperty("timestamp") Instant timestamp,
+ @JsonProperty("spamMessageCount") long spamMessageCount,
+ @JsonProperty("reportedSpamMessageCount") long reportedSpamMessageCount,
+ @JsonProperty("errorCount") long errorCount,
+ @JsonProperty("runningOptions") FeedSpamToRSpamDTask.RunningOptions runningOptions) {
+ this.type = type;
+ this.timestamp = timestamp;
+ this.spamMessageCount = spamMessageCount;
+ this.reportedSpamMessageCount = reportedSpamMessageCount;
+ this.errorCount = errorCount;
+ this.runningOptions = runningOptions;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ public long getSpamMessageCount() {
+ return spamMessageCount;
+ }
+
+ public long getReportedSpamMessageCount() {
+ return reportedSpamMessageCount;
+ }
+
+ public long getErrorCount() {
+ return errorCount;
+ }
+
+ public FeedSpamToRSpamDTask.RunningOptions getRunningOptions() {
+ return runningOptions;
+ }
+}
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
new file mode 100644
index 0000000000..abae46efbd
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
@@ -0,0 +1,89 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import java.util.Date;
+import java.util.Optional;
+
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.model.FetchGroup;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.Message;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.ReactorUtils;
+import org.apache.james.util.streams.Iterators;
+
+import com.github.fge.lambdas.Throwing;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class GetMailboxMessagesService {
+ private static final int UNLIMITED = -1;
+
+ private final MailboxManager mailboxManager;
+ private final UsersRepository userRepository;
+ private final MailboxSessionMapperFactory mapperFactory;
+ private final MessageIdManager messageIdManager;
+
+ public GetMailboxMessagesService(MailboxManager mailboxManager, UsersRepository userRepository, MailboxSessionMapperFactory mapperFactory, MessageIdManager messageIdManager) {
+ this.mailboxManager = mailboxManager;
+ this.userRepository = userRepository;
+ this.mapperFactory = mapperFactory;
+ this.messageIdManager = messageIdManager;
+ }
+
+ public Flux<MessageResult> getMailboxMessagesOfAllUser(String mailboxName, Optional<Date> afterDate, double samplingProbability,
+ FeedSpamToRSpamDTask.Context context) throws UsersRepositoryException {
+ return Iterators.toFlux(userRepository.list())
+ .flatMap(username -> getMailboxMessagesOfAUser(username, mailboxName, afterDate, samplingProbability, context), ReactorUtils.DEFAULT_CONCURRENCY);
+ }
+
+ private Flux<MessageResult> getMailboxMessagesOfAUser(Username username, String mailboxName, Optional<Date> afterDate,
+ double samplingProbability, FeedSpamToRSpamDTask.Context context) {
+ MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
+
+ return Mono.from(mailboxManager.getMailboxReactive(MailboxPath.forUser(username, mailboxName), mailboxSession))
+ .map(Throwing.function(MessageManager::getMailboxEntity))
+ .flatMapMany(Throwing.function(mailbox -> mapperFactory.getMessageMapper(mailboxSession).findInMailboxReactive(mailbox, MessageRange.all(), MessageMapper.FetchType.METADATA, UNLIMITED)))
+ .doOnNext(mailboxMessageMetaData -> context.incrementSpamMessageCount())
+ .filter(mailboxMessageMetaData -> afterDate.map(date -> mailboxMessageMetaData.getInternalDate().after(date)).orElse(true))
+ .filter(message -> randomBooleanWithProbability(samplingProbability))
+ .map(Message::getMessageId)
+ .collectList()
+ .flatMapMany(messageIds -> messageIdManager.getMessagesReactive(messageIds, FetchGroup.FULL_CONTENT, mailboxSession));
+ }
+
+ public static boolean randomBooleanWithProbability(double probability) {
+ if (probability == 1.0) {
+ return true;
+ }
+ return Math.random() < probability;
+ }
+}
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/route/FeedMessageRouteTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/route/FeedMessageRouteTest.java
new file mode 100644
index 0000000000..97fd3a34d2
--- /dev/null
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/route/FeedMessageRouteTest.java
@@ -0,0 +1,393 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.rspamd.route;
+
+import static io.restassured.RestAssured.given;
+import static io.restassured.http.ContentType.JSON;
+import static org.apache.james.rspamd.DockerRSpamD.PASSWORD;
+import static org.apache.james.rspamd.route.FeedMessageRoute.BASE_PATH;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.ALICE;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.ALICE_SPAM_MAILBOX;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.BOB;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.BOB_SPAM_MAILBOX;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.NOW;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.ONE_DAY_IN_SECOND;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.THREE_DAYS_IN_SECOND;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.TWO_DAYS_IN_SECOND;
+import static org.eclipse.jetty.http.HttpStatus.BAD_REQUEST_400;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import javax.mail.Flags;
+
+import org.apache.james.domainlist.api.DomainList;
+import org.apache.james.json.DTOConverter;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
+import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.rspamd.DockerRSpamDExtension;
+import org.apache.james.rspamd.client.RSpamDClientConfiguration;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.rspamd.task.FeedSpamToRSpamDTask;
+import org.apache.james.rspamd.task.FeedSpamToRSpamDTaskAdditionalInformationDTO;
+import org.apache.james.task.Hostname;
+import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.james.util.DurationParser;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.apache.james.webadmin.WebAdminServer;
+import org.apache.james.webadmin.WebAdminUtils;
+import org.apache.james.webadmin.routes.TasksRoutes;
+import org.apache.james.webadmin.utils.JsonTransformer;
+import org.eclipse.jetty.http.HttpStatus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
+
+import com.github.fge.lambdas.Throwing;
+
+import io.restassured.RestAssured;
+
+public class FeedMessageRouteTest {
+ @RegisterExtension
+ static DockerRSpamDExtension rSpamDExtension = new DockerRSpamDExtension();
+
+ private InMemoryMailboxManager mailboxManager;
+ private WebAdminServer webAdminServer;
+ private MemoryTaskManager taskManager;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ InMemoryIntegrationResources inMemoryIntegrationResources = InMemoryIntegrationResources.defaultResources();
+ mailboxManager = inMemoryIntegrationResources.getMailboxManager();
+ DomainList domainList = mock(DomainList.class);
+ Mockito.when(domainList.containsDomain(any())).thenReturn(true);
+ UsersRepository usersRepository = MemoryUsersRepository.withVirtualHosting(domainList);
+ usersRepository.addUser(BOB, "anyPassword");
+ usersRepository.addUser(ALICE, "anyPassword");
+ mailboxManager.createMailbox(BOB_SPAM_MAILBOX, mailboxManager.createSystemSession(BOB));
+ mailboxManager.createMailbox(ALICE_SPAM_MAILBOX, mailboxManager.createSystemSession(ALICE));
+ taskManager = new MemoryTaskManager(new Hostname("foo"));
+ UpdatableTickingClock clock = new UpdatableTickingClock(NOW);
+ JsonTransformer jsonTransformer = new JsonTransformer();
+ RSpamDHttpClient client = new RSpamDHttpClient(new RSpamDClientConfiguration(rSpamDExtension.getBaseUrl(), PASSWORD, Optional.empty()));
+ MessageIdManager messageIdManager = inMemoryIntegrationResources.getMessageIdManager();
+ MailboxSessionMapperFactory mapperFactory = mailboxManager.getMapperFactory();
+
+ TasksRoutes tasksRoutes = new TasksRoutes(taskManager, jsonTransformer, DTOConverter.of(FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE));
+ FeedMessageRoute feedMessageRoute = new FeedMessageRoute(taskManager, mailboxManager, usersRepository, client, jsonTransformer, clock,
+ messageIdManager, mapperFactory);
+
+ webAdminServer = WebAdminUtils.createWebAdminServer(feedMessageRoute, tasksRoutes).start();
+
+ RestAssured.requestSpecification = WebAdminUtils.buildRequestSpecification(webAdminServer)
+ .setBasePath(BASE_PATH)
+ .build();
+ }
+
+ @AfterEach
+ void stop() {
+ webAdminServer.destroy();
+ taskManager.stop();
+ }
+
+ private void appendSpamMessage(MailboxPath mailboxPath, Date internalDate) throws MailboxException {
+ MailboxSession session = mailboxManager.createSystemSession(mailboxPath.getUser());
+ mailboxManager.getMailbox(mailboxPath, session)
+ .appendMessage(new ByteArrayInputStream(String.format("random content %4.3f", Math.random()).getBytes()),
+ internalDate,
+ session,
+ true,
+ new Flags());
+ }
+
+ @Nested
+ class FeedSpam {
+ @Test
+ void taskShouldReportAllSpamMessagesOfAllUsersByDefault() throws MailboxException {
+ appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW));
+ appendSpamMessage(ALICE_SPAM_MAILBOX, Date.from(NOW));
+
+ String taskId = given()
+ .queryParam("action", "reportSpam")
+ .post()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("additionalInformation.type", is(FeedSpamToRSpamDTask.TASK_TYPE.asString()))
+ .body("additionalInformation.spamMessageCount", is(2))
+ .body("additionalInformation.reportedSpamMessageCount", is(2))
+ .body("additionalInformation.errorCount", is(0))
+ .body("additionalInformation.runningOptions.messagesPerSecond", is(DEFAULT_MESSAGES_PER_SECOND))
+ .body("additionalInformation.runningOptions.periodInSecond", is(nullValue()))
+ .body("additionalInformation.runningOptions.samplingProbability", is((float) DEFAULT_SAMPLING_PROBABILITY));
+ }
+
+ @Test
+ void taskShouldReportOnlyMailInPeriod() throws MailboxException {
+ appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(THREE_DAYS_IN_SECOND)));
+ appendSpamMessage(ALICE_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)));
+
+ String taskId = given()
+ .queryParam("action", "reportSpam")
+ .queryParam("period", TWO_DAYS_IN_SECOND)
+ .post()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("additionalInformation.type", is(FeedSpamToRSpamDTask.TASK_TYPE.asString()))
+ .body("additionalInformation.spamMessageCount", is(2))
+ .body("additionalInformation.reportedSpamMessageCount", is(1))
+ .body("additionalInformation.errorCount", is(0))
+ .body("additionalInformation.runningOptions.messagesPerSecond", is(DEFAULT_MESSAGES_PER_SECOND))
+ .body("additionalInformation.runningOptions.periodInSecond", is(172800))
+ .body("additionalInformation.runningOptions.samplingProbability", is((float) DEFAULT_SAMPLING_PROBABILITY));
+ }
+
+ @Test
+ void taskWithAverageSamplingProbabilityShouldNotReportAllSpamMessages() {
+ IntStream.range(0, 10)
+ .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+ String taskId = given()
+ .queryParam("action", "reportSpam")
+ .queryParam("samplingProbability", 0.5)
+ .post()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("additionalInformation.type", is(FeedSpamToRSpamDTask.TASK_TYPE.asString()))
+ .body("additionalInformation.spamMessageCount", is(10))
+ .body("additionalInformation.reportedSpamMessageCount", is(allOf(greaterThan(0), lessThan(10))))
+ .body("additionalInformation.errorCount", is(0))
+ .body("additionalInformation.runningOptions.messagesPerSecond", is(DEFAULT_MESSAGES_PER_SECOND))
+ .body("additionalInformation.runningOptions.periodInSecond", is(nullValue()))
+ .body("additionalInformation.runningOptions.samplingProbability", is(0.5F));
+ }
+
+ @Test
+ void feedMessageShouldReturnErrorWhenInvalidAction() {
+ given()
+ .queryParam("action", "invalid")
+ .post()
+ .then()
+ .statusCode(BAD_REQUEST_400)
+ .contentType(JSON)
+ .body("statusCode", is(BAD_REQUEST_400))
+ .body("type", is("InvalidArgument"))
+ .body("message", is("Invalid arguments supplied in the user request"))
+ .body("details", is("'action' is missing or must be 'reportSpam' or 'reportHam'"));
+ }
+
+ @Test
+ void feedMessageTaskShouldReturnErrorWhenMissingAction() {
+ given()
+ .post()
+ .then()
+ .statusCode(BAD_REQUEST_400)
+ .contentType(JSON)
+ .body("statusCode", is(BAD_REQUEST_400))
+ .body("type", is("InvalidArgument"))
+ .body("message", is("Invalid arguments supplied in the user request"))
+ .body("details", is("'action' is missing or must be 'reportSpam' or 'reportHam'"));
+ }
+
+ @Test
+ void feedSpamShouldReturnTaskId() {
+ given()
+ .queryParam("action", "reportSpam")
+ .post()
+ .then()
+ .statusCode(HttpStatus.CREATED_201)
+ .body("taskId", notNullValue());
+ }
+
+ @Test
+ void feedSpamShouldReturnDetail() {
+ String taskId = given()
+ .queryParam("action", "reportSpam")
+ .post()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("taskId", is(notNullValue()))
+ .body("type", is(FeedSpamToRSpamDTask.TASK_TYPE.asString()))
+ .body("startedDate", is(notNullValue()))
+ .body("submitDate", is(notNullValue()))
+ .body("completedDate", is(notNullValue()))
+ .body("additionalInformation.type", is(FeedSpamToRSpamDTask.TASK_TYPE.asString()))
+ .body("additionalInformation.timestamp", is(notNullValue()))
+ .body("additionalInformation.spamMessageCount", is(0))
+ .body("additionalInformation.reportedSpamMessageCount", is(0))
+ .body("additionalInformation.errorCount", is(0))
+ .body("additionalInformation.runningOptions.messagesPerSecond", is(DEFAULT_MESSAGES_PER_SECOND))
+ .body("additionalInformation.runningOptions.periodInSecond", is(nullValue()))
+ .body("additionalInformation.runningOptions.samplingProbability", is((float) DEFAULT_SAMPLING_PROBABILITY));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"3600", "3600 seconds", "1d", "1day"})
+ void feedSpamShouldAcceptPeriodParam(String period) {
+ String taskId = given()
+ .queryParam("action", "reportSpam")
+ .queryParam("period", period)
+ .post()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("additionalInformation.runningOptions.periodInSecond", is((int) DurationParser.parse(period, ChronoUnit.SECONDS).toSeconds()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"-1", "0", "1 t"})
+ void feedSpamShouldReturnErrorWhenPeriodInvalid(String period) {
+ given()
+ .queryParam("action", "reportSpam")
+ .queryParam("period", period)
+ .post()
+ .then()
+ .statusCode(BAD_REQUEST_400)
+ .contentType(JSON)
+ .body("statusCode", is(BAD_REQUEST_400))
+ .body("type", is("InvalidArgument"))
+ .body("message", is("Invalid arguments supplied in the user request"));
+ }
+
+ @Test
+ void feedSpamShouldAcceptMessagesPerSecondParam() {
+ String taskId = given()
+ .queryParam("action", "reportSpam")
+ .queryParam("messagesPerSecond", 20)
+ .post()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("additionalInformation.runningOptions.messagesPerSecond", is(20));
+ }
+
+ @ParameterizedTest
+ @ValueSource(doubles = {-1, -0.1, 1.1})
+ void feedSpamShouldReturnErrorWhenMessagesPerSecondInvalid(double messagesPerSecond) {
+ given()
+ .queryParam("action", "reportSpam")
+ .queryParam("messagesPerSecond", messagesPerSecond)
+ .post()
+ .then()
+ .statusCode(BAD_REQUEST_400)
+ .contentType(JSON)
+ .body("statusCode", is(BAD_REQUEST_400))
+ .body("type", is("InvalidArgument"))
+ .body("message", is("Invalid arguments supplied in the user request"))
+ .body("details", containsString("messagesPerSecond"));
+ }
+
+ @Test
+ void feedSpamShouldAcceptSamplingProbabilityParam() {
+ String taskId = given()
+ .queryParam("action", "reportSpam")
+ .queryParam("samplingProbability", 0.8)
+ .post()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("additionalInformation.runningOptions.samplingProbability", is(0.8F));
+ }
+
+ @ParameterizedTest
+ @ValueSource(doubles = {-1, -0.1, 1.1})
+ void feedSpamShouldReturnErrorWhenSamplingProbabilityInvalid(double samplingProbability) {
+ given()
+ .queryParam("action", "reportSpam")
+ .queryParam("samplingProbability", samplingProbability)
+ .post()
+ .then()
+ .statusCode(BAD_REQUEST_400)
+ .contentType(JSON)
+ .body("statusCode", is(BAD_REQUEST_400))
+ .body("type", is("InvalidArgument"))
+ .body("message", is("Invalid arguments supplied in the user request"))
+ .body("details", containsString("samplingProbability"));
+ }
+ }
+}
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskAdditionalInformationDTOTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskAdditionalInformationDTOTest.java
new file mode 100644
index 0000000000..169409cde5
--- /dev/null
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskAdditionalInformationDTOTest.java
@@ -0,0 +1,63 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_PERIOD;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY;
+
+import java.time.Instant;
+import java.util.Optional;
+
+import org.apache.james.JsonSerializationVerifier;
+import org.apache.james.util.ClassLoaderUtils;
+import org.junit.jupiter.api.Test;
+
+class FeedSpamToRSpamDTaskAdditionalInformationDTOTest {
+ @Test
+ void shouldMatchJsonSerializationContractWhenEmptyPeriod() throws Exception {
+ JsonSerializationVerifier.dtoModule(FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE)
+ .bean(new FeedSpamToRSpamDTask.AdditionalInformation(
+ Instant.parse("2007-12-03T10:15:30.00Z"),
+ 4,
+ 2,
+ 1,
+ DEFAULT_MESSAGES_PER_SECOND,
+ DEFAULT_PERIOD,
+ DEFAULT_SAMPLING_PROBABILITY))
+ .json(ClassLoaderUtils.getSystemResourceAsString("json/feedSpamEmptyPeriod.additionalInformation.json"))
+ .verify();
+ }
+
+ @Test
+ void shouldMatchJsonSerializationContractWhenNonEmptyPeriod() throws Exception {
+ JsonSerializationVerifier.dtoModule(FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE)
+ .bean(new FeedSpamToRSpamDTask.AdditionalInformation(
+ Instant.parse("2007-12-03T10:15:30.00Z"),
+ 4,
+ 2,
+ 1,
+ DEFAULT_MESSAGES_PER_SECOND,
+ Optional.of(3600L),
+ DEFAULT_SAMPLING_PROBABILITY))
+ .json(ClassLoaderUtils.getSystemResourceAsString("json/feedSpamNonEmptyPeriod.additionalInformation.json"))
+ .verify();
+ }
+}
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskTest.java
new file mode 100644
index 0000000000..70b7aa130b
--- /dev/null
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskTest.java
@@ -0,0 +1,317 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import static org.apache.james.rspamd.DockerRSpamD.PASSWORD;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_PERIOD;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.SPAM_MAILBOX_NAME;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import javax.mail.Flags;
+
+import org.apache.james.core.Domain;
+import org.apache.james.core.Username;
+import org.apache.james.domainlist.api.DomainList;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
+import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.rspamd.DockerRSpamDExtension;
+import org.apache.james.rspamd.client.RSpamDClientConfiguration;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.task.Task;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.Mockito;
+
+import com.github.fge.lambdas.Throwing;
+
+public class FeedSpamToRSpamDTaskTest {
+ @RegisterExtension
+ static DockerRSpamDExtension rSpamDExtension = new DockerRSpamDExtension();
+
+ public static final Domain DOMAIN = Domain.of("domain.tld");
+ public static final Username BOB = Username.fromLocalPartWithDomain("bob", DOMAIN);
+ public static final Username ALICE = Username.fromLocalPartWithDomain("alice", DOMAIN);
+ public static final MailboxPath BOB_SPAM_MAILBOX = MailboxPath.forUser(BOB, SPAM_MAILBOX_NAME);
+ public static final MailboxPath ALICE_SPAM_MAILBOX = MailboxPath.forUser(ALICE, SPAM_MAILBOX_NAME);
+ public static final long THREE_DAYS_IN_SECOND = 259200;
+ public static final long TWO_DAYS_IN_SECOND = 172800;
+ public static final long ONE_DAY_IN_SECOND = 86400;
+ public static final Instant NOW = ZonedDateTime.now().toInstant();
+
+ private InMemoryMailboxManager mailboxManager;
+ private MessageIdManager messageIdManager;
+ private MailboxSessionMapperFactory mapperFactory;
+ private UsersRepository usersRepository;
+ private Clock clock;
+ private RSpamDHttpClient client;
+ private FeedSpamToRSpamDTask task;
+
+ @BeforeEach
+ void setup() throws Exception {
+ InMemoryIntegrationResources inMemoryIntegrationResources = InMemoryIntegrationResources.defaultResources();
+ mailboxManager = inMemoryIntegrationResources.getMailboxManager();
+ DomainList domainList = mock(DomainList.class);
+ Mockito.when(domainList.containsDomain(any())).thenReturn(true);
+ usersRepository = MemoryUsersRepository.withVirtualHosting(domainList);
+ usersRepository.addUser(BOB, "anyPassword");
+ usersRepository.addUser(ALICE, "anyPassword");
+ mailboxManager.createMailbox(BOB_SPAM_MAILBOX, mailboxManager.createSystemSession(BOB));
+ mailboxManager.createMailbox(ALICE_SPAM_MAILBOX, mailboxManager.createSystemSession(ALICE));
+
+ clock = new UpdatableTickingClock(NOW);
+ client = new RSpamDHttpClient(new RSpamDClientConfiguration(rSpamDExtension.getBaseUrl(), PASSWORD, Optional.empty()));
+ messageIdManager = inMemoryIntegrationResources.getMessageIdManager();
+ mapperFactory = mailboxManager.getMapperFactory();
+ task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, FeedSpamToRSpamDTask.RunningOptions.DEFAULT, clock);
+ }
+
+ @Test
+ void shouldReturnDefaultInformationWhenDataIsEmpty() {
+ Task.Result result = task.run();
+
+ assertThat(result).isEqualTo(Task.Result.COMPLETED);
+ assertThat(task.snapshot())
+ .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+ .spamMessageCount(0)
+ .reportedSpamMessageCount(0)
+ .errorCount(0)
+ .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+ .period(DEFAULT_PERIOD)
+ .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+ .build());
+ }
+
+ @Test
+ void taskShouldReportAllSpamMessagesOfAllUsersByDefault() throws MailboxException {
+ appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW));
+ appendSpamMessage(ALICE_SPAM_MAILBOX, Date.from(NOW));
+
+ Task.Result result = task.run();
+
+ assertThat(result).isEqualTo(Task.Result.COMPLETED);
+ assertThat(task.snapshot())
+ .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+ .spamMessageCount(2)
+ .reportedSpamMessageCount(2)
+ .errorCount(0)
+ .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+ .period(DEFAULT_PERIOD)
+ .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+ .build());
+ }
+
+ @Test
+ void taskShouldReportSpamMessageInPeriod() throws MailboxException {
+ FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.of(TWO_DAYS_IN_SECOND),
+ DEFAULT_MESSAGES_PER_SECOND, DEFAULT_SAMPLING_PROBABILITY);
+ task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+ appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)));
+
+ Task.Result result = task.run();
+
+ assertThat(result).isEqualTo(Task.Result.COMPLETED);
+ assertThat(task.snapshot())
+ .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+ .spamMessageCount(1)
+ .reportedSpamMessageCount(1)
+ .errorCount(0)
+ .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+ .period(Optional.of(TWO_DAYS_IN_SECOND))
+ .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+ .build());
+ }
+
+ @Test
+ void taskShouldNotReportSpamMessageNotInPeriod() throws MailboxException {
+ FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.of(TWO_DAYS_IN_SECOND),
+ DEFAULT_MESSAGES_PER_SECOND, DEFAULT_SAMPLING_PROBABILITY);
+ task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+ appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(THREE_DAYS_IN_SECOND)));
+
+ Task.Result result = task.run();
+
+ assertThat(result).isEqualTo(Task.Result.COMPLETED);
+ assertThat(task.snapshot())
+ .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+ .spamMessageCount(1)
+ .reportedSpamMessageCount(0)
+ .errorCount(0)
+ .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+ .period(Optional.of(TWO_DAYS_IN_SECOND))
+ .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+ .build());
+ }
+
+ @Test
+ void mixedInternalDateCase() throws MailboxException {
+ FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.of(TWO_DAYS_IN_SECOND),
+ DEFAULT_MESSAGES_PER_SECOND, DEFAULT_SAMPLING_PROBABILITY);
+ task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+ appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(THREE_DAYS_IN_SECOND)));
+ appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)));
+
+ Task.Result result = task.run();
+
+ assertThat(result).isEqualTo(Task.Result.COMPLETED);
+ assertThat(task.snapshot())
+ .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+ .spamMessageCount(2)
+ .reportedSpamMessageCount(1)
+ .errorCount(0)
+ .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+ .period(Optional.of(TWO_DAYS_IN_SECOND))
+ .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+ .build());
+ }
+
+ @Test
+ void taskWithSamplingProbabilityIsZeroShouldReportNonSpamMessage() {
+ FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.empty(),
+ DEFAULT_MESSAGES_PER_SECOND, 0);
+ task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+ IntStream.range(0, 10)
+ .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+ Task.Result result = task.run();
+
+ assertThat(result).isEqualTo(Task.Result.COMPLETED);
+ assertThat(task.snapshot())
+ .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+ .spamMessageCount(10)
+ .reportedSpamMessageCount(0)
+ .errorCount(0)
+ .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+ .period(DEFAULT_PERIOD)
+ .samplingProbability(0)
+ .build());
+ }
+
+ @Test
+ void taskWithDefaultSamplingProbabilityShouldReportAllSpamMessages() {
+ IntStream.range(0, 10)
+ .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+ Task.Result result = task.run();
+
+ assertThat(result).isEqualTo(Task.Result.COMPLETED);
+ assertThat(task.snapshot())
+ .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+ .spamMessageCount(10)
+ .reportedSpamMessageCount(10)
+ .errorCount(0)
+ .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+ .period(DEFAULT_PERIOD)
+ .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+ .build());
+ }
+
+ @Test
+ void taskWithVeryLowSamplingProbabilityShouldReportNotAllSpamMessages() {
+ FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.empty(),
+ DEFAULT_MESSAGES_PER_SECOND, 0.01);
+ task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+ IntStream.range(0, 10)
+ .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+ Task.Result result = task.run();
+
+ SoftAssertions.assertSoftly(softly -> {
+ assertThat(result).isEqualTo(Task.Result.COMPLETED);
+ assertThat(task.snapshot().getSpamMessageCount()).isEqualTo(10);
+ assertThat(task.snapshot().getReportedSpamMessageCount()).isLessThan(10);
+ assertThat(task.snapshot().getErrorCount()).isZero();
+ });
+ }
+
+ @Test
+ void taskWithVeryHighSamplingProbabilityShouldReportMoreThanZeroMessage() {
+ FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.empty(),
+ DEFAULT_MESSAGES_PER_SECOND, 0.99);
+ task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+ IntStream.range(0, 10)
+ .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+ Task.Result result = task.run();
+
+ SoftAssertions.assertSoftly(softly -> {
+ assertThat(result).isEqualTo(Task.Result.COMPLETED);
+ assertThat(task.snapshot().getSpamMessageCount()).isEqualTo(10);
+ assertThat(task.snapshot().getReportedSpamMessageCount()).isPositive();
+ assertThat(task.snapshot().getErrorCount()).isZero();
+ });
+ }
+
+ @Test
+ void taskWithAverageSamplingProbabilityShouldReportSomeMessages() {
+ FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.empty(),
+ DEFAULT_MESSAGES_PER_SECOND, 0.5);
+ task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+ IntStream.range(0, 10)
+ .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+ Task.Result result = task.run();
+
+ SoftAssertions.assertSoftly(softly -> {
+ assertThat(result).isEqualTo(Task.Result.COMPLETED);
+ assertThat(task.snapshot().getSpamMessageCount()).isEqualTo(10);
+ assertThat(task.snapshot().getReportedSpamMessageCount()).isBetween(1L, 9L); // skip 0 and 10 cases cause their probability is very low (0.5^10)
+ assertThat(task.snapshot().getErrorCount()).isZero();
+ });
+ }
+
+ private void appendSpamMessage(MailboxPath mailboxPath, Date internalDate) throws MailboxException {
+ MailboxSession session = mailboxManager.createSystemSession(mailboxPath.getUser());
+ mailboxManager.getMailbox(mailboxPath, session)
+ .appendMessage(new ByteArrayInputStream(String.format("random content %4.3f", Math.random()).getBytes()),
+ internalDate,
+ session,
+ true,
+ new Flags());
+ }
+}
diff --git a/third-party/rspamd/src/test/resources/json/feedSpamEmptyPeriod.additionalInformation.json b/third-party/rspamd/src/test/resources/json/feedSpamEmptyPeriod.additionalInformation.json
new file mode 100644
index 0000000000..82fcf45f41
--- /dev/null
+++ b/third-party/rspamd/src/test/resources/json/feedSpamEmptyPeriod.additionalInformation.json
@@ -0,0 +1,11 @@
+{
+ "errorCount": 1,
+ "reportedSpamMessageCount": 2,
+ "runningOptions": {
+ "messagesPerSecond": 10,
+ "samplingProbability": 1.0
+ },
+ "spamMessageCount": 4,
+ "timestamp": "2007-12-03T10:15:30Z",
+ "type": "FeedSpamToRSpamDTask"
+}
\ No newline at end of file
diff --git a/third-party/rspamd/src/test/resources/json/feedSpamNonEmptyPeriod.additionalInformation.json b/third-party/rspamd/src/test/resources/json/feedSpamNonEmptyPeriod.additionalInformation.json
new file mode 100644
index 0000000000..6315215058
--- /dev/null
+++ b/third-party/rspamd/src/test/resources/json/feedSpamNonEmptyPeriod.additionalInformation.json
@@ -0,0 +1,12 @@
+{
+ "errorCount": 1,
+ "reportedSpamMessageCount": 2,
+ "runningOptions": {
+ "messagesPerSecond": 10,
+ "periodInSecond": 3600,
+ "samplingProbability": 1.0
+ },
+ "spamMessageCount": 4,
+ "timestamp": "2007-12-03T10:15:30Z",
+ "type": "FeedSpamToRSpamDTask"
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org