Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/09 01:51:08 UTC

[james-project] branch master updated: JAMES-3775 Write a task and webadmin route to feed spam to RSpamD (#1092)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 6da4b1ba44 JAMES-3775 Write a task and webadmin route to feed spam to RSpamD (#1092)
6da4b1ba44 is described below

commit 6da4b1ba44b6007d54328b333d118de4231404d2
Author: Trần Hồng Quân <55...@users.noreply.github.com>
AuthorDate: Tue Aug 9 08:51:02 2022 +0700

    JAMES-3775 Write a task and webadmin route to feed spam to RSpamD (#1092)
---
 third-party/rspamd/README.md                       |  47 +++
 third-party/rspamd/pom.xml                         |  61 +++-
 .../james/rspamd/client/RSpamDHttpClient.java      |   3 +
 .../apache/james/rspamd/module/RSpamDModule.java   |  52 +++
 .../james/rspamd/route/FeedMessageRoute.java       | 142 ++++++++
 .../james/rspamd/task/FeedSpamToRSpamDTask.java    | 383 ++++++++++++++++++++
 ...edSpamToRSpamDTaskAdditionalInformationDTO.java | 106 ++++++
 .../rspamd/task/GetMailboxMessagesService.java     |  89 +++++
 .../james/rspamd/route/FeedMessageRouteTest.java   | 393 +++++++++++++++++++++
 ...amToRSpamDTaskAdditionalInformationDTOTest.java |  63 ++++
 .../rspamd/task/FeedSpamToRSpamDTaskTest.java      | 317 +++++++++++++++++
 .../feedSpamEmptyPeriod.additionalInformation.json |  11 +
 ...edSpamNonEmptyPeriod.additionalInformation.json |  12 +
 13 files changed, 1678 insertions(+), 1 deletion(-)

diff --git a/third-party/rspamd/README.md b/third-party/rspamd/README.md
new file mode 100644
index 0000000000..2262693b86
--- /dev/null
+++ b/third-party/rspamd/README.md
@@ -0,0 +1,47 @@
+## Additional webadmin endpoints
+
+### Report spam messages to RSpamD
+Use this route to schedule a task that reports spam messages to RSpamD so that it can train its spam classifier.
+
+```bash
+curl -XPOST 'http://ip:port/rspamd?action=reportSpam'
+```
+
+This endpoint accepts the following parameters:
+- `action` (required): must be `reportSpam`
+- `messagesPerSecond` (optional): maximum number of messages reported to RSpamD per second, defaults to 10
+- `period` (optional): duration (many time units are supported, seconds by default); only messages between `now - duration` and `now` are reported. By default,
+all messages are reported.
+   These inputs represent the same duration: `1d`, `1day`, `86400 seconds`, `86400`...
+- `samplingProbability` (optional): float between 0 and 1, the probability that each given message is reported to RSpamD.
+By default, all messages are reported.
+
+The response contains the task id, for example:
+```
+{
+    "taskId": "70c12761-ab86-4321-bb6f-fde99e2f74b0"
+}
+```
+
+Response codes:
+- 201: Task generation succeeded. Corresponding task id is returned.
+- 400: Invalid arguments supplied in the user request.
+
+[More details about endpoints returning a task](https://james.apache.org/server/manage-webadmin.html#Endpoints_returning_a_task).
+
+The scheduled task has the type `FeedSpamToRSpamDTask` and exposes the following additionalInformation:
+
+```json
+{
+  "errorCount": 1,
+  "reportedSpamMessageCount": 2,
+  "runningOptions": {
+    "messagesPerSecond": 10,
+    "periodInSecond": 3600,
+    "samplingProbability": 1.0
+  },
+  "spamMessageCount": 4,
+  "timestamp": "2007-12-03T10:15:30Z",
+  "type": "FeedSpamToRSpamDTask"
+}
+```
\ No newline at end of file
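
Note on the README above: the route can also be called from Java. The following is only a minimal sketch, not part of the commit. It assumes a webadmin server reachable at `http://localhost:8000` (a placeholder for `ip:port`), and the class name `ReportSpamRequest` and its `buildUri` helper are hypothetical. It builds the `POST /rspamd` request with the query parameters documented in the README, without sending it.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ReportSpamRequest {
    // Hypothetical helper: appends URL-encoded query parameters to <baseUrl>/rspamd.
    static URI buildUri(String baseUrl, Map<String, String> params) {
        String query = params.entrySet().stream()
            .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
                + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
            .collect(Collectors.joining("&"));
        return URI.create(baseUrl + "/rspamd?" + query);
    }

    // Builds (but does not send) the POST request that schedules the task.
    static HttpRequest buildRequest(String baseUrl, Map<String, String> params) {
        return HttpRequest.newBuilder(buildUri(baseUrl, params))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("action", "reportSpam");          // required
        params.put("period", "1d");                  // optional: last 24 hours only
        params.put("messagesPerSecond", "20");       // optional: default is 10
        params.put("samplingProbability", "0.5");    // optional: default is 1.0

        HttpRequest request = buildRequest("http://localhost:8000", params);
        System.out.println(request.method() + " " + request.uri());
        // To actually schedule the task, pass the request to
        // java.net.http.HttpClient#send and read the "taskId" field of the JSON body.
    }
}
```

According to the README, a 201 response carries the task id and a 400 response signals invalid arguments.
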
diff --git a/third-party/rspamd/pom.xml b/third-party/rspamd/pom.xml
index 75f9635d44..a8a3beb13a 100644
--- a/third-party/rspamd/pom.xml
+++ b/third-party/rspamd/pom.xml
@@ -33,10 +33,52 @@
     <description>RSpamD Java client (HTTP) and testing utilities</description>
 
     <dependencies>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-mailbox-api</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-mailbox-memory</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-mailbox-memory</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-mailbox-store</artifactId>
+        </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>apache-mailet-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>event-bus-api</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>event-bus-in-vm</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-json</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-data-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>james-server-guice-common</artifactId>
@@ -49,6 +91,10 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-task-json</artifactId>
+        </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>james-server-testing</artifactId>
@@ -57,7 +103,6 @@
         <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>james-server-webadmin-core</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
@@ -74,10 +119,24 @@
             <groupId>com.fasterxml.jackson.datatype</groupId>
             <artifactId>jackson-datatype-jdk8</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.inject</groupId>
+            <artifactId>guice</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.projectreactor.netty</groupId>
             <artifactId>reactor-netty</artifactId>
         </dependency>
+        <dependency>
+            <groupId>net.javacrumbs.json-unit</groupId>
+            <artifactId>json-unit-assertj</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RSpamDHttpClient.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RSpamDHttpClient.java
index 57f78e5362..6399765b3d 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RSpamDHttpClient.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RSpamDHttpClient.java
@@ -25,6 +25,8 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 
+import javax.inject.Inject;
+
 import org.apache.james.rspamd.exception.RSpamDUnexpectedException;
 import org.apache.james.rspamd.exception.UnauthorizedException;
 import org.apache.james.rspamd.model.AnalysisResult;
@@ -52,6 +54,7 @@ public class RSpamDHttpClient {
     private final HttpClient httpClient;
     private final ObjectMapper objectMapper;
 
+    @Inject
     public RSpamDHttpClient(RSpamDClientConfiguration configuration) {
         httpClient = buildReactorNettyHttpClient(configuration);
         this.objectMapper = new ObjectMapper().registerModule(new Jdk8Module());
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/module/RSpamDModule.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/module/RSpamDModule.java
new file mode 100644
index 0000000000..c3865bf7a1
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/module/RSpamDModule.java
@@ -0,0 +1,52 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd.module;
+
+import org.apache.james.rspamd.route.FeedMessageRoute;
+import org.apache.james.rspamd.task.FeedSpamToRSpamDTaskAdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.dto.DTOModuleInjections;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.multibindings.Multibinder;
+import com.google.inject.multibindings.ProvidesIntoSet;
+import com.google.inject.name.Named;
+
+public class RSpamDModule extends AbstractModule {
+    @Override
+    protected void configure() {
+        Multibinder<Routes> routesMultiBinder = Multibinder.newSetBinder(binder(), Routes.class);
+        routesMultiBinder.addBinding().to(FeedMessageRoute.class);
+    }
+
+    @ProvidesIntoSet
+    public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> feedSpamAdditionalInformation() {
+        return FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE;
+    }
+
+    @Named(DTOModuleInjections.WEBADMIN_DTO)
+    @ProvidesIntoSet
+    public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> webAdminFeedSpamAdditionalInformation() {
+        return FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE;
+    }
+}
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/route/FeedMessageRoute.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/route/FeedMessageRoute.java
new file mode 100644
index 0000000000..b1c631018b
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/route/FeedMessageRoute.java
@@ -0,0 +1,142 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd.route;
+
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY;
+
+import java.time.Clock;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import javax.inject.Inject;
+
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.rspamd.task.FeedSpamToRSpamDTask;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskManager;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.util.DurationParser;
+import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.tasks.TaskFromRequest;
+import org.apache.james.webadmin.utils.JsonTransformer;
+
+import com.google.common.base.Preconditions;
+
+import spark.Request;
+import spark.Service;
+
+public class FeedMessageRoute implements Routes {
+    public static final String BASE_PATH = "/rspamd";
+
+    private final TaskManager taskManager;
+    private final MailboxManager mailboxManager;
+    private final MessageIdManager messageIdManager;
+    private final MailboxSessionMapperFactory mapperFactory;
+    private final UsersRepository usersRepository;
+    private final RSpamDHttpClient rSpamDHttpClient;
+    private final JsonTransformer jsonTransformer;
+    private final Clock clock;
+
+    @Inject
+    public FeedMessageRoute(TaskManager taskManager, MailboxManager mailboxManager, UsersRepository usersRepository, RSpamDHttpClient rSpamDHttpClient,
+                            JsonTransformer jsonTransformer, Clock clock, MessageIdManager messageIdManager, MailboxSessionMapperFactory mapperFactory) {
+        this.taskManager = taskManager;
+        this.mailboxManager = mailboxManager;
+        this.usersRepository = usersRepository;
+        this.rSpamDHttpClient = rSpamDHttpClient;
+        this.jsonTransformer = jsonTransformer;
+        this.clock = clock;
+        this.messageIdManager = messageIdManager;
+        this.mapperFactory = mapperFactory;
+    }
+
+    @Override
+    public String getBasePath() {
+        return BASE_PATH;
+    }
+
+    @Override
+    public void define(Service service) {
+        TaskFromRequest feedMessageTaskRequest = this::feedMessageTaskFromRequest;
+        service.post(BASE_PATH, feedMessageTaskRequest.asRoute(taskManager), jsonTransformer);
+    }
+
+    public Task feedMessageTaskFromRequest(Request request) {
+        Preconditions.checkArgument(Optional.ofNullable(request.queryParams("action"))
+                .filter(action -> action.equals("reportSpam") || action.equals("reportHam"))
+                .isPresent(),
+            "'action' is missing or must be 'reportSpam' or 'reportHam'");
+
+        return new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, rSpamDHttpClient,
+            getFeedSpamTaskRunningOptions(request), clock);
+    }
+
+    private FeedSpamToRSpamDTask.RunningOptions getFeedSpamTaskRunningOptions(Request request) {
+        Optional<Long> periodInSecond = getPeriod(request);
+        int messagesPerSecond = getMessagesPerSecond(request).orElse(DEFAULT_MESSAGES_PER_SECOND);
+        double samplingProbability = getSamplingProbability(request).orElse(DEFAULT_SAMPLING_PROBABILITY);
+        return new FeedSpamToRSpamDTask.RunningOptions(periodInSecond, messagesPerSecond, samplingProbability);
+    }
+
+    private Optional<Long> getPeriod(Request req) {
+        return Optional.ofNullable(req.queryParams("period"))
+            .filter(Predicate.not(String::isEmpty))
+            .map(rawString -> DurationParser.parse(rawString, ChronoUnit.SECONDS).toSeconds())
+            .map(period -> {
+                Preconditions.checkArgument(period > 0,
+                    "'period' must be strictly positive");
+                return period;
+            });
+    }
+
+    private Optional<Integer> getMessagesPerSecond(Request req) {
+        try {
+            return Optional.ofNullable(req.queryParams("messagesPerSecond"))
+                .map(Integer::parseInt)
+                .map(messagesPerSecond -> {
+                    Preconditions.checkArgument(messagesPerSecond > 0,
+                        "'messagesPerSecond' must be strictly positive");
+                    return messagesPerSecond;
+                });
+        } catch (NumberFormatException ex) {
+            throw new IllegalArgumentException("'messagesPerSecond' must be numeric");
+        }
+    }
+
+    private Optional<Double> getSamplingProbability(Request req) {
+        try {
+            return Optional.ofNullable(req.queryParams("samplingProbability"))
+                .map(Double::parseDouble)
+                .map(samplingProbability -> {
+                    Preconditions.checkArgument(samplingProbability >= 0 && samplingProbability <= 1,
+                        "'samplingProbability' must be greater than or equal to 0.0 and smaller than or equal to 1.0");
+                    return samplingProbability;
+                });
+        } catch (NumberFormatException ex) {
+            throw new IllegalArgumentException("'samplingProbability' must be numeric");
+        }
+    }
+}
+
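
Note on `getSamplingProbability` above: the three possible outcomes (parameter absent, non-numeric, out of range) can be reproduced without Spark or Guava. This is a standalone sketch under stated assumptions, not the committed code: the class `SamplingProbabilityParam` and its `parse` method are hypothetical, and the raw query value is passed in as a plain `String` (or `null` when absent).

```java
import java.util.Optional;

public class SamplingProbabilityParam {
    // Mirrors the validation in the route: absent -> empty, otherwise a double in [0, 1].
    static Optional<Double> parse(String raw) {
        if (raw == null) {
            return Optional.empty(); // the caller falls back to its default
        }
        double value;
        try {
            value = Double.parseDouble(raw);
        } catch (NumberFormatException ex) {
            throw new IllegalArgumentException("'samplingProbability' must be numeric");
        }
        // Written as a negated range check so that NaN is rejected as well.
        if (!(value >= 0 && value <= 1)) {
            throw new IllegalArgumentException(
                "'samplingProbability' must be greater than or equal to 0.0 and smaller than or equal to 1.0");
        }
        return Optional.of(value);
    }

    public static void main(String[] args) {
        System.out.println(parse(null));    // Optional.empty
        System.out.println(parse("0.25"));  // Optional[0.25]
        try {
            parse("1.5");
        } catch (IllegalArgumentException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
    }
}
```

In the route, both failure cases surface as an `IllegalArgumentException`, which presumably maps to the 400 response listed in the README.
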
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTask.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTask.java
new file mode 100644
index 0000000000..d546c307c0
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTask.java
@@ -0,0 +1,383 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Date;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskType;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.ReactorUtils;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+
+import reactor.core.publisher.Mono;
+
+public class FeedSpamToRSpamDTask implements Task {
+    public static final String SPAM_MAILBOX_NAME = "Spam";
+    public static final TaskType TASK_TYPE = TaskType.of("FeedSpamToRSpamDTask");
+
+    public static class RunningOptions {
+        public static final Optional<Long> DEFAULT_PERIOD = Optional.empty();
+        public static final int DEFAULT_MESSAGES_PER_SECOND = 10;
+        public static final double DEFAULT_SAMPLING_PROBABILITY = 1;
+        public static final RunningOptions DEFAULT = new RunningOptions(DEFAULT_PERIOD, DEFAULT_MESSAGES_PER_SECOND,
+            DEFAULT_SAMPLING_PROBABILITY);
+
+        private final Optional<Long> periodInSecond;
+        private final int messagesPerSecond;
+        private final double samplingProbability;
+
+        public RunningOptions(@JsonProperty("periodInSecond") Optional<Long> periodInSecond,
+                              @JsonProperty("messagesPerSecond") int messagesPerSecond,
+                              @JsonProperty("samplingProbability") double samplingProbability) {
+            this.periodInSecond = periodInSecond;
+            this.messagesPerSecond = messagesPerSecond;
+            this.samplingProbability = samplingProbability;
+        }
+
+        public Optional<Long> getPeriodInSecond() {
+            return periodInSecond;
+        }
+
+        public int getMessagesPerSecond() {
+            return messagesPerSecond;
+        }
+
+        public double getSamplingProbability() {
+            return samplingProbability;
+        }
+    }
+
+    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+
+        private static AdditionalInformation from(Context context) {
+            Context.Snapshot snapshot = context.snapshot();
+            return new AdditionalInformation(
+                Clock.systemUTC().instant(),
+                snapshot.getSpamMessageCount(),
+                snapshot.getReportedSpamMessageCount(),
+                snapshot.getErrorCount(),
+                snapshot.getMessagesPerSecond(),
+                snapshot.getPeriod(),
+                snapshot.getSamplingProbability());
+        }
+
+        private final Instant timestamp;
+        private final long spamMessageCount;
+        private final long reportedSpamMessageCount;
+        private final long errorCount;
+        private final int messagesPerSecond;
+        private final Optional<Long> period;
+        private final double samplingProbability;
+
+        public AdditionalInformation(Instant timestamp, long spamMessageCount, long reportedSpamMessageCount, long errorCount, int messagesPerSecond, Optional<Long> period, double samplingProbability) {
+            this.timestamp = timestamp;
+            this.spamMessageCount = spamMessageCount;
+            this.reportedSpamMessageCount = reportedSpamMessageCount;
+            this.errorCount = errorCount;
+            this.messagesPerSecond = messagesPerSecond;
+            this.period = period;
+            this.samplingProbability = samplingProbability;
+        }
+
+        public long getSpamMessageCount() {
+            return spamMessageCount;
+        }
+
+        public long getReportedSpamMessageCount() {
+            return reportedSpamMessageCount;
+        }
+
+        public long getErrorCount() {
+            return errorCount;
+        }
+
+        public int getMessagesPerSecond() {
+            return messagesPerSecond;
+        }
+
+        public Optional<Long> getPeriod() {
+            return period;
+        }
+
+        public double getSamplingProbability() {
+            return samplingProbability;
+        }
+
+        @Override
+        public Instant timestamp() {
+            return timestamp;
+        }
+    }
+
+    public static class Context {
+
+        public static class Snapshot {
+
+            public static Builder builder() {
+                return new Builder();
+            }
+
+            static class Builder {
+                private Optional<Long> spamMessageCount;
+                private Optional<Long> reportedSpamMessageCount;
+                private Optional<Long> errorCount;
+                private Optional<Integer> messagesPerSecond;
+                private Optional<Long> period;
+                private Optional<Double> samplingProbability;
+
+                Builder() {
+                    spamMessageCount = Optional.empty();
+                    reportedSpamMessageCount = Optional.empty();
+                    errorCount = Optional.empty();
+                    messagesPerSecond = Optional.empty();
+                    period = Optional.empty();
+                    samplingProbability = Optional.empty();
+                }
+
+                public Snapshot build() {
+                    return new Snapshot(
+                        spamMessageCount.orElse(0L),
+                        reportedSpamMessageCount.orElse(0L),
+                        errorCount.orElse(0L),
+                        messagesPerSecond.orElse(0),
+                        period,
+                        samplingProbability.orElse(1D));
+                }
+
+                public Builder spamMessageCount(long spamMessageCount) {
+                    this.spamMessageCount = Optional.of(spamMessageCount);
+                    return this;
+                }
+
+                public Builder reportedSpamMessageCount(long reportedSpamMessageCount) {
+                    this.reportedSpamMessageCount = Optional.of(reportedSpamMessageCount);
+                    return this;
+                }
+
+                public Builder errorCount(long errorCount) {
+                    this.errorCount = Optional.of(errorCount);
+                    return this;
+                }
+
+                public Builder messagesPerSecond(int messagesPerSecond) {
+                    this.messagesPerSecond = Optional.of(messagesPerSecond);
+                    return this;
+                }
+
+                public Builder period(Optional<Long> period) {
+                    this.period = period;
+                    return this;
+                }
+
+                public Builder samplingProbability(double samplingProbability) {
+                    this.samplingProbability = Optional.of(samplingProbability);
+                    return this;
+                }
+            }
+
+            private final long spamMessageCount;
+            private final long reportedSpamMessageCount;
+            private final long errorCount;
+            private final int messagesPerSecond;
+            private final Optional<Long> period;
+            private final double samplingProbability;
+
+            public Snapshot(long spamMessageCount, long reportedSpamMessageCount, long errorCount, int messagesPerSecond, Optional<Long> period,
+                            double samplingProbability) {
+                this.spamMessageCount = spamMessageCount;
+                this.reportedSpamMessageCount = reportedSpamMessageCount;
+                this.errorCount = errorCount;
+                this.messagesPerSecond = messagesPerSecond;
+                this.period = period;
+                this.samplingProbability = samplingProbability;
+            }
+
+            public long getSpamMessageCount() {
+                return spamMessageCount;
+            }
+
+            public long getReportedSpamMessageCount() {
+                return reportedSpamMessageCount;
+            }
+
+            public long getErrorCount() {
+                return errorCount;
+            }
+
+            public int getMessagesPerSecond() {
+                return messagesPerSecond;
+            }
+
+            public Optional<Long> getPeriod() {
+                return period;
+            }
+
+            public double getSamplingProbability() {
+                return samplingProbability;
+            }
+
+            @Override
+            public final boolean equals(Object o) {
+                if (o instanceof Snapshot) {
+                    Snapshot snapshot = (Snapshot) o;
+
+                    return Objects.equals(this.spamMessageCount, snapshot.spamMessageCount)
+                        && Objects.equals(this.reportedSpamMessageCount, snapshot.reportedSpamMessageCount)
+                        && Objects.equals(this.errorCount, snapshot.errorCount)
+                        && Objects.equals(this.messagesPerSecond, snapshot.messagesPerSecond)
+                        && Objects.equals(this.samplingProbability, snapshot.samplingProbability)
+                        && Objects.equals(this.period, snapshot.period);
+                }
+                return false;
+            }
+
+            @Override
+            public final int hashCode() {
+                return Objects.hash(spamMessageCount, reportedSpamMessageCount, errorCount, messagesPerSecond, period, samplingProbability);
+            }
+
+            @Override
+            public String toString() {
+                return MoreObjects.toStringHelper(this)
+                    .add("spamMessageCount", spamMessageCount)
+                    .add("reportedSpamMessageCount", reportedSpamMessageCount)
+                    .add("errorCount", errorCount)
+                    .add("messagesPerSecond", messagesPerSecond)
+                    .add("period", period)
+                    .add("samplingProbability", samplingProbability)
+                    .toString();
+            }
+        }
+
+        private final AtomicLong spamMessageCount;
+        private final AtomicLong reportedSpamMessageCount;
+        private final AtomicLong errorCount;
+        private final Integer messagesPerSecond;
+        private final Optional<Long> period;
+        private final Double samplingProbability;
+
+        public Context(RunningOptions runningOptions) {
+            this.spamMessageCount = new AtomicLong();
+            this.reportedSpamMessageCount = new AtomicLong();
+            this.errorCount = new AtomicLong();
+            this.messagesPerSecond = runningOptions.messagesPerSecond;
+            this.period = runningOptions.periodInSecond;
+            this.samplingProbability = runningOptions.samplingProbability;
+        }
+
+        public void incrementSpamMessageCount() {
+            spamMessageCount.incrementAndGet();
+        }
+
+        public void incrementReportedSpamMessageCount(int count) {
+            reportedSpamMessageCount.addAndGet(count);
+        }
+
+        public void incrementErrorCount() {
+            errorCount.incrementAndGet();
+        }
+
+        public Snapshot snapshot() {
+            return Snapshot.builder()
+                .spamMessageCount(spamMessageCount.get())
+                .reportedSpamMessageCount(reportedSpamMessageCount.get())
+                .errorCount(errorCount.get())
+                .messagesPerSecond(messagesPerSecond)
+                .period(period)
+                .samplingProbability(samplingProbability)
+                .build();
+        }
+    }
+
+    private final GetMailboxMessagesService messagesService;
+    private final RSpamDHttpClient rSpamDHttpClient;
+    private final RunningOptions runningOptions;
+    private final Context context;
+    private final Clock clock;
+
+    public FeedSpamToRSpamDTask(MailboxManager mailboxManager, UsersRepository usersRepository, MessageIdManager messageIdManager, MailboxSessionMapperFactory mapperFactory,
+                                RSpamDHttpClient rSpamDHttpClient, RunningOptions runningOptions, Clock clock) {
+        this.runningOptions = runningOptions;
+        this.messagesService = new GetMailboxMessagesService(mailboxManager, usersRepository, mapperFactory, messageIdManager);
+        this.rSpamDHttpClient = rSpamDHttpClient;
+        this.context = new Context(runningOptions);
+        this.clock = clock;
+    }
+
+    @Override
+    public Result run() {
+        Optional<Date> afterDate = runningOptions.periodInSecond.map(periodInSecond -> Date.from(clock.instant().minusSeconds(periodInSecond)));
+        try {
+            return messagesService.getMailboxMessagesOfAllUser(SPAM_MAILBOX_NAME, afterDate, runningOptions.getSamplingProbability(), context)
+                .transform(ReactorUtils.<MessageResult, Task.Result>throttle()
+                    .elements(runningOptions.messagesPerSecond)
+                    .per(Duration.ofSeconds(1))
+                    .forOperation(messageResult -> Mono.fromSupplier(Throwing.supplier(() -> rSpamDHttpClient.reportAsSpam(messageResult.getFullContent().getInputStream())))
+                        .then(Mono.fromCallable(() -> {
+                            context.incrementReportedSpamMessageCount(1);
+                            return Result.COMPLETED;
+                        }))
+                        .onErrorResume(error -> {
+                            LOGGER.error("Error when reporting spam message to RSpamD", error);
+                            context.incrementErrorCount();
+                            return Mono.just(Result.PARTIAL);
+                        })))
+                .reduce(Task::combine)
+                .switchIfEmpty(Mono.just(Result.COMPLETED))
+                .block();
+        } catch (UsersRepositoryException e) {
+            LOGGER.error("Error while accessing users from repository", e);
+            return Task.Result.PARTIAL;
+        }
+    }
+
+    @Override
+    public TaskType type() {
+        return TASK_TYPE;
+    }
+
+    @Override
+    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+        return Optional.of(AdditionalInformation.from(context));
+    }
+
+    @VisibleForTesting
+    public Context.Snapshot snapshot() {
+        return context.snapshot();
+    }
+}
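
Editorial note on the result handling in run() above: each reported message yields COMPLETED or PARTIAL, the results are reduced pairwise, and an empty stream falls back to COMPLETED. The sketch below mirrors that shape with plain Java streams. The enum and the combine rule are stand-ins written for illustration; they assume that any PARTIAL makes the whole run PARTIAL, and they are not the James Task API.

```java
import java.util.List;

public class ResultCombineSketch {
    // Hypothetical stand-in for the task result type used in the diff.
    enum Result { COMPLETED, PARTIAL }

    // Assumed rule: the combination is COMPLETED only when both sides are COMPLETED.
    static Result combine(Result left, Result right) {
        return (left == Result.COMPLETED && right == Result.COMPLETED)
            ? Result.COMPLETED
            : Result.PARTIAL;
    }

    // Reduces all per-message results; an empty list counts as COMPLETED,
    // like the switchIfEmpty fallback in run().
    static Result combineAll(List<Result> results) {
        return results.stream()
            .reduce(ResultCombineSketch::combine)
            .orElse(Result.COMPLETED);
    }

    public static void main(String[] args) {
        System.out.println(combineAll(List.of(Result.COMPLETED, Result.PARTIAL)));
    }
}
```

Under this rule a single failed report downgrades the task to PARTIAL while the remaining messages are still processed, which matches the onErrorResume branch that logs, counts the error and continues.
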
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskAdditionalInformationDTO.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskAdditionalInformationDTO.java
new file mode 100644
index 0000000000..518fa6c79f
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskAdditionalInformationDTO.java
@@ -0,0 +1,106 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class FeedSpamToRSpamDTaskAdditionalInformationDTO implements AdditionalInformationDTO {
+    public static final AdditionalInformationDTOModule<FeedSpamToRSpamDTask.AdditionalInformation, FeedSpamToRSpamDTaskAdditionalInformationDTO> SERIALIZATION_MODULE =
+        DTOModule.forDomainObject(FeedSpamToRSpamDTask.AdditionalInformation.class)
+            .convertToDTO(FeedSpamToRSpamDTaskAdditionalInformationDTO.class)
+            .toDomainObjectConverter(FeedSpamToRSpamDTaskAdditionalInformationDTO::toDomainObject)
+            .toDTOConverter(FeedSpamToRSpamDTaskAdditionalInformationDTO::toDto)
+            .typeName(FeedSpamToRSpamDTask.TASK_TYPE.asString())
+            .withFactory(AdditionalInformationDTOModule::new);
+
+    private static FeedSpamToRSpamDTask.AdditionalInformation toDomainObject(FeedSpamToRSpamDTaskAdditionalInformationDTO dto) {
+        return new FeedSpamToRSpamDTask.AdditionalInformation(
+            dto.timestamp,
+            dto.spamMessageCount,
+            dto.reportedSpamMessageCount,
+            dto.errorCount,
+            dto.runningOptions.getMessagesPerSecond(),
+            dto.runningOptions.getPeriodInSecond(),
+            dto.runningOptions.getSamplingProbability());
+    }
+
+    private static FeedSpamToRSpamDTaskAdditionalInformationDTO toDto(FeedSpamToRSpamDTask.AdditionalInformation domainObject, String type) {
+        return new FeedSpamToRSpamDTaskAdditionalInformationDTO(
+            type,
+            domainObject.timestamp(),
+            domainObject.getSpamMessageCount(),
+            domainObject.getReportedSpamMessageCount(),
+            domainObject.getErrorCount(),
+            new FeedSpamToRSpamDTask.RunningOptions(domainObject.getPeriod(), domainObject.getMessagesPerSecond(), domainObject.getSamplingProbability()));
+    }
+
+    private final String type;
+    private final Instant timestamp;
+    private final long spamMessageCount;
+    private final long reportedSpamMessageCount;
+    private final long errorCount;
+    private final FeedSpamToRSpamDTask.RunningOptions runningOptions;
+
+    public FeedSpamToRSpamDTaskAdditionalInformationDTO(@JsonProperty("type") String type,
+                                                        @JsonProperty("timestamp") Instant timestamp,
+                                                        @JsonProperty("spamMessageCount") long spamMessageCount,
+                                                        @JsonProperty("reportedSpamMessageCount") long reportedSpamMessageCount,
+                                                        @JsonProperty("errorCount") long errorCount,
+                                                        @JsonProperty("runningOptions") FeedSpamToRSpamDTask.RunningOptions runningOptions) {
+        this.type = type;
+        this.timestamp = timestamp;
+        this.spamMessageCount = spamMessageCount;
+        this.reportedSpamMessageCount = reportedSpamMessageCount;
+        this.errorCount = errorCount;
+        this.runningOptions = runningOptions;
+    }
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public Instant getTimestamp() {
+        return timestamp;
+    }
+
+    public long getSpamMessageCount() {
+        return spamMessageCount;
+    }
+
+    public long getReportedSpamMessageCount() {
+        return reportedSpamMessageCount;
+    }
+
+    public long getErrorCount() {
+        return errorCount;
+    }
+
+    public FeedSpamToRSpamDTask.RunningOptions getRunningOptions() {
+        return runningOptions;
+    }
+}
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
new file mode 100644
index 0000000000..abae46efbd
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
@@ -0,0 +1,89 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import java.util.Date;
+import java.util.Optional;
+
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.model.FetchGroup;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.Message;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.ReactorUtils;
+import org.apache.james.util.streams.Iterators;
+
+import com.github.fge.lambdas.Throwing;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class GetMailboxMessagesService {
+    private static final int UNLIMITED = -1;
+
+    private final MailboxManager mailboxManager;
+    private final UsersRepository userRepository;
+    private final MailboxSessionMapperFactory mapperFactory;
+    private final MessageIdManager messageIdManager;
+
+    public GetMailboxMessagesService(MailboxManager mailboxManager, UsersRepository userRepository, MailboxSessionMapperFactory mapperFactory, MessageIdManager messageIdManager) {
+        this.mailboxManager = mailboxManager;
+        this.userRepository = userRepository;
+        this.mapperFactory = mapperFactory;
+        this.messageIdManager = messageIdManager;
+    }
+
+    public Flux<MessageResult> getMailboxMessagesOfAllUser(String mailboxName, Optional<Date> afterDate, double samplingProbability,
+                                                           FeedSpamToRSpamDTask.Context context) throws UsersRepositoryException {
+        return Iterators.toFlux(userRepository.list())
+            .flatMap(username -> getMailboxMessagesOfAUser(username, mailboxName, afterDate, samplingProbability, context), ReactorUtils.DEFAULT_CONCURRENCY);
+    }
+
+    private Flux<MessageResult> getMailboxMessagesOfAUser(Username username, String mailboxName, Optional<Date> afterDate,
+                                                          double samplingProbability, FeedSpamToRSpamDTask.Context context) {
+        MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
+
+        return Mono.from(mailboxManager.getMailboxReactive(MailboxPath.forUser(username, mailboxName), mailboxSession))
+            .map(Throwing.function(MessageManager::getMailboxEntity))
+            .flatMapMany(Throwing.function(mailbox -> mapperFactory.getMessageMapper(mailboxSession).findInMailboxReactive(mailbox, MessageRange.all(), MessageMapper.FetchType.METADATA, UNLIMITED)))
+            .doOnNext(mailboxMessageMetaData -> context.incrementSpamMessageCount())
+            .filter(mailboxMessageMetaData -> afterDate.map(date -> mailboxMessageMetaData.getInternalDate().after(date)).orElse(true))
+            .filter(message -> randomBooleanWithProbability(samplingProbability))
+            .map(Message::getMessageId)
+            .collectList()
+            .flatMapMany(messageIds -> messageIdManager.getMessagesReactive(messageIds, FetchGroup.FULL_CONTENT, mailboxSession));
+    }
+
+    public static boolean randomBooleanWithProbability(double probability) {
+        if (probability == 1.0) {
+            return true;
+        }
+        return Math.random() < probability;
+    }
+}
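
Editorial note on the sampling filter in GetMailboxMessagesService above: each message is kept independently with probability samplingProbability. A minimal, dependency-free sketch of the same idea follows. The class and method names are hypothetical, and it takes an explicit java.util.Random so that runs can be seeded, whereas the committed code calls Math.random().

```java
import java.util.Random;
import java.util.stream.IntStream;

public class SamplingSketch {
    // Keeps an element with the given probability.
    // nextDouble() returns a value in [0.0, 1.0), so 0.0 never keeps
    // and any value >= 1.0 always keeps.
    static boolean keep(double probability, Random random) {
        if (probability >= 1.0) {
            return true;
        }
        return random.nextDouble() < probability;
    }

    // Counts how many of `total` elements survive the sampling filter.
    static long countKept(int total, double probability, Random random) {
        return IntStream.range(0, total)
            .filter(i -> keep(probability, random))
            .count();
    }

    public static void main(String[] args) {
        // The count varies with the seed; it is not fixed by the probability.
        System.out.println(countKept(10, 0.5, new Random()));
    }
}
```

Because each message is drawn independently, the number of reported messages is only close to samplingProbability times the mailbox size on average, not exactly equal to it.
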
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/route/FeedMessageRouteTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/route/FeedMessageRouteTest.java
new file mode 100644
index 0000000000..97fd3a34d2
--- /dev/null
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/route/FeedMessageRouteTest.java
@@ -0,0 +1,393 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd.route;
+
+import static io.restassured.RestAssured.given;
+import static io.restassured.http.ContentType.JSON;
+import static org.apache.james.rspamd.DockerRSpamD.PASSWORD;
+import static org.apache.james.rspamd.route.FeedMessageRoute.BASE_PATH;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.ALICE;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.ALICE_SPAM_MAILBOX;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.BOB;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.BOB_SPAM_MAILBOX;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.NOW;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.ONE_DAY_IN_SECOND;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.THREE_DAYS_IN_SECOND;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.TWO_DAYS_IN_SECOND;
+import static org.eclipse.jetty.http.HttpStatus.BAD_REQUEST_400;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import javax.mail.Flags;
+
+import org.apache.james.domainlist.api.DomainList;
+import org.apache.james.json.DTOConverter;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
+import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.rspamd.DockerRSpamDExtension;
+import org.apache.james.rspamd.client.RSpamDClientConfiguration;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.rspamd.task.FeedSpamToRSpamDTask;
+import org.apache.james.rspamd.task.FeedSpamToRSpamDTaskAdditionalInformationDTO;
+import org.apache.james.task.Hostname;
+import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.james.util.DurationParser;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.apache.james.webadmin.WebAdminServer;
+import org.apache.james.webadmin.WebAdminUtils;
+import org.apache.james.webadmin.routes.TasksRoutes;
+import org.apache.james.webadmin.utils.JsonTransformer;
+import org.eclipse.jetty.http.HttpStatus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
+
+import com.github.fge.lambdas.Throwing;
+
+import io.restassured.RestAssured;
+
+public class FeedMessageRouteTest {
+    @RegisterExtension
+    static DockerRSpamDExtension rSpamDExtension = new DockerRSpamDExtension();
+
+    private InMemoryMailboxManager mailboxManager;
+    private WebAdminServer webAdminServer;
+    private MemoryTaskManager taskManager;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        InMemoryIntegrationResources inMemoryIntegrationResources = InMemoryIntegrationResources.defaultResources();
+        mailboxManager = inMemoryIntegrationResources.getMailboxManager();
+        DomainList domainList = mock(DomainList.class);
+        Mockito.when(domainList.containsDomain(any())).thenReturn(true);
+        UsersRepository usersRepository = MemoryUsersRepository.withVirtualHosting(domainList);
+        usersRepository.addUser(BOB, "anyPassword");
+        usersRepository.addUser(ALICE, "anyPassword");
+        mailboxManager.createMailbox(BOB_SPAM_MAILBOX, mailboxManager.createSystemSession(BOB));
+        mailboxManager.createMailbox(ALICE_SPAM_MAILBOX, mailboxManager.createSystemSession(ALICE));
+        taskManager = new MemoryTaskManager(new Hostname("foo"));
+        UpdatableTickingClock clock = new UpdatableTickingClock(NOW);
+        JsonTransformer jsonTransformer = new JsonTransformer();
+        RSpamDHttpClient client = new RSpamDHttpClient(new RSpamDClientConfiguration(rSpamDExtension.getBaseUrl(), PASSWORD, Optional.empty()));
+        MessageIdManager messageIdManager = inMemoryIntegrationResources.getMessageIdManager();
+        MailboxSessionMapperFactory mapperFactory = mailboxManager.getMapperFactory();
+
+        TasksRoutes tasksRoutes = new TasksRoutes(taskManager, jsonTransformer, DTOConverter.of(FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE));
+        FeedMessageRoute feedMessageRoute = new FeedMessageRoute(taskManager, mailboxManager, usersRepository, client, jsonTransformer, clock,
+            messageIdManager, mapperFactory);
+
+        webAdminServer = WebAdminUtils.createWebAdminServer(feedMessageRoute, tasksRoutes).start();
+
+        RestAssured.requestSpecification = WebAdminUtils.buildRequestSpecification(webAdminServer)
+            .setBasePath(BASE_PATH)
+            .build();
+    }
+
+    @AfterEach
+    void stop() {
+        webAdminServer.destroy();
+        taskManager.stop();
+    }
+
+    private void appendSpamMessage(MailboxPath mailboxPath, Date internalDate) throws MailboxException {
+        MailboxSession session = mailboxManager.createSystemSession(mailboxPath.getUser());
+        mailboxManager.getMailbox(mailboxPath, session)
+            .appendMessage(new ByteArrayInputStream(String.format("random content %4.3f", Math.random()).getBytes()),
+                internalDate,
+                session,
+                true,
+                new Flags());
+    }
+
+    @Nested
+    class FeedSpam {
+        @Test
+        void taskShouldReportAllSpamMessagesOfAllUsersByDefault() throws MailboxException {
+            appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW));
+            appendSpamMessage(ALICE_SPAM_MAILBOX, Date.from(NOW));
+
+            String taskId = given()
+                .queryParam("action", "reportSpam")
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("status", is("completed"))
+                .body("additionalInformation.type", is(FeedSpamToRSpamDTask.TASK_TYPE.asString()))
+                .body("additionalInformation.spamMessageCount", is(2))
+                .body("additionalInformation.reportedSpamMessageCount", is(2))
+                .body("additionalInformation.errorCount", is(0))
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(DEFAULT_MESSAGES_PER_SECOND))
+                .body("additionalInformation.runningOptions.periodInSecond", is(nullValue()))
+                .body("additionalInformation.runningOptions.samplingProbability", is((float) DEFAULT_SAMPLING_PROBABILITY));
+        }
+
+        @Test
+        void taskShouldReportOnlyMailInPeriod() throws MailboxException {
+            appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(THREE_DAYS_IN_SECOND)));
+            appendSpamMessage(ALICE_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)));
+
+            String taskId = given()
+                .queryParam("action", "reportSpam")
+                .queryParam("period", TWO_DAYS_IN_SECOND)
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("status", is("completed"))
+                .body("additionalInformation.type", is(FeedSpamToRSpamDTask.TASK_TYPE.asString()))
+                .body("additionalInformation.spamMessageCount", is(2))
+                .body("additionalInformation.reportedSpamMessageCount", is(1))
+                .body("additionalInformation.errorCount", is(0))
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(DEFAULT_MESSAGES_PER_SECOND))
+                .body("additionalInformation.runningOptions.periodInSecond", is(172800))
+                .body("additionalInformation.runningOptions.samplingProbability", is((float) DEFAULT_SAMPLING_PROBABILITY));
+        }
+
+        @Test
+        void taskWithAverageSamplingProbabilityShouldNotReportAllSpamMessages() {
+            IntStream.range(0, 10)
+                .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+            String taskId = given()
+                .queryParam("action", "reportSpam")
+                .queryParam("samplingProbability", 0.5)
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("status", is("completed"))
+                .body("additionalInformation.type", is(FeedSpamToRSpamDTask.TASK_TYPE.asString()))
+                .body("additionalInformation.spamMessageCount", is(10))
+                .body("additionalInformation.reportedSpamMessageCount", is(allOf(greaterThan(0), lessThan(10))))
+                .body("additionalInformation.errorCount", is(0))
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(DEFAULT_MESSAGES_PER_SECOND))
+                .body("additionalInformation.runningOptions.periodInSecond", is(nullValue()))
+                .body("additionalInformation.runningOptions.samplingProbability", is(0.5F));
+        }
+
+        @Test
+        void feedMessageShouldReturnErrorWhenInvalidAction() {
+            given()
+                .queryParam("action", "invalid")
+                .post()
+            .then()
+                .statusCode(BAD_REQUEST_400)
+                .contentType(JSON)
+                .body("statusCode", is(BAD_REQUEST_400))
+                .body("type", is("InvalidArgument"))
+                .body("message", is("Invalid arguments supplied in the user request"))
+                .body("details", is("'action' is missing or must be 'reportSpam' or 'reportHam'"));
+        }
+
+        @Test
+        void feedMessageTaskShouldReturnErrorWhenMissingAction() {
+            given()
+                .post()
+            .then()
+                .statusCode(BAD_REQUEST_400)
+                .contentType(JSON)
+                .body("statusCode", is(BAD_REQUEST_400))
+                .body("type", is("InvalidArgument"))
+                .body("message", is("Invalid arguments supplied in the user request"))
+                .body("details", is("'action' is missing or must be 'reportSpam' or 'reportHam'"));
+        }
+
+        @Test
+        void feedSpamShouldReturnTaskId() {
+            given()
+                .queryParam("action", "reportSpam")
+                .post()
+            .then()
+                .statusCode(HttpStatus.CREATED_201)
+                .body("taskId", notNullValue());
+        }
+
+        @Test
+        void feedSpamShouldReturnDetail() {
+            String taskId = given()
+                .queryParam("action", "reportSpam")
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("status", is("completed"))
+                .body("taskId", is(notNullValue()))
+                .body("type", is(FeedSpamToRSpamDTask.TASK_TYPE.asString()))
+                .body("startedDate", is(notNullValue()))
+                .body("submitDate", is(notNullValue()))
+                .body("completedDate", is(notNullValue()))
+                .body("additionalInformation.type", is(FeedSpamToRSpamDTask.TASK_TYPE.asString()))
+                .body("additionalInformation.timestamp", is(notNullValue()))
+                .body("additionalInformation.spamMessageCount", is(0))
+                .body("additionalInformation.reportedSpamMessageCount", is(0))
+                .body("additionalInformation.errorCount", is(0))
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(DEFAULT_MESSAGES_PER_SECOND))
+                .body("additionalInformation.runningOptions.periodInSecond", is(nullValue()))
+                .body("additionalInformation.runningOptions.samplingProbability", is((float) DEFAULT_SAMPLING_PROBABILITY));
+        }
+
+        @ParameterizedTest
+        @ValueSource(strings = {"3600", "3600 seconds", "1d", "1day"})
+        void feedSpamShouldAcceptPeriodParam(String period) {
+            String taskId = given()
+                .queryParam("action", "reportSpam")
+                .queryParam("period", period)
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("additionalInformation.runningOptions.periodInSecond", is((int) DurationParser.parse(period, ChronoUnit.SECONDS).toSeconds()));
+        }
+
+        @ParameterizedTest
+        @ValueSource(strings = {"-1", "0", "1 t"})
+        void feedSpamShouldReturnErrorWhenPeriodInvalid(String period) {
+            given()
+                .queryParam("action", "reportSpam")
+                .queryParam("period", period)
+                .post()
+            .then()
+                .statusCode(BAD_REQUEST_400)
+                .contentType(JSON)
+                .body("statusCode", is(BAD_REQUEST_400))
+                .body("type", is("InvalidArgument"))
+                .body("message", is("Invalid arguments supplied in the user request"));
+        }
+
+        @Test
+        void feedSpamShouldAcceptMessagesPerSecondParam() {
+            String taskId = given()
+                .queryParam("action", "reportSpam")
+                .queryParam("messagesPerSecond", 20)
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(20));
+        }
+
+        @ParameterizedTest
+        @ValueSource(doubles = {-1, -0.1, 1.1})
+        void feedSpamShouldReturnErrorWhenMessagesPerSecondInvalid(double messagesPerSecond) {
+            given()
+                .queryParam("action", "reportSpam")
+                .queryParam("messagesPerSecond", messagesPerSecond)
+                .post()
+            .then()
+                .statusCode(BAD_REQUEST_400)
+                .contentType(JSON)
+                .body("statusCode", is(BAD_REQUEST_400))
+                .body("type", is("InvalidArgument"))
+                .body("message", is("Invalid arguments supplied in the user request"))
+                .body("details", containsString("messagesPerSecond"));
+        }
+
+        @Test
+        void feedSpamShouldAcceptSamplingProbabilityParam() {
+            String taskId = given()
+                .queryParam("action", "reportSpam")
+                .queryParam("samplingProbability", 0.8)
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("additionalInformation.runningOptions.samplingProbability", is(0.8F));
+        }
+
+        @ParameterizedTest
+        @ValueSource(doubles = {-1, -0.1, 1.1})
+        void feedSpamShouldReturnErrorWhenSamplingProbabilityInvalid(double samplingProbability) {
+            given()
+                .queryParam("action", "reportSpam")
+                .queryParam("samplingProbability", samplingProbability)
+                .post()
+            .then()
+                .statusCode(BAD_REQUEST_400)
+                .contentType(JSON)
+                .body("statusCode", is(BAD_REQUEST_400))
+                .body("type", is("InvalidArgument"))
+                .body("message", is("Invalid arguments supplied in the user request"))
+                .body("details", containsString("samplingProbability"));
+        }
+    }
+}
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskAdditionalInformationDTOTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskAdditionalInformationDTOTest.java
new file mode 100644
index 0000000000..169409cde5
--- /dev/null
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskAdditionalInformationDTOTest.java
@@ -0,0 +1,63 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_PERIOD;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY;
+
+import java.time.Instant;
+import java.util.Optional;
+
+import org.apache.james.JsonSerializationVerifier;
+import org.apache.james.util.ClassLoaderUtils;
+import org.junit.jupiter.api.Test;
+
+class FeedSpamToRSpamDTaskAdditionalInformationDTOTest {
+    @Test
+    void shouldMatchJsonSerializationContractWhenEmptyPeriod() throws Exception {
+        JsonSerializationVerifier.dtoModule(FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE)
+            .bean(new FeedSpamToRSpamDTask.AdditionalInformation(
+                Instant.parse("2007-12-03T10:15:30.00Z"),
+                4,
+                2,
+                1,
+                DEFAULT_MESSAGES_PER_SECOND,
+                DEFAULT_PERIOD,
+                DEFAULT_SAMPLING_PROBABILITY))
+            .json(ClassLoaderUtils.getSystemResourceAsString("json/feedSpamEmptyPeriod.additionalInformation.json"))
+            .verify();
+    }
+
+    @Test
+    void shouldMatchJsonSerializationContractWhenNonEmptyPeriod() throws Exception {
+        JsonSerializationVerifier.dtoModule(FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE)
+            .bean(new FeedSpamToRSpamDTask.AdditionalInformation(
+                Instant.parse("2007-12-03T10:15:30.00Z"),
+                4,
+                2,
+                1,
+                DEFAULT_MESSAGES_PER_SECOND,
+                Optional.of(3600L),
+                DEFAULT_SAMPLING_PROBABILITY))
+            .json(ClassLoaderUtils.getSystemResourceAsString("json/feedSpamNonEmptyPeriod.additionalInformation.json"))
+            .verify();
+    }
+}
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskTest.java
new file mode 100644
index 0000000000..70b7aa130b
--- /dev/null
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRSpamDTaskTest.java
@@ -0,0 +1,317 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import static org.apache.james.rspamd.DockerRSpamD.PASSWORD;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_PERIOD;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.SPAM_MAILBOX_NAME;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import javax.mail.Flags;
+
+import org.apache.james.core.Domain;
+import org.apache.james.core.Username;
+import org.apache.james.domainlist.api.DomainList;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
+import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.rspamd.DockerRSpamDExtension;
+import org.apache.james.rspamd.client.RSpamDClientConfiguration;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.task.Task;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.Mockito;
+
+import com.github.fge.lambdas.Throwing;
+
+public class FeedSpamToRSpamDTaskTest {
+    @RegisterExtension
+    static DockerRSpamDExtension rSpamDExtension = new DockerRSpamDExtension();
+
+    public static final Domain DOMAIN = Domain.of("domain.tld");
+    public static final Username BOB = Username.fromLocalPartWithDomain("bob", DOMAIN);
+    public static final Username ALICE = Username.fromLocalPartWithDomain("alice", DOMAIN);
+    public static final MailboxPath BOB_SPAM_MAILBOX = MailboxPath.forUser(BOB, SPAM_MAILBOX_NAME);
+    public static final MailboxPath ALICE_SPAM_MAILBOX = MailboxPath.forUser(ALICE, SPAM_MAILBOX_NAME);
+    public static final long THREE_DAYS_IN_SECOND = 259200;
+    public static final long TWO_DAYS_IN_SECOND = 172800;
+    public static final long ONE_DAY_IN_SECOND = 86400;
+    public static final Instant NOW = ZonedDateTime.now().toInstant();
+
+    private InMemoryMailboxManager mailboxManager;
+    private MessageIdManager messageIdManager;
+    private MailboxSessionMapperFactory mapperFactory;
+    private UsersRepository usersRepository;
+    private Clock clock;
+    private RSpamDHttpClient client;
+    private FeedSpamToRSpamDTask task;
+
+    @BeforeEach
+    void setup() throws Exception {
+        InMemoryIntegrationResources inMemoryIntegrationResources = InMemoryIntegrationResources.defaultResources();
+        mailboxManager = inMemoryIntegrationResources.getMailboxManager();
+        DomainList domainList = mock(DomainList.class);
+        Mockito.when(domainList.containsDomain(any())).thenReturn(true);
+        usersRepository = MemoryUsersRepository.withVirtualHosting(domainList);
+        usersRepository.addUser(BOB, "anyPassword");
+        usersRepository.addUser(ALICE, "anyPassword");
+        mailboxManager.createMailbox(BOB_SPAM_MAILBOX, mailboxManager.createSystemSession(BOB));
+        mailboxManager.createMailbox(ALICE_SPAM_MAILBOX, mailboxManager.createSystemSession(ALICE));
+
+        clock = new UpdatableTickingClock(NOW);
+        client = new RSpamDHttpClient(new RSpamDClientConfiguration(rSpamDExtension.getBaseUrl(), PASSWORD, Optional.empty()));
+        messageIdManager = inMemoryIntegrationResources.getMessageIdManager();
+        mapperFactory = mailboxManager.getMapperFactory();
+        task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, FeedSpamToRSpamDTask.RunningOptions.DEFAULT, clock);
+    }
+
+    @Test
+    void shouldReturnDefaultInformationWhenDataIsEmpty() {
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+                .spamMessageCount(0)
+                .reportedSpamMessageCount(0)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(DEFAULT_PERIOD)
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void taskShouldReportAllSpamMessagesOfAllUsersByDefault() throws MailboxException {
+        appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW));
+        appendSpamMessage(ALICE_SPAM_MAILBOX, Date.from(NOW));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+                .spamMessageCount(2)
+                .reportedSpamMessageCount(2)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(DEFAULT_PERIOD)
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void taskShouldReportSpamMessageInPeriod() throws MailboxException {
+        FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.of(TWO_DAYS_IN_SECOND),
+            DEFAULT_MESSAGES_PER_SECOND, DEFAULT_SAMPLING_PROBABILITY);
+        task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+                .spamMessageCount(1)
+                .reportedSpamMessageCount(1)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(Optional.of(TWO_DAYS_IN_SECOND))
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void taskShouldNotReportSpamMessageNotInPeriod() throws MailboxException {
+        FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.of(TWO_DAYS_IN_SECOND),
+            DEFAULT_MESSAGES_PER_SECOND, DEFAULT_SAMPLING_PROBABILITY);
+        task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(THREE_DAYS_IN_SECOND)));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+                .spamMessageCount(1)
+                .reportedSpamMessageCount(0)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(Optional.of(TWO_DAYS_IN_SECOND))
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void mixedInternalDateCase() throws MailboxException {
+        FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.of(TWO_DAYS_IN_SECOND),
+            DEFAULT_MESSAGES_PER_SECOND, DEFAULT_SAMPLING_PROBABILITY);
+        task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(THREE_DAYS_IN_SECOND)));
+        appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+                .spamMessageCount(2)
+                .reportedSpamMessageCount(1)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(Optional.of(TWO_DAYS_IN_SECOND))
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void taskWithSamplingProbabilityIsZeroShouldReportNoSpamMessage() {
+        FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.empty(),
+            DEFAULT_MESSAGES_PER_SECOND, 0);
+        task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        IntStream.range(0, 10)
+            .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+                .spamMessageCount(10)
+                .reportedSpamMessageCount(0)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(DEFAULT_PERIOD)
+                .samplingProbability(0)
+                .build());
+    }
+
+    @Test
+    void taskWithDefaultSamplingProbabilityShouldReportAllSpamMessages() {
+        IntStream.range(0, 10)
+            .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedSpamToRSpamDTask.Context.Snapshot.builder()
+                .spamMessageCount(10)
+                .reportedSpamMessageCount(10)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(DEFAULT_PERIOD)
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void taskWithVeryLowSamplingProbabilityShouldReportNotAllSpamMessages() {
+        FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.empty(),
+            DEFAULT_MESSAGES_PER_SECOND, 0.01);
+        task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        IntStream.range(0, 10)
+            .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+        Task.Result result = task.run();
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(result).isEqualTo(Task.Result.COMPLETED);
+            softly.assertThat(task.snapshot().getSpamMessageCount()).isEqualTo(10);
+            softly.assertThat(task.snapshot().getReportedSpamMessageCount()).isLessThan(10);
+            softly.assertThat(task.snapshot().getErrorCount()).isZero();
+        });
+    }
+
+    @Test
+    void taskWithVeryHighSamplingProbabilityShouldReportMoreThanZeroMessage() {
+        FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.empty(),
+            DEFAULT_MESSAGES_PER_SECOND, 0.99);
+        task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        IntStream.range(0, 10)
+            .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+        Task.Result result = task.run();
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(result).isEqualTo(Task.Result.COMPLETED);
+            softly.assertThat(task.snapshot().getSpamMessageCount()).isEqualTo(10);
+            softly.assertThat(task.snapshot().getReportedSpamMessageCount()).isPositive();
+            softly.assertThat(task.snapshot().getErrorCount()).isZero();
+        });
+    }
+
+    @Test
+    void taskWithAverageSamplingProbabilityShouldReportSomeMessages() {
+        FeedSpamToRSpamDTask.RunningOptions runningOptions = new FeedSpamToRSpamDTask.RunningOptions(Optional.empty(),
+            DEFAULT_MESSAGES_PER_SECOND, 0.5);
+        task = new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        IntStream.range(0, 10)
+            .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+        Task.Result result = task.run();
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(result).isEqualTo(Task.Result.COMPLETED);
+            softly.assertThat(task.snapshot().getSpamMessageCount()).isEqualTo(10);
+            softly.assertThat(task.snapshot().getReportedSpamMessageCount()).isBetween(1L, 9L); // 0 and 10 are excluded because each is very unlikely (0.5^10 each, about 0.2% combined)
+            softly.assertThat(task.snapshot().getErrorCount()).isZero();
+        });
+    }
+
+    private void appendSpamMessage(MailboxPath mailboxPath, Date internalDate) throws MailboxException {
+        MailboxSession session = mailboxManager.createSystemSession(mailboxPath.getUser());
+        mailboxManager.getMailbox(mailboxPath, session)
+            .appendMessage(new ByteArrayInputStream(String.format("random content %4.3f", Math.random()).getBytes()),
+                internalDate,
+                session,
+                true,
+                new Flags());
+    }
+}
diff --git a/third-party/rspamd/src/test/resources/json/feedSpamEmptyPeriod.additionalInformation.json b/third-party/rspamd/src/test/resources/json/feedSpamEmptyPeriod.additionalInformation.json
new file mode 100644
index 0000000000..82fcf45f41
--- /dev/null
+++ b/third-party/rspamd/src/test/resources/json/feedSpamEmptyPeriod.additionalInformation.json
@@ -0,0 +1,11 @@
+{
+  "errorCount": 1,
+  "reportedSpamMessageCount": 2,
+  "runningOptions": {
+    "messagesPerSecond": 10,
+    "samplingProbability": 1.0
+  },
+  "spamMessageCount": 4,
+  "timestamp": "2007-12-03T10:15:30Z",
+  "type": "FeedSpamToRSpamDTask"
+}
\ No newline at end of file
diff --git a/third-party/rspamd/src/test/resources/json/feedSpamNonEmptyPeriod.additionalInformation.json b/third-party/rspamd/src/test/resources/json/feedSpamNonEmptyPeriod.additionalInformation.json
new file mode 100644
index 0000000000..6315215058
--- /dev/null
+++ b/third-party/rspamd/src/test/resources/json/feedSpamNonEmptyPeriod.additionalInformation.json
@@ -0,0 +1,12 @@
+{
+  "errorCount": 1,
+  "reportedSpamMessageCount": 2,
+  "runningOptions": {
+    "messagesPerSecond": 10,
+    "periodInSecond": 3600,
+    "samplingProbability": 1.0
+  },
+  "spamMessageCount": 4,
+  "timestamp": "2007-12-03T10:15:30Z",
+  "type": "FeedSpamToRSpamDTask"
+}
\ No newline at end of file
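
Note on the sampling tests in FeedSpamToRSpamDTaskTest: the three tests with samplingProbability 0.01, 0.5 and 0.99 assert on a random outcome, so each can fail by chance. The sketch below estimates how often, assuming the task samples each of the 10 messages independently with the configured probability (this is an assumption about the task, not something shown in the tests). The class and method names are hypothetical and are not part of the commit.

```java
public class SamplingFlakiness {
    // Probability that all n independent draws are selected, each with probability p.
    public static double allReported(int n, double p) {
        return Math.pow(p, n);
    }

    // Probability that none of n independent draws is selected.
    public static double noneReported(int n, double p) {
        return Math.pow(1.0 - p, n);
    }

    public static void main(String[] args) {
        int messages = 10; // matches IntStream.range(0, 10) in the tests

        // p = 0.5: the test fails when the reported count is 0 or 10.
        double outsideRange = allReported(messages, 0.5) + noneReported(messages, 0.5);
        System.out.printf("p=0.5, reported count outside [1, 9]: %.3e%n", outsideRange);

        // p = 0.01: the test fails only when all 10 messages are reported.
        System.out.printf("p=0.01, all reported: %.3e%n", allReported(messages, 0.01));

        // p = 0.99: the test fails only when no message is reported.
        System.out.printf("p=0.99, none reported: %.3e%n", noneReported(messages, 0.99));
    }
}
```

Under that assumption, the 0.5 case fails by chance in roughly 0.2% of runs, while the 0.01 and 0.99 cases are negligible. Appending more messages in the 0.5 test would shrink that rate if it ever shows up as a flaky build.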

