Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/20 05:54:11 UTC

[james-project] branch master updated (8e99c74db7 -> 5cce1a5f73)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 8e99c74db7 [BUILD] Slightly fasten Cassandra tests
     new 551927329f JAMES-3775 Write a task to feed ham to RSpamD
     new 5cce1a5f73 JAMES-3775 Write webadmin route to create feeding ham messages task

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 third-party/rspamd/README.md                       |  46 ++++
 .../apache/james/rspamd/module/RSpamDModule.java   |  12 +
 .../james/rspamd/route/FeedMessageRoute.java       |  28 +-
 ...mToRSpamDTask.java => FeedHamToRSpamDTask.java} | 111 ++++----
 ...eedHamToRSpamDTaskAdditionalInformationDTO.java | 106 ++++++++
 .../rspamd/task/GetMailboxMessagesService.java     |  30 +++
 .../james/rspamd/route/FeedMessageRouteTest.java   | 286 +++++++++++++++++++--
 ...mToRSpamDTaskAdditionalInformationDTOTest.java} |  20 +-
 ...DTaskTest.java => FeedHamToRSpamDTaskTest.java} | 206 +++++++++------
 ... feedHamEmptyPeriod.additionalInformation.json} |   6 +-
 ...edHamNonEmptyPeriod.additionalInformation.json} |   6 +-
 11 files changed, 687 insertions(+), 170 deletions(-)
 copy third-party/rspamd/src/main/java/org/apache/james/rspamd/task/{FeedSpamToRSpamDTask.java => FeedHamToRSpamDTask.java} (74%)
 create mode 100644 third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRSpamDTaskAdditionalInformationDTO.java
 copy third-party/rspamd/src/test/java/org/apache/james/rspamd/task/{FeedSpamToRSpamDTaskAdditionalInformationDTOTest.java => FeedHamToRSpamDTaskAdditionalInformationDTOTest.java} (73%)
 copy third-party/rspamd/src/test/java/org/apache/james/rspamd/task/{FeedSpamToRSpamDTaskTest.java => FeedHamToRSpamDTaskTest.java} (52%)
 copy third-party/rspamd/src/test/resources/json/{feedSpamEmptyPeriod.additionalInformation.json => feedHamEmptyPeriod.additionalInformation.json} (61%)
 copy third-party/rspamd/src/test/resources/json/{feedSpamNonEmptyPeriod.additionalInformation.json => feedHamNonEmptyPeriod.additionalInformation.json} (65%)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/02: JAMES-3775 Write a task to feed ham to RSpamD

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 551927329f04bdf7abe4349db4623d7984813059
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Fri Jul 29 11:36:33 2022 +0700

    JAMES-3775 Write a task to feed ham to RSpamD
---
 .../james/rspamd/task/FeedHamToRSpamDTask.java     | 382 +++++++++++++++++++++
 ...eedHamToRSpamDTaskAdditionalInformationDTO.java | 106 ++++++
 .../rspamd/task/GetMailboxMessagesService.java     |  30 ++
 ...amToRSpamDTaskAdditionalInformationDTOTest.java |  63 ++++
 .../james/rspamd/task/FeedHamToRSpamDTaskTest.java | 382 +++++++++++++++++++++
 .../feedHamEmptyPeriod.additionalInformation.json  |  11 +
 ...eedHamNonEmptyPeriod.additionalInformation.json |  12 +
 7 files changed, 986 insertions(+)
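
The per-message selection rule used by this task (an optional cut-off date derived from periodInSecond, followed by random sampling) can be sketched in isolation. This is a minimal, standard-library-only illustration under stated assumptions, not the code from the commit: the class name HamSelectionSketch and the helpers cutoff and isRecentEnough are hypothetical, and only randomBooleanWithProbability mirrors a method that already exists in GetMailboxMessagesService.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.Optional;

// Hypothetical illustration class; it is not part of the James code base.
public class HamSelectionSketch {

    // Mirrors the helper in GetMailboxMessagesService: a probability of 1.0 always keeps the message.
    public static boolean randomBooleanWithProbability(double probability) {
        if (probability == 1.0) {
            return true;
        }
        return Math.random() < probability;
    }

    // Hypothetical helper: turns an optional look-back period (seconds) into an optional cut-off date.
    public static Optional<Date> cutoff(Optional<Long> periodInSecond, Clock clock) {
        return periodInSecond.map(period -> Date.from(clock.instant().minusSeconds(period)));
    }

    // Hypothetical helper: without a cut-off every message qualifies,
    // otherwise the message must be strictly newer than the cut-off.
    public static boolean isRecentEnough(Date internalDate, Optional<Date> afterDate) {
        return afterDate.map(internalDate::after).orElse(true);
    }

    public static void main(String[] args) {
        // A fixed clock keeps the example deterministic.
        Clock fixedClock = Clock.fixed(Instant.parse("2007-12-03T10:15:30Z"), ZoneOffset.UTC);
        Optional<Date> afterDate = cutoff(Optional.of(3600L), fixedClock);

        Date recent = Date.from(Instant.parse("2007-12-03T10:00:00Z"));
        Date old = Date.from(Instant.parse("2007-12-03T08:00:00Z"));

        System.out.println(isRecentEnough(recent, afterDate));      // true
        System.out.println(isRecentEnough(old, afterDate));         // false
        System.out.println(isRecentEnough(old, Optional.empty()));  // true
    }
}
```

With the default running options (no period, sampling probability 1) both checks pass for every message, so every message outside the Spam and Trash mailboxes would be reported.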

diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRSpamDTask.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRSpamDTask.java
new file mode 100644
index 0000000000..e34208e876
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRSpamDTask.java
@@ -0,0 +1,382 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Date;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskType;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.ReactorUtils;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+
+import reactor.core.publisher.Mono;
+
+public class FeedHamToRSpamDTask implements Task {
+    public static final TaskType TASK_TYPE = TaskType.of("FeedHamToRSpamDTask");
+
+    public static class RunningOptions {
+        public static final Optional<Long> DEFAULT_PERIOD = Optional.empty();
+        public static final int DEFAULT_MESSAGES_PER_SECOND = 10;
+        public static final double DEFAULT_SAMPLING_PROBABILITY = 1;
+        public static final RunningOptions DEFAULT = new RunningOptions(DEFAULT_PERIOD, DEFAULT_MESSAGES_PER_SECOND,
+            DEFAULT_SAMPLING_PROBABILITY);
+
+        private final Optional<Long> periodInSecond;
+        private final int messagesPerSecond;
+        private final double samplingProbability;
+
+        public RunningOptions(@JsonProperty("periodInSecond") Optional<Long> periodInSecond,
+                              @JsonProperty("messagesPerSecond") int messagesPerSecond,
+                              @JsonProperty("samplingProbability") double samplingProbability) {
+            this.periodInSecond = periodInSecond;
+            this.messagesPerSecond = messagesPerSecond;
+            this.samplingProbability = samplingProbability;
+        }
+
+        public Optional<Long> getPeriodInSecond() {
+            return periodInSecond;
+        }
+
+        public int getMessagesPerSecond() {
+            return messagesPerSecond;
+        }
+
+        public double getSamplingProbability() {
+            return samplingProbability;
+        }
+    }
+
+    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+
+        private static AdditionalInformation from(Context context) {
+            Context.Snapshot snapshot = context.snapshot();
+            return new AdditionalInformation(
+                Clock.systemUTC().instant(),
+                snapshot.getHamMessageCount(),
+                snapshot.getReportedHamMessageCount(),
+                snapshot.getErrorCount(),
+                snapshot.getMessagesPerSecond(),
+                snapshot.getPeriod(),
+                snapshot.getSamplingProbability());
+        }
+
+        private final Instant timestamp;
+        private final long hamMessageCount;
+        private final long reportedHamMessageCount;
+        private final long errorCount;
+        private final int messagesPerSecond;
+        private final Optional<Long> period;
+        private final double samplingProbability;
+
+        public AdditionalInformation(Instant timestamp, long hamMessageCount, long reportedHamMessageCount, long errorCount, int messagesPerSecond, Optional<Long> period, double samplingProbability) {
+            this.timestamp = timestamp;
+            this.hamMessageCount = hamMessageCount;
+            this.reportedHamMessageCount = reportedHamMessageCount;
+            this.errorCount = errorCount;
+            this.messagesPerSecond = messagesPerSecond;
+            this.period = period;
+            this.samplingProbability = samplingProbability;
+        }
+
+        public long getHamMessageCount() {
+            return hamMessageCount;
+        }
+
+        public long getReportedHamMessageCount() {
+            return reportedHamMessageCount;
+        }
+
+        public long getErrorCount() {
+            return errorCount;
+        }
+
+        public int getMessagesPerSecond() {
+            return messagesPerSecond;
+        }
+
+        public Optional<Long> getPeriod() {
+            return period;
+        }
+
+        public double getSamplingProbability() {
+            return samplingProbability;
+        }
+
+        @Override
+        public Instant timestamp() {
+            return timestamp;
+        }
+    }
+
+    public static class Context {
+
+        public static class Snapshot {
+
+            public static Builder builder() {
+                return new Builder();
+            }
+
+            static class Builder {
+                private Optional<Long> hamMessageCount;
+                private Optional<Long> reportedHamMessageCount;
+                private Optional<Long> errorCount;
+                private Optional<Integer> messagesPerSecond;
+                private Optional<Long> period;
+                private Optional<Double> samplingProbability;
+
+                Builder() {
+                    hamMessageCount = Optional.empty();
+                    reportedHamMessageCount = Optional.empty();
+                    errorCount = Optional.empty();
+                    messagesPerSecond = Optional.empty();
+                    period = Optional.empty();
+                    samplingProbability = Optional.empty();
+                }
+
+                public Snapshot build() {
+                    return new Snapshot(
+                        hamMessageCount.orElse(0L),
+                        reportedHamMessageCount.orElse(0L),
+                        errorCount.orElse(0L),
+                        messagesPerSecond.orElse(0),
+                        period,
+                        samplingProbability.orElse(1D));
+                }
+
+                public Builder hamMessageCount(long hamMessageCount) {
+                    this.hamMessageCount = Optional.of(hamMessageCount);
+                    return this;
+                }
+
+                public Builder reportedHamMessageCount(long reportedHamMessageCount) {
+                    this.reportedHamMessageCount = Optional.of(reportedHamMessageCount);
+                    return this;
+                }
+
+                public Builder errorCount(long errorCount) {
+                    this.errorCount = Optional.of(errorCount);
+                    return this;
+                }
+
+                public Builder messagesPerSecond(int messagesPerSecond) {
+                    this.messagesPerSecond = Optional.of(messagesPerSecond);
+                    return this;
+                }
+
+                public Builder period(Optional<Long> period) {
+                    this.period = period;
+                    return this;
+                }
+
+                public Builder samplingProbability(double samplingProbability) {
+                    this.samplingProbability = Optional.of(samplingProbability);
+                    return this;
+                }
+            }
+
+            private final long hamMessageCount;
+            private final long reportedHamMessageCount;
+            private final long errorCount;
+            private final int messagesPerSecond;
+            private final Optional<Long> period;
+            private final double samplingProbability;
+
+            public Snapshot(long hamMessageCount, long reportedHamMessageCount, long errorCount, int messagesPerSecond, Optional<Long> period,
+                            double samplingProbability) {
+                this.hamMessageCount = hamMessageCount;
+                this.reportedHamMessageCount = reportedHamMessageCount;
+                this.errorCount = errorCount;
+                this.messagesPerSecond = messagesPerSecond;
+                this.period = period;
+                this.samplingProbability = samplingProbability;
+            }
+
+            public long getHamMessageCount() {
+                return hamMessageCount;
+            }
+
+            public long getReportedHamMessageCount() {
+                return reportedHamMessageCount;
+            }
+
+            public long getErrorCount() {
+                return errorCount;
+            }
+
+            public int getMessagesPerSecond() {
+                return messagesPerSecond;
+            }
+
+            public Optional<Long> getPeriod() {
+                return period;
+            }
+
+            public double getSamplingProbability() {
+                return samplingProbability;
+            }
+
+            @Override
+            public final boolean equals(Object o) {
+                if (o instanceof Snapshot) {
+                    Snapshot snapshot = (Snapshot) o;
+
+                    return Objects.equals(this.hamMessageCount, snapshot.hamMessageCount)
+                        && Objects.equals(this.reportedHamMessageCount, snapshot.reportedHamMessageCount)
+                        && Objects.equals(this.errorCount, snapshot.errorCount)
+                        && Objects.equals(this.messagesPerSecond, snapshot.messagesPerSecond)
+                        && Objects.equals(this.samplingProbability, snapshot.samplingProbability)
+                        && Objects.equals(this.period, snapshot.period);
+                }
+                return false;
+            }
+
+            @Override
+            public final int hashCode() {
+                return Objects.hash(hamMessageCount, reportedHamMessageCount, errorCount, messagesPerSecond, period, samplingProbability);
+            }
+
+            @Override
+            public String toString() {
+                return MoreObjects.toStringHelper(this)
+                    .add("hamMessageCount", hamMessageCount)
+                    .add("reportedHamMessageCount", reportedHamMessageCount)
+                    .add("errorCount", errorCount)
+                    .add("messagesPerSecond", messagesPerSecond)
+                    .add("period", period)
+                    .add("samplingProbability", samplingProbability)
+                    .toString();
+            }
+        }
+
+        private final AtomicLong hamMessageCount;
+        private final AtomicLong reportedHamMessageCount;
+        private final AtomicLong errorCount;
+        private final Integer messagesPerSecond;
+        private final Optional<Long> period;
+        private final Double samplingProbability;
+
+        public Context(RunningOptions runningOptions) {
+            this.hamMessageCount = new AtomicLong();
+            this.reportedHamMessageCount = new AtomicLong();
+            this.errorCount = new AtomicLong();
+            this.messagesPerSecond = runningOptions.messagesPerSecond;
+            this.period = runningOptions.periodInSecond;
+            this.samplingProbability = runningOptions.samplingProbability;
+        }
+
+        public void incrementHamMessageCount() {
+            hamMessageCount.incrementAndGet();
+        }
+
+        public void incrementReportedHamMessageCount(int count) {
+            reportedHamMessageCount.addAndGet(count);
+        }
+
+        public void incrementErrorCount() {
+            errorCount.incrementAndGet();
+        }
+
+        public Snapshot snapshot() {
+            return Snapshot.builder()
+                .hamMessageCount(hamMessageCount.get())
+                .reportedHamMessageCount(reportedHamMessageCount.get())
+                .errorCount(errorCount.get())
+                .messagesPerSecond(messagesPerSecond)
+                .period(period)
+                .samplingProbability(samplingProbability)
+                .build();
+        }
+    }
+
+    private final GetMailboxMessagesService messagesService;
+    private final RSpamDHttpClient rSpamDHttpClient;
+    private final RunningOptions runningOptions;
+    private final Context context;
+    private final Clock clock;
+
+    public FeedHamToRSpamDTask(MailboxManager mailboxManager, UsersRepository usersRepository, MessageIdManager messageIdManager, MailboxSessionMapperFactory mapperFactory,
+                               RSpamDHttpClient rSpamDHttpClient, RunningOptions runningOptions, Clock clock) {
+        this.runningOptions = runningOptions;
+        this.messagesService = new GetMailboxMessagesService(mailboxManager, usersRepository, mapperFactory, messageIdManager);
+        this.rSpamDHttpClient = rSpamDHttpClient;
+        this.context = new Context(runningOptions);
+        this.clock = clock;
+    }
+
+    @Override
+    public Result run() {
+        Optional<Date> afterDate = runningOptions.periodInSecond.map(periodInSecond -> Date.from(clock.instant().minusSeconds(periodInSecond)));
+        try {
+            return messagesService.getHamMessagesOfAllUser(afterDate, runningOptions.getSamplingProbability(), context)
+                .transform(ReactorUtils.<MessageResult, Result>throttle()
+                    .elements(runningOptions.messagesPerSecond)
+                    .per(Duration.ofSeconds(1))
+                    .forOperation(messageResult -> Mono.fromSupplier(Throwing.supplier(() -> rSpamDHttpClient.reportAsHam(messageResult.getFullContent().getInputStream())))
+                        .then(Mono.fromCallable(() -> {
+                            context.incrementReportedHamMessageCount(1);
+                            return Result.COMPLETED;
+                        }))
+                        .onErrorResume(error -> {
+                            LOGGER.error("Error when report ham message to RSpamD", error);
+                            context.incrementErrorCount();
+                            return Mono.just(Result.PARTIAL);
+                        })))
+                .reduce(Task::combine)
+                .switchIfEmpty(Mono.just(Result.COMPLETED))
+                .block();
+        } catch (UsersRepositoryException e) {
+            LOGGER.error("Error while accessing users from repository", e);
+            return Result.PARTIAL;
+        }
+    }
+
+    @Override
+    public TaskType type() {
+        return TASK_TYPE;
+    }
+
+    @Override
+    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+        return Optional.of(AdditionalInformation.from(context));
+    }
+
+    @VisibleForTesting
+    public Context.Snapshot snapshot() {
+        return context.snapshot();
+    }
+}
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRSpamDTaskAdditionalInformationDTO.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRSpamDTaskAdditionalInformationDTO.java
new file mode 100644
index 0000000000..ae69092e6a
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRSpamDTaskAdditionalInformationDTO.java
@@ -0,0 +1,106 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class FeedHamToRSpamDTaskAdditionalInformationDTO implements AdditionalInformationDTO {
+    public static final AdditionalInformationDTOModule<FeedHamToRSpamDTask.AdditionalInformation, FeedHamToRSpamDTaskAdditionalInformationDTO> SERIALIZATION_MODULE =
+        DTOModule.forDomainObject(FeedHamToRSpamDTask.AdditionalInformation.class)
+            .convertToDTO(FeedHamToRSpamDTaskAdditionalInformationDTO.class)
+            .toDomainObjectConverter(FeedHamToRSpamDTaskAdditionalInformationDTO::toDomainObject)
+            .toDTOConverter(FeedHamToRSpamDTaskAdditionalInformationDTO::toDto)
+            .typeName(FeedHamToRSpamDTask.TASK_TYPE.asString())
+            .withFactory(AdditionalInformationDTOModule::new);
+
+    private static FeedHamToRSpamDTask.AdditionalInformation toDomainObject(FeedHamToRSpamDTaskAdditionalInformationDTO dto) {
+        return new FeedHamToRSpamDTask.AdditionalInformation(
+            dto.timestamp,
+            dto.hamMessageCount,
+            dto.reportedHamMessageCount,
+            dto.errorCount,
+            dto.runningOptions.getMessagesPerSecond(),
+            dto.runningOptions.getPeriodInSecond(),
+            dto.runningOptions.getSamplingProbability());
+    }
+
+    private static FeedHamToRSpamDTaskAdditionalInformationDTO toDto(FeedHamToRSpamDTask.AdditionalInformation domainObject, String type) {
+        return new FeedHamToRSpamDTaskAdditionalInformationDTO(
+            type,
+            domainObject.timestamp(),
+            domainObject.getHamMessageCount(),
+            domainObject.getReportedHamMessageCount(),
+            domainObject.getErrorCount(),
+            new FeedHamToRSpamDTask.RunningOptions(domainObject.getPeriod(), domainObject.getMessagesPerSecond(), domainObject.getSamplingProbability()));
+    }
+
+    private final String type;
+    private final Instant timestamp;
+    private final long hamMessageCount;
+    private final long reportedHamMessageCount;
+    private final long errorCount;
+    private final FeedHamToRSpamDTask.RunningOptions runningOptions;
+
+    public FeedHamToRSpamDTaskAdditionalInformationDTO(@JsonProperty("type") String type,
+                                                       @JsonProperty("timestamp") Instant timestamp,
+                                                       @JsonProperty("hamMessageCount") long hamMessageCount,
+                                                       @JsonProperty("reportedHamMessageCount") long reportedHamMessageCount,
+                                                       @JsonProperty("errorCount") long errorCount,
+                                                       @JsonProperty("runningOptions") FeedHamToRSpamDTask.RunningOptions runningOptions) {
+        this.type = type;
+        this.timestamp = timestamp;
+        this.hamMessageCount = hamMessageCount;
+        this.reportedHamMessageCount = reportedHamMessageCount;
+        this.errorCount = errorCount;
+        this.runningOptions = runningOptions;
+    }
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public Instant getTimestamp() {
+        return timestamp;
+    }
+
+    public long getHamMessageCount() {
+        return hamMessageCount;
+    }
+
+    public long getReportedHamMessageCount() {
+        return reportedHamMessageCount;
+    }
+
+    public long getErrorCount() {
+        return errorCount;
+    }
+
+    public FeedHamToRSpamDTask.RunningOptions getRunningOptions() {
+        return runningOptions;
+    }
+}
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
index abae46efbd..f394f4a5c1 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.rspamd.task;
 
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.SPAM_MAILBOX_NAME;
+
 import java.util.Date;
 import java.util.Optional;
 
@@ -46,6 +48,7 @@ import reactor.core.publisher.Mono;
 
 public class GetMailboxMessagesService {
     private static final int UNLIMITED = -1;
+    private static final String TRASH_MAILBOX_NAME = "Trash";
 
     private final MailboxManager mailboxManager;
     private final UsersRepository userRepository;
@@ -65,6 +68,14 @@ public class GetMailboxMessagesService {
             .flatMap(username -> getMailboxMessagesOfAUser(username, mailboxName, afterDate, samplingProbability, context), ReactorUtils.DEFAULT_CONCURRENCY);
     }
 
+    public Flux<MessageResult> getHamMessagesOfAllUser(Optional<Date> afterDate, double samplingProbability,
+                                                       FeedHamToRSpamDTask.Context context) throws UsersRepositoryException {
+        return Iterators.toFlux(userRepository.list())
+            .flatMap(Throwing.function(username -> Flux.fromIterable(mailboxManager.list(mailboxManager.createSystemSession(username)))
+                .filter(this::hamMailboxesPredicate)
+                .flatMap(mailboxPath -> getMailboxMessagesOfAUser(username, mailboxPath, afterDate, samplingProbability, context), 2)), ReactorUtils.DEFAULT_CONCURRENCY);
+    }
+
     private Flux<MessageResult> getMailboxMessagesOfAUser(Username username, String mailboxName, Optional<Date> afterDate,
                                                           double samplingProbability, FeedSpamToRSpamDTask.Context context) {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
@@ -80,10 +91,29 @@ public class GetMailboxMessagesService {
             .flatMapMany(messageIds -> messageIdManager.getMessagesReactive(messageIds, FetchGroup.FULL_CONTENT, mailboxSession));
     }
 
+    private Flux<MessageResult> getMailboxMessagesOfAUser(Username username, MailboxPath mailboxPath, Optional<Date> afterDate,
+                                                          double samplingProbability, FeedHamToRSpamDTask.Context context) {
+        MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
+
+        return Mono.from(mailboxManager.getMailboxReactive(mailboxPath, mailboxSession))
+            .map(Throwing.function(MessageManager::getMailboxEntity))
+            .flatMapMany(Throwing.function(mailbox -> mapperFactory.getMessageMapper(mailboxSession).findInMailboxReactive(mailbox, MessageRange.all(), MessageMapper.FetchType.METADATA, UNLIMITED)))
+            .doOnNext(mailboxMessageMetaData -> context.incrementHamMessageCount())
+            .filter(mailboxMessageMetaData -> afterDate.map(date -> mailboxMessageMetaData.getInternalDate().after(date)).orElse(true))
+            .filter(message -> randomBooleanWithProbability(samplingProbability))
+            .map(Message::getMessageId)
+            .collectList()
+            .flatMapMany(messageIds -> messageIdManager.getMessagesReactive(messageIds, FetchGroup.FULL_CONTENT, mailboxSession));
+    }
+
     public static boolean randomBooleanWithProbability(double probability) {
         if (probability == 1.0) {
             return true;
         }
         return Math.random() < probability;
     }
+
+    private boolean hamMailboxesPredicate(MailboxPath mailboxPath) {
+        return !mailboxPath.getName().equals(SPAM_MAILBOX_NAME) && !mailboxPath.getName().equals(TRASH_MAILBOX_NAME);
+    }
 }
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRSpamDTaskAdditionalInformationDTOTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRSpamDTaskAdditionalInformationDTOTest.java
new file mode 100644
index 0000000000..be856de645
--- /dev/null
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRSpamDTaskAdditionalInformationDTOTest.java
@@ -0,0 +1,63 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import static org.apache.james.rspamd.task.FeedHamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND;
+import static org.apache.james.rspamd.task.FeedHamToRSpamDTask.RunningOptions.DEFAULT_PERIOD;
+import static org.apache.james.rspamd.task.FeedHamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY;
+
+import java.time.Instant;
+import java.util.Optional;
+
+import org.apache.james.JsonSerializationVerifier;
+import org.apache.james.util.ClassLoaderUtils;
+import org.junit.jupiter.api.Test;
+
+class FeedHamToRSpamDTaskAdditionalInformationDTOTest {
+    @Test
+    void shouldMatchJsonSerializationContractWhenEmptyPeriod() throws Exception {
+        JsonSerializationVerifier.dtoModule(FeedHamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE)
+            .bean(new FeedHamToRSpamDTask.AdditionalInformation(
+                Instant.parse("2007-12-03T10:15:30.00Z"),
+                4,
+                2,
+                1,
+                DEFAULT_MESSAGES_PER_SECOND,
+                DEFAULT_PERIOD,
+                DEFAULT_SAMPLING_PROBABILITY))
+            .json(ClassLoaderUtils.getSystemResourceAsString("json/feedHamEmptyPeriod.additionalInformation.json"))
+            .verify();
+    }
+
+    @Test
+    void shouldMatchJsonSerializationContractWhenNonEmptyPeriod() throws Exception {
+        JsonSerializationVerifier.dtoModule(FeedHamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE)
+            .bean(new FeedHamToRSpamDTask.AdditionalInformation(
+                Instant.parse("2007-12-03T10:15:30.00Z"),
+                4,
+                2,
+                1,
+                DEFAULT_MESSAGES_PER_SECOND,
+                Optional.of(3600L),
+                DEFAULT_SAMPLING_PROBABILITY))
+            .json(ClassLoaderUtils.getSystemResourceAsString("json/feedHamNonEmptyPeriod.additionalInformation.json"))
+            .verify();
+    }
+}
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRSpamDTaskTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRSpamDTaskTest.java
new file mode 100644
index 0000000000..c5905d809b
--- /dev/null
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRSpamDTaskTest.java
@@ -0,0 +1,382 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd.task;
+
+import static org.apache.james.rspamd.DockerRSpamD.PASSWORD;
+import static org.apache.james.rspamd.task.FeedHamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND;
+import static org.apache.james.rspamd.task.FeedHamToRSpamDTask.RunningOptions.DEFAULT_PERIOD;
+import static org.apache.james.rspamd.task.FeedHamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY;
+import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.BOB_SPAM_MAILBOX;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import javax.mail.Flags;
+
+import org.apache.james.core.Domain;
+import org.apache.james.core.Username;
+import org.apache.james.domainlist.api.DomainList;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
+import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.rspamd.DockerRSpamDExtension;
+import org.apache.james.rspamd.client.RSpamDClientConfiguration;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.task.Task;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.Mockito;
+
+import com.github.fge.lambdas.Throwing;
+
+public class FeedHamToRSpamDTaskTest {
+    @RegisterExtension
+    static DockerRSpamDExtension rSpamDExtension = new DockerRSpamDExtension();
+
+    public static final String INBOX_MAILBOX_NAME = "INBOX";
+    public static final Domain DOMAIN = Domain.of("domain.tld");
+    public static final Username BOB = Username.fromLocalPartWithDomain("bob", DOMAIN);
+    public static final Username ALICE = Username.fromLocalPartWithDomain("alice", DOMAIN);
+    public static final MailboxPath BOB_INBOX_MAILBOX = MailboxPath.forUser(BOB, INBOX_MAILBOX_NAME);
+    public static final MailboxPath BOB_CUSTOM_MAILBOX = MailboxPath.forUser(BOB, "Custom");
+    public static final MailboxPath BOB_TRASH_MAILBOX = MailboxPath.forUser(BOB, "Trash");
+    public static final MailboxPath ALICE_INBOX_MAILBOX = MailboxPath.forUser(ALICE, INBOX_MAILBOX_NAME);
+    public static final long THREE_DAYS_IN_SECOND = 259200;
+    public static final long TWO_DAYS_IN_SECOND = 172800;
+    public static final long ONE_DAY_IN_SECOND = 86400;
+    public static final Instant NOW = ZonedDateTime.now().toInstant();
+
+    private InMemoryMailboxManager mailboxManager;
+    private MessageIdManager messageIdManager;
+    private MailboxSessionMapperFactory mapperFactory;
+    private UsersRepository usersRepository;
+    private Clock clock;
+    private RSpamDHttpClient client;
+    private FeedHamToRSpamDTask task;
+
+    @BeforeEach
+    void setup() throws Exception {
+        InMemoryIntegrationResources inMemoryIntegrationResources = InMemoryIntegrationResources.defaultResources();
+        mailboxManager = inMemoryIntegrationResources.getMailboxManager();
+        DomainList domainList = mock(DomainList.class);
+        Mockito.when(domainList.containsDomain(any())).thenReturn(true);
+        usersRepository = MemoryUsersRepository.withVirtualHosting(domainList);
+        usersRepository.addUser(BOB, "anyPassword");
+        usersRepository.addUser(ALICE, "anyPassword");
+        mailboxManager.createMailbox(BOB_INBOX_MAILBOX, mailboxManager.createSystemSession(BOB));
+        mailboxManager.createMailbox(BOB_CUSTOM_MAILBOX, mailboxManager.createSystemSession(BOB));
+        mailboxManager.createMailbox(BOB_TRASH_MAILBOX, mailboxManager.createSystemSession(BOB));
+        mailboxManager.createMailbox(BOB_SPAM_MAILBOX, mailboxManager.createSystemSession(BOB));
+        mailboxManager.createMailbox(ALICE_INBOX_MAILBOX, mailboxManager.createSystemSession(ALICE));
+
+        clock = new UpdatableTickingClock(NOW);
+        client = new RSpamDHttpClient(new RSpamDClientConfiguration(rSpamDExtension.getBaseUrl(), PASSWORD, Optional.empty()));
+        messageIdManager = inMemoryIntegrationResources.getMessageIdManager();
+        mapperFactory = mailboxManager.getMapperFactory();
+        task = new FeedHamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, FeedHamToRSpamDTask.RunningOptions.DEFAULT, clock);
+    }
+
+    @Test
+    void shouldReturnDefaultInformationWhenDataIsEmpty() {
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedHamToRSpamDTask.Context.Snapshot.builder()
+                .hamMessageCount(0)
+                .reportedHamMessageCount(0)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(DEFAULT_PERIOD)
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void taskShouldReportAllHamMessagesOfAllUsersByDefault() throws MailboxException {
+        appendHamMessage(BOB_INBOX_MAILBOX, Date.from(NOW));
+        appendHamMessage(ALICE_INBOX_MAILBOX, Date.from(NOW));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedHamToRSpamDTask.Context.Snapshot.builder()
+                .hamMessageCount(2)
+                .reportedHamMessageCount(2)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(DEFAULT_PERIOD)
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void taskShouldReportHamMessageInPeriod() throws MailboxException {
+        FeedHamToRSpamDTask.RunningOptions runningOptions = new FeedHamToRSpamDTask.RunningOptions(Optional.of(TWO_DAYS_IN_SECOND),
+            DEFAULT_MESSAGES_PER_SECOND, DEFAULT_SAMPLING_PROBABILITY);
+        task = new FeedHamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        appendHamMessage(BOB_INBOX_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedHamToRSpamDTask.Context.Snapshot.builder()
+                .hamMessageCount(1)
+                .reportedHamMessageCount(1)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(Optional.of(TWO_DAYS_IN_SECOND))
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void taskShouldNotReportHamMessageNotInPeriod() throws MailboxException {
+        FeedHamToRSpamDTask.RunningOptions runningOptions = new FeedHamToRSpamDTask.RunningOptions(Optional.of(TWO_DAYS_IN_SECOND),
+            DEFAULT_MESSAGES_PER_SECOND, DEFAULT_SAMPLING_PROBABILITY);
+        task = new FeedHamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        appendHamMessage(BOB_INBOX_MAILBOX, Date.from(NOW.minusSeconds(THREE_DAYS_IN_SECOND)));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedHamToRSpamDTask.Context.Snapshot.builder()
+                .hamMessageCount(1)
+                .reportedHamMessageCount(0)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(Optional.of(TWO_DAYS_IN_SECOND))
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void mixedInternalDateCase() throws MailboxException {
+        FeedHamToRSpamDTask.RunningOptions runningOptions = new FeedHamToRSpamDTask.RunningOptions(Optional.of(TWO_DAYS_IN_SECOND),
+            DEFAULT_MESSAGES_PER_SECOND, DEFAULT_SAMPLING_PROBABILITY);
+        task = new FeedHamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        appendHamMessage(BOB_INBOX_MAILBOX, Date.from(NOW.minusSeconds(THREE_DAYS_IN_SECOND)));
+        appendHamMessage(BOB_INBOX_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedHamToRSpamDTask.Context.Snapshot.builder()
+                .hamMessageCount(2)
+                .reportedHamMessageCount(1)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(Optional.of(TWO_DAYS_IN_SECOND))
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void taskWithSamplingProbabilityIsZeroShouldReportNoHamMessage() {
+        FeedHamToRSpamDTask.RunningOptions runningOptions = new FeedHamToRSpamDTask.RunningOptions(Optional.empty(),
+            DEFAULT_MESSAGES_PER_SECOND, 0);
+        task = new FeedHamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        IntStream.range(0, 10)
+            .forEach(Throwing.intConsumer(any -> appendHamMessage(BOB_INBOX_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedHamToRSpamDTask.Context.Snapshot.builder()
+                .hamMessageCount(10)
+                .reportedHamMessageCount(0)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(DEFAULT_PERIOD)
+                .samplingProbability(0)
+                .build());
+    }
+
+    @Test
+    void taskWithDefaultSamplingProbabilityShouldReportAllHamMessages() {
+        IntStream.range(0, 10)
+            .forEach(Throwing.intConsumer(any -> appendHamMessage(BOB_INBOX_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedHamToRSpamDTask.Context.Snapshot.builder()
+                .hamMessageCount(10)
+                .reportedHamMessageCount(10)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(DEFAULT_PERIOD)
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void taskWithVeryLowSamplingProbabilityShouldReportNotAllHamMessages() {
+        FeedHamToRSpamDTask.RunningOptions runningOptions = new FeedHamToRSpamDTask.RunningOptions(Optional.empty(),
+            DEFAULT_MESSAGES_PER_SECOND, 0.01);
+        task = new FeedHamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        IntStream.range(0, 10)
+                .forEach(Throwing.intConsumer(any -> appendHamMessage(BOB_INBOX_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+        Task.Result result = task.run();
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(result).isEqualTo(Task.Result.COMPLETED);
+            softly.assertThat(task.snapshot().getHamMessageCount()).isEqualTo(10);
+            softly.assertThat(task.snapshot().getReportedHamMessageCount()).isLessThan(10);
+            softly.assertThat(task.snapshot().getErrorCount()).isZero();
+        });
+    }
+
+    @Test
+    void taskWithVeryHighSamplingProbabilityShouldReportMoreThanZeroMessage() {
+        FeedHamToRSpamDTask.RunningOptions runningOptions = new FeedHamToRSpamDTask.RunningOptions(Optional.empty(),
+            DEFAULT_MESSAGES_PER_SECOND, 0.99);
+        task = new FeedHamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        IntStream.range(0, 10)
+            .forEach(Throwing.intConsumer(any -> appendHamMessage(BOB_INBOX_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+        Task.Result result = task.run();
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(result).isEqualTo(Task.Result.COMPLETED);
+            softly.assertThat(task.snapshot().getHamMessageCount()).isEqualTo(10);
+            softly.assertThat(task.snapshot().getReportedHamMessageCount()).isPositive();
+            softly.assertThat(task.snapshot().getErrorCount()).isZero();
+        });
+    }
+
+    @Test
+    void taskWithAverageSamplingProbabilityShouldReportSomeMessages() {
+        FeedHamToRSpamDTask.RunningOptions runningOptions = new FeedHamToRSpamDTask.RunningOptions(Optional.empty(),
+            DEFAULT_MESSAGES_PER_SECOND, 0.5);
+        task = new FeedHamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, client, runningOptions, clock);
+
+        IntStream.range(0, 10)
+            .forEach(Throwing.intConsumer(any -> appendHamMessage(BOB_INBOX_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+        Task.Result result = task.run();
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(result).isEqualTo(Task.Result.COMPLETED);
+            softly.assertThat(task.snapshot().getHamMessageCount()).isEqualTo(10);
+            softly.assertThat(task.snapshot().getReportedHamMessageCount()).isBetween(1L, 9L); // 0 and 10 are excluded: each has a very low probability (0.5^10)
+            softly.assertThat(task.snapshot().getErrorCount()).isZero();
+        });
+    }
+
+    @Test
+    void shouldNotReportMessagesInTrashAndSpamMailboxes() throws MailboxException {
+        appendHamMessage(BOB_TRASH_MAILBOX, Date.from(NOW));
+        appendHamMessage(BOB_SPAM_MAILBOX, Date.from(NOW));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedHamToRSpamDTask.Context.Snapshot.builder()
+                .hamMessageCount(0)
+                .reportedHamMessageCount(0)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(DEFAULT_PERIOD)
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void shouldReportMessagesInHamMailboxes() throws MailboxException {
+        appendHamMessage(BOB_INBOX_MAILBOX, Date.from(NOW));
+        appendHamMessage(BOB_CUSTOM_MAILBOX, Date.from(NOW));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedHamToRSpamDTask.Context.Snapshot.builder()
+                .hamMessageCount(2)
+                .reportedHamMessageCount(2)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(DEFAULT_PERIOD)
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    @Test
+    void mixedMailboxesCase() throws MailboxException {
+        appendHamMessage(BOB_INBOX_MAILBOX, Date.from(NOW));
+        appendHamMessage(BOB_CUSTOM_MAILBOX, Date.from(NOW));
+        appendHamMessage(BOB_TRASH_MAILBOX, Date.from(NOW));
+        appendHamMessage(BOB_SPAM_MAILBOX, Date.from(NOW));
+
+        Task.Result result = task.run();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(task.snapshot())
+            .isEqualTo(FeedHamToRSpamDTask.Context.Snapshot.builder()
+                .hamMessageCount(2)
+                .reportedHamMessageCount(2)
+                .errorCount(0)
+                .messagesPerSecond(DEFAULT_MESSAGES_PER_SECOND)
+                .period(DEFAULT_PERIOD)
+                .samplingProbability(DEFAULT_SAMPLING_PROBABILITY)
+                .build());
+    }
+
+    private void appendHamMessage(MailboxPath mailboxPath, Date internalDate) throws MailboxException {
+        MailboxSession session = mailboxManager.createSystemSession(mailboxPath.getUser());
+        mailboxManager.getMailbox(mailboxPath, session)
+            .appendMessage(new ByteArrayInputStream(String.format("random content %4.3f", Math.random()).getBytes()),
+                internalDate,
+                session,
+                true,
+                new Flags());
+    }
+}
diff --git a/third-party/rspamd/src/test/resources/json/feedHamEmptyPeriod.additionalInformation.json b/third-party/rspamd/src/test/resources/json/feedHamEmptyPeriod.additionalInformation.json
new file mode 100644
index 0000000000..dc3faae270
--- /dev/null
+++ b/third-party/rspamd/src/test/resources/json/feedHamEmptyPeriod.additionalInformation.json
@@ -0,0 +1,11 @@
+{
+  "errorCount": 1,
+  "reportedHamMessageCount": 2,
+  "runningOptions": {
+    "messagesPerSecond": 10,
+    "samplingProbability": 1.0
+  },
+  "hamMessageCount": 4,
+  "timestamp": "2007-12-03T10:15:30Z",
+  "type": "FeedHamToRSpamDTask"
+}
\ No newline at end of file
diff --git a/third-party/rspamd/src/test/resources/json/feedHamNonEmptyPeriod.additionalInformation.json b/third-party/rspamd/src/test/resources/json/feedHamNonEmptyPeriod.additionalInformation.json
new file mode 100644
index 0000000000..bcdaf02c88
--- /dev/null
+++ b/third-party/rspamd/src/test/resources/json/feedHamNonEmptyPeriod.additionalInformation.json
@@ -0,0 +1,12 @@
+{
+  "errorCount": 1,
+  "reportedHamMessageCount": 2,
+  "runningOptions": {
+    "messagesPerSecond": 10,
+    "periodInSecond": 3600,
+    "samplingProbability": 1.0
+  },
+  "hamMessageCount": 4,
+  "timestamp": "2007-12-03T10:15:30Z",
+  "type": "FeedHamToRSpamDTask"
+}
\ No newline at end of file
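
A note on the sampling tests in `FeedHamToRSpamDTaskTest` above: they assert on a random outcome, so they can fail by chance. The sketch below estimates how often, assuming each message is sampled independently with the configured probability (an assumption; the task's sampling code is not shown in this excerpt). The class and method names are hypothetical and exist only for this illustration.

```java
// Illustrative only: not part of the commit.
class SamplingFlakinessSketch {
    // Probability that exactly k of n independent trials succeed, each with probability p.
    static double binomial(int n, int k, double p) {
        double coefficient = 1.0;
        for (int i = 1; i <= k; i++) {
            coefficient = coefficient * (n - k + i) / i;
        }
        return coefficient * Math.pow(p, k) * Math.pow(1 - p, n - k);
    }

    // Chance that the reported count is 0 or n, i.e. falls outside [1, n - 1].
    static double chanceOfZeroOrAll(int n, double p) {
        return binomial(n, 0, p) + binomial(n, n, p);
    }

    public static void main(String[] args) {
        // Matches the test with 10 messages and samplingProbability 0.5.
        System.out.println(chanceOfZeroOrAll(10, 0.5));
    }
}
```

Under that independence assumption, `taskWithAverageSamplingProbabilityShouldReportSomeMessages` would fail in roughly two runs out of 1024 (2 * 0.5^10), which is small but not negligible on a busy CI.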




[james-project] 02/02: JAMES-3775 Write webadmin route to create feeding ham messages task

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5cce1a5f734400fcac4260ed51b6249153b9df74
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Fri Jul 29 17:11:33 2022 +0700

    JAMES-3775 Write webadmin route to create feeding ham messages task
---
 third-party/rspamd/README.md                       |  46 ++++
 .../apache/james/rspamd/module/RSpamDModule.java   |  12 +
 .../james/rspamd/route/FeedMessageRoute.java       |  28 +-
 .../james/rspamd/route/FeedMessageRouteTest.java   | 286 +++++++++++++++++++--
 4 files changed, 346 insertions(+), 26 deletions(-)
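
For callers that prefer Java over curl, the `reportHam` request documented in the README hunk below can be assembled with the JDK's `java.net.http` API. This is a minimal sketch, assuming a WebAdmin server at `http://localhost:8000` (a placeholder) and a hypothetical helper named `reportHamRequest`; it only builds the request and does not send it.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Illustrative only: not part of the commit.
class ReportHamRequestSketch {
    // Hypothetical helper: builds POST /rspamd?action=reportHam with the optional parameters from the README.
    static HttpRequest reportHamRequest(String baseUrl, String period, int messagesPerSecond, double samplingProbability) {
        String query = "action=reportHam"
            + "&period=" + URLEncoder.encode(period, StandardCharsets.UTF_8)
            + "&messagesPerSecond=" + messagesPerSecond
            + "&samplingProbability=" + samplingProbability;
        return HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/rspamd?" + query))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    public static void main(String[] args) {
        // Base URL is a placeholder; replace it with the actual WebAdmin address.
        HttpRequest request = reportHamRequest("http://localhost:8000", "1d", 10, 0.5);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request with `java.net.http.HttpClient` should then yield the 201 response carrying the `taskId`, per the README text.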

diff --git a/third-party/rspamd/README.md b/third-party/rspamd/README.md
index 2f98347f0d..e508228d11 100644
--- a/third-party/rspamd/README.md
+++ b/third-party/rspamd/README.md
@@ -124,4 +124,50 @@ The scheduled task will have the following type `FeedSpamToRSpamDTask` and the f
   "timestamp": "2007-12-03T10:15:30Z",
   "type": "FeedSpamToRSpamDTask"
 }
+```
+
+### Report ham messages to RSpamD
+One can use this route to schedule a task that reports ham messages to RSpamD so that it can train its spam classifier.
+
+```bash
+curl -XPOST 'http://ip:port/rspamd?action=reportHam'
+```
+
+This endpoint has the following parameters:
+- `action` (required): must be `reportHam`
+- `messagesPerSecond` (optional): number of learn requests performed against RSpamD per second, defaults to 10
+- `period` (optional): duration (many time units are supported, the default unit is seconds); only messages between `now - duration` and `now` are reported. By default,
+  all messages are reported.
+  These inputs represent the same duration: `1d`, `1day`, `86400 seconds`, `86400`...
+- `samplingProbability` (optional): float between 0 and 1, representing the chance to report each given message to RSpamD.
+  By default, all messages are reported.
+
+Returns the task id, e.g.:
+```
+{
+    "taskId": "70c12761-ab86-4321-bb6f-fde99e2f74b0"
+}
+```
+
+Response codes:
+- 201: Task generation succeeded. Corresponding task id is returned.
+- 400: Invalid arguments supplied in the user request.
+
+[More details about endpoints returning a task](https://james.apache.org/server/manage-webadmin.html#Endpoints_returning_a_task).
+
+The scheduled task will have the following type `FeedHamToRSpamDTask` and the following additionalInformation:
+
+```json
+{
+  "errorCount": 1,
+  "reportedHamMessageCount": 2,
+  "runningOptions": {
+    "messagesPerSecond": 10,
+    "periodInSecond": 3600,
+    "samplingProbability": 1.0
+  },
+  "hamMessageCount": 4,
+  "timestamp": "2007-12-03T10:15:30Z",
+  "type": "FeedHamToRSpamDTask"
+}
 ```
\ No newline at end of file
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/module/RSpamDModule.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/module/RSpamDModule.java
index c3865bf7a1..3d27718446 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/module/RSpamDModule.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/module/RSpamDModule.java
@@ -20,6 +20,7 @@
 package org.apache.james.rspamd.module;
 
 import org.apache.james.rspamd.route.FeedMessageRoute;
+import org.apache.james.rspamd.task.FeedHamToRSpamDTaskAdditionalInformationDTO;
 import org.apache.james.rspamd.task.FeedSpamToRSpamDTaskAdditionalInformationDTO;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
@@ -49,4 +50,15 @@ public class RSpamDModule extends AbstractModule {
     public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> webAdminFeedSpamAdditionalInformation() {
         return FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE;
     }
+
+    @ProvidesIntoSet
+    public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> feedHamAdditionalInformation() {
+        return FeedHamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE;
+    }
+
+    @Named(DTOModuleInjections.WEBADMIN_DTO)
+    @ProvidesIntoSet
+    public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> webAdminFeedHamAdditionalInformation() {
+        return FeedHamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE;
+    }
 }
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/route/FeedMessageRoute.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/route/FeedMessageRoute.java
index b1c631018b..483b7aeb92 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/route/FeedMessageRoute.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/route/FeedMessageRoute.java
@@ -19,9 +19,6 @@
 
 package org.apache.james.rspamd.route;
 
-import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND;
-import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY;
-
 import java.time.Clock;
 import java.time.temporal.ChronoUnit;
 import java.util.Optional;
@@ -33,6 +30,7 @@ import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MessageIdManager;
 import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
 import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.rspamd.task.FeedHamToRSpamDTask;
 import org.apache.james.rspamd.task.FeedSpamToRSpamDTask;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskManager;
@@ -49,6 +47,8 @@ import spark.Service;
 
 public class FeedMessageRoute implements Routes {
     public static final String BASE_PATH = "/rspamd";
+    private static final String REPORT_SPAM_PARAM = "reportSpam";
+    private static final String REPORT_HAM_PARAM = "reportHam";
 
     private final TaskManager taskManager;
     private final MailboxManager mailboxManager;
@@ -85,21 +85,29 @@ public class FeedMessageRoute implements Routes {
 
     public Task feedMessageTaskFromRequest(Request request) {
         Preconditions.checkArgument(Optional.ofNullable(request.queryParams("action"))
-                .filter(action -> action.equals("reportSpam") || action.equals("reportHam"))
-                .isPresent(),
-            "'action' is missing or must be 'reportSpam' or 'reportHam'");
+                .filter(action -> action.equals(REPORT_SPAM_PARAM) || action.equals(REPORT_HAM_PARAM))
+                .isPresent(), String.format("'action' is missing or must be '%s' or '%s'", REPORT_SPAM_PARAM, REPORT_HAM_PARAM));
 
-        return new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, rSpamDHttpClient,
-            getFeedSpamTaskRunningOptions(request), clock);
+        return Optional.ofNullable(request.queryParams("action"))
+            .filter(action -> action.equals(REPORT_SPAM_PARAM))
+            .map(any -> (Task) new FeedSpamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, rSpamDHttpClient, getFeedSpamTaskRunningOptions(request), clock))
+            .orElseGet(() -> new FeedHamToRSpamDTask(mailboxManager, usersRepository, messageIdManager, mapperFactory, rSpamDHttpClient, getFeedHamTaskRunningOptions(request), clock));
     }
 
     private FeedSpamToRSpamDTask.RunningOptions getFeedSpamTaskRunningOptions(Request request) {
         Optional<Long> periodInSecond = getPeriod(request);
-        int messagesPerSecond = getMessagesPerSecond(request).orElse(DEFAULT_MESSAGES_PER_SECOND);
-        double samplingProbability = getSamplingProbability(request).orElse(DEFAULT_SAMPLING_PROBABILITY);
+        int messagesPerSecond = getMessagesPerSecond(request).orElse(FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND);
+        double samplingProbability = getSamplingProbability(request).orElse(FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY);
         return new FeedSpamToRSpamDTask.RunningOptions(periodInSecond, messagesPerSecond, samplingProbability);
     }
 
+    private FeedHamToRSpamDTask.RunningOptions getFeedHamTaskRunningOptions(Request request) {
+        Optional<Long> periodInSecond = getPeriod(request);
+        int messagesPerSecond = getMessagesPerSecond(request).orElse(FeedHamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND);
+        double samplingProbability = getSamplingProbability(request).orElse(FeedHamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY);
+        return new FeedHamToRSpamDTask.RunningOptions(periodInSecond, messagesPerSecond, samplingProbability);
+    }
+
     private Optional<Long> getPeriod(Request req) {
         return Optional.ofNullable(req.queryParams("period"))
             .filter(Predicate.not(String::isEmpty))
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/route/FeedMessageRouteTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/route/FeedMessageRouteTest.java
index 4af36a5406..e31d69a896 100644
--- a/third-party/rspamd/src/test/java/org/apache/james/rspamd/route/FeedMessageRouteTest.java
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/route/FeedMessageRouteTest.java
@@ -23,8 +23,8 @@ import static io.restassured.RestAssured.given;
 import static io.restassured.http.ContentType.JSON;
 import static org.apache.james.rspamd.DockerRSpamD.PASSWORD;
 import static org.apache.james.rspamd.route.FeedMessageRoute.BASE_PATH;
-import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND;
-import static org.apache.james.rspamd.task.FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY;
+import static org.apache.james.rspamd.task.FeedHamToRSpamDTaskTest.ALICE_INBOX_MAILBOX;
+import static org.apache.james.rspamd.task.FeedHamToRSpamDTaskTest.BOB_INBOX_MAILBOX;
 import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.ALICE;
 import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.ALICE_SPAM_MAILBOX;
 import static org.apache.james.rspamd.task.FeedSpamToRSpamDTaskTest.BOB;
@@ -65,6 +65,8 @@ import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
 import org.apache.james.rspamd.DockerRSpamDExtension;
 import org.apache.james.rspamd.client.RSpamDClientConfiguration;
 import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.rspamd.task.FeedHamToRSpamDTask;
+import org.apache.james.rspamd.task.FeedHamToRSpamDTaskAdditionalInformationDTO;
 import org.apache.james.rspamd.task.FeedSpamToRSpamDTask;
 import org.apache.james.rspamd.task.FeedSpamToRSpamDTaskAdditionalInformationDTO;
 import org.apache.james.task.Hostname;
@@ -111,7 +113,10 @@ public class FeedMessageRouteTest {
         usersRepository.addUser(BOB, "anyPassword");
         usersRepository.addUser(ALICE, "anyPassword");
         mailboxManager.createMailbox(BOB_SPAM_MAILBOX, mailboxManager.createSystemSession(BOB));
+        mailboxManager.createMailbox(BOB_INBOX_MAILBOX, mailboxManager.createSystemSession(BOB));
         mailboxManager.createMailbox(ALICE_SPAM_MAILBOX, mailboxManager.createSystemSession(ALICE));
+        mailboxManager.createMailbox(ALICE_INBOX_MAILBOX, mailboxManager.createSystemSession(ALICE));
+
         taskManager = new MemoryTaskManager(new Hostname("foo"));
         UpdatableTickingClock clock = new UpdatableTickingClock(NOW);
         JsonTransformer jsonTransformer = new JsonTransformer();
@@ -119,7 +124,8 @@ public class FeedMessageRouteTest {
         MessageIdManager messageIdManager = inMemoryIntegrationResources.getMessageIdManager();
         MailboxSessionMapperFactory mapperFactory = mailboxManager.getMapperFactory();
 
-        TasksRoutes tasksRoutes = new TasksRoutes(taskManager, jsonTransformer, DTOConverter.of(FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE));
+        TasksRoutes tasksRoutes = new TasksRoutes(taskManager, jsonTransformer, DTOConverter.of(FeedSpamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE,
+            FeedHamToRSpamDTaskAdditionalInformationDTO.SERIALIZATION_MODULE));
         FeedMessageRoute feedMessageRoute = new FeedMessageRoute(taskManager, mailboxManager, usersRepository, client, jsonTransformer, clock,
             messageIdManager, mapperFactory);
 
@@ -136,7 +142,7 @@ public class FeedMessageRouteTest {
         taskManager.stop();
     }
 
-    private void appendSpamMessage(MailboxPath mailboxPath, Date internalDate) throws MailboxException {
+    private void appendMessage(MailboxPath mailboxPath, Date internalDate) throws MailboxException {
         MailboxSession session = mailboxManager.createSystemSession(mailboxPath.getUser());
         mailboxManager.getMailbox(mailboxPath, session)
             .appendMessage(new ByteArrayInputStream(String.format("random content %4.3f", Math.random()).getBytes()),
@@ -150,8 +156,8 @@ public class FeedMessageRouteTest {
     class FeedSpam {
         @Test
         void taskShouldReportAllSpamMessagesOfAllUsersByDefault() throws MailboxException {
-            appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW));
-            appendSpamMessage(ALICE_SPAM_MAILBOX, Date.from(NOW));
+            appendMessage(BOB_SPAM_MAILBOX, Date.from(NOW));
+            appendMessage(ALICE_SPAM_MAILBOX, Date.from(NOW));
 
             String taskId = given()
                 .queryParam("action", "reportSpam")
@@ -169,15 +175,15 @@ public class FeedMessageRouteTest {
                 .body("additionalInformation.spamMessageCount", is(2))
                 .body("additionalInformation.reportedSpamMessageCount", is(2))
                 .body("additionalInformation.errorCount", is(0))
-                .body("additionalInformation.runningOptions.messagesPerSecond", is(DEFAULT_MESSAGES_PER_SECOND))
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND))
                 .body("additionalInformation.runningOptions.periodInSecond", is(nullValue()))
-                .body("additionalInformation.runningOptions.samplingProbability", is((float) DEFAULT_SAMPLING_PROBABILITY));
+                .body("additionalInformation.runningOptions.samplingProbability", is((float) FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY));
         }
 
         @Test
         void taskShouldReportOnlyMailInPeriod() throws MailboxException {
-            appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(THREE_DAYS_IN_SECOND)));
-            appendSpamMessage(ALICE_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)));
+            appendMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(THREE_DAYS_IN_SECOND)));
+            appendMessage(ALICE_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)));
 
             String taskId = given()
                 .queryParam("action", "reportSpam")
@@ -196,15 +202,15 @@ public class FeedMessageRouteTest {
                 .body("additionalInformation.spamMessageCount", is(2))
                 .body("additionalInformation.reportedSpamMessageCount", is(1))
                 .body("additionalInformation.errorCount", is(0))
-                .body("additionalInformation.runningOptions.messagesPerSecond", is(DEFAULT_MESSAGES_PER_SECOND))
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND))
                 .body("additionalInformation.runningOptions.periodInSecond", is(172800))
-                .body("additionalInformation.runningOptions.samplingProbability", is((float) DEFAULT_SAMPLING_PROBABILITY));
+                .body("additionalInformation.runningOptions.samplingProbability", is((float) FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY));
         }
 
         @Test
         void taskWithAverageSamplingProbabilityShouldNotReportAllSpamMessages() {
             IntStream.range(0, 10)
-                .forEach(Throwing.intConsumer(any -> appendSpamMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+                .forEach(Throwing.intConsumer(any -> appendMessage(BOB_SPAM_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
 
             String taskId = given()
                 .queryParam("action", "reportSpam")
@@ -223,7 +229,7 @@ public class FeedMessageRouteTest {
                 .body("additionalInformation.spamMessageCount", is(10))
                 .body("additionalInformation.reportedSpamMessageCount", is(allOf(greaterThan(0), lessThan(10))))
                 .body("additionalInformation.errorCount", is(0))
-                .body("additionalInformation.runningOptions.messagesPerSecond", is(DEFAULT_MESSAGES_PER_SECOND))
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND))
                 .body("additionalInformation.runningOptions.periodInSecond", is(nullValue()))
                 .body("additionalInformation.runningOptions.samplingProbability", is(0.5F));
         }
@@ -289,9 +295,9 @@ public class FeedMessageRouteTest {
                 .body("additionalInformation.spamMessageCount", is(0))
                 .body("additionalInformation.reportedSpamMessageCount", is(0))
                 .body("additionalInformation.errorCount", is(0))
-                .body("additionalInformation.runningOptions.messagesPerSecond", is(DEFAULT_MESSAGES_PER_SECOND))
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(FeedSpamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND))
                 .body("additionalInformation.runningOptions.periodInSecond", is(nullValue()))
-                .body("additionalInformation.runningOptions.samplingProbability", is((float) DEFAULT_SAMPLING_PROBABILITY));
+                .body("additionalInformation.runningOptions.samplingProbability", is((float) FeedSpamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY));
         }
 
         @ParameterizedTest
@@ -393,4 +399,252 @@ public class FeedMessageRouteTest {
                 .body("details", containsString("samplingProbability"));
         }
     }
+
+    @Nested
+    class FeedHam {
+        @Test
+        void taskShouldReportAllHamMessagesOfAllUsersByDefault() throws MailboxException {
+            appendMessage(BOB_INBOX_MAILBOX, Date.from(NOW));
+            appendMessage(ALICE_INBOX_MAILBOX, Date.from(NOW));
+
+            String taskId = given()
+                .queryParam("action", "reportHam")
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("status", is("completed"))
+                .body("additionalInformation.type", is(FeedHamToRSpamDTask.TASK_TYPE.asString()))
+                .body("additionalInformation.hamMessageCount", is(2))
+                .body("additionalInformation.reportedHamMessageCount", is(2))
+                .body("additionalInformation.errorCount", is(0))
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(FeedHamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND))
+                .body("additionalInformation.runningOptions.periodInSecond", is(nullValue()))
+                .body("additionalInformation.runningOptions.samplingProbability", is((float) FeedHamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY));
+        }
+
+        @Test
+        void taskShouldReportOnlyMailInPeriod() throws MailboxException {
+            appendMessage(BOB_INBOX_MAILBOX, Date.from(NOW.minusSeconds(THREE_DAYS_IN_SECOND)));
+            appendMessage(ALICE_INBOX_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)));
+
+            String taskId = given()
+                .queryParam("action", "reportHam")
+                .queryParam("period", TWO_DAYS_IN_SECOND)
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("status", is("completed"))
+                .body("additionalInformation.type", is(FeedHamToRSpamDTask.TASK_TYPE.asString()))
+                .body("additionalInformation.hamMessageCount", is(2))
+                .body("additionalInformation.reportedHamMessageCount", is(1))
+                .body("additionalInformation.errorCount", is(0))
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(FeedHamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND))
+                .body("additionalInformation.runningOptions.periodInSecond", is(172800))
+                .body("additionalInformation.runningOptions.samplingProbability", is((float) FeedHamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY));
+        }
+
+        @Test
+        void taskWithAverageSamplingProbabilityShouldNotReportAllHamMessages() {
+            IntStream.range(0, 10)
+                .forEach(Throwing.intConsumer(any -> appendMessage(BOB_INBOX_MAILBOX, Date.from(NOW.minusSeconds(ONE_DAY_IN_SECOND)))));
+
+            String taskId = given()
+                .queryParam("action", "reportHam")
+                .queryParam("samplingProbability", 0.5)
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("status", is("completed"))
+                .body("additionalInformation.type", is(FeedHamToRSpamDTask.TASK_TYPE.asString()))
+                .body("additionalInformation.hamMessageCount", is(10))
+                .body("additionalInformation.reportedHamMessageCount", is(allOf(greaterThan(0), lessThan(10))))
+                .body("additionalInformation.errorCount", is(0))
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(FeedHamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND))
+                .body("additionalInformation.runningOptions.periodInSecond", is(nullValue()))
+                .body("additionalInformation.runningOptions.samplingProbability", is(0.5F));
+        }
+
+        @Test
+        void feedMessageShouldReturnErrorWhenInvalidAction() {
+            given()
+                .queryParam("action", "invalid")
+                .post()
+            .then()
+                .statusCode(BAD_REQUEST_400)
+                .contentType(JSON)
+                .body("statusCode", is(BAD_REQUEST_400))
+                .body("type", is("InvalidArgument"))
+                .body("message", is("Invalid arguments supplied in the user request"))
+                .body("details", is("'action' is missing or must be 'reportSpam' or 'reportHam'"));
+        }
+
+        @Test
+        void feedMessageTaskShouldReturnErrorWhenMissingAction() {
+            given()
+                .post()
+            .then()
+                .statusCode(BAD_REQUEST_400)
+                .contentType(JSON)
+                .body("statusCode", is(BAD_REQUEST_400))
+                .body("type", is("InvalidArgument"))
+                .body("message", is("Invalid arguments supplied in the user request"))
+                .body("details", is("'action' is missing or must be 'reportSpam' or 'reportHam'"));
+        }
+
+        @Test
+        void feedHamShouldReturnTaskId() {
+            given()
+                .queryParam("action", "reportHam")
+                .post()
+            .then()
+                .statusCode(HttpStatus.CREATED_201)
+                .body("taskId", notNullValue());
+        }
+
+        @Test
+        void feedHamShouldReturnDetail() {
+            String taskId = given()
+                .queryParam("action", "reportHam")
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("status", is("completed"))
+                .body("taskId", is(notNullValue()))
+                .body("type", is(FeedHamToRSpamDTask.TASK_TYPE.asString()))
+                .body("startedDate", is(notNullValue()))
+                .body("submitDate", is(notNullValue()))
+                .body("completedDate", is(notNullValue()))
+                .body("additionalInformation.type", is(FeedHamToRSpamDTask.TASK_TYPE.asString()))
+                .body("additionalInformation.timestamp", is(notNullValue()))
+                .body("additionalInformation.hamMessageCount", is(0))
+                .body("additionalInformation.reportedHamMessageCount", is(0))
+                .body("additionalInformation.errorCount", is(0))
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(FeedHamToRSpamDTask.RunningOptions.DEFAULT_MESSAGES_PER_SECOND))
+                .body("additionalInformation.runningOptions.periodInSecond", is(nullValue()))
+                .body("additionalInformation.runningOptions.samplingProbability", is((float) FeedHamToRSpamDTask.RunningOptions.DEFAULT_SAMPLING_PROBABILITY));
+        }
+
+        @ParameterizedTest
+        @ValueSource(strings = {"3600", "3600 seconds", "1d", "1day"})
+        void feedHamShouldAcceptPeriodParam(String period) {
+            String taskId = given()
+                .queryParam("action", "reportHam")
+                .queryParam("period", period)
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("additionalInformation.runningOptions.periodInSecond", is((int) DurationParser.parse(period, ChronoUnit.SECONDS).toSeconds()));
+        }
+
+        @ParameterizedTest
+        @ValueSource(strings = {"-1", "0", "1 t"})
+        void feedHamShouldReturnErrorWhenPeriodInvalid(String period) {
+            given()
+                .queryParam("action", "reportHam")
+                .queryParam("period", period)
+                .post()
+            .then()
+                .statusCode(BAD_REQUEST_400)
+                .contentType(JSON)
+                .body("statusCode", is(BAD_REQUEST_400))
+                .body("type", is("InvalidArgument"))
+                .body("message", is("Invalid arguments supplied in the user request"));
+        }
+
+        @Test
+        void feedHamShouldAcceptMessagesPerSecondParam() {
+            String taskId = given()
+                .queryParam("action", "reportHam")
+                .queryParam("messagesPerSecond", 20)
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("additionalInformation.runningOptions.messagesPerSecond", is(20));
+        }
+
+        @ParameterizedTest
+        @ValueSource(doubles = {-1, -0.1, 1.1})
+        void feedHamShouldReturnErrorWhenMessagesPerSecondInvalid(double messagesPerSecond) {
+            given()
+                .queryParam("action", "reportHam")
+                .queryParam("messagesPerSecond", messagesPerSecond)
+                .post()
+            .then()
+                .statusCode(BAD_REQUEST_400)
+                .contentType(JSON)
+                .body("statusCode", is(BAD_REQUEST_400))
+                .body("type", is("InvalidArgument"))
+                .body("message", is("Invalid arguments supplied in the user request"))
+                .body("details", containsString("messagesPerSecond"));
+        }
+
+        @Test
+        void feedHamShouldAcceptSamplingProbabilityParam() {
+            String taskId = given()
+                .queryParam("action", "reportHam")
+                .queryParam("samplingProbability", 0.8)
+                .post()
+                .jsonPath()
+                .get("taskId");
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(taskId + "/await")
+            .then()
+                .body("additionalInformation.runningOptions.samplingProbability", is(0.8F));
+        }
+
+        @ParameterizedTest
+        @ValueSource(doubles = {-1, -0.1, 1.1})
+        void feedHamShouldReturnErrorWhenSamplingProbabilityInvalid(double samplingProbability) {
+            given()
+                .queryParam("action", "reportHam")
+                .queryParam("samplingProbability", samplingProbability)
+                .post()
+            .then()
+                .statusCode(BAD_REQUEST_400)
+                .contentType(JSON)
+                .body("statusCode", is(BAD_REQUEST_400))
+                .body("type", is("InvalidArgument"))
+                .body("message", is("Invalid arguments supplied in the user request"))
+                .body("details", containsString("samplingProbability"));
+        }
+    }
 }

