You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by ka...@apache.org on 2022/05/25 08:37:01 UTC
[james-project] 01/04: JAMES-3758 Webadmin task to delete old emails from the user INBOX
This is an automated email from the ASF dual-hosted git repository.
kao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7f6d295e1a5069c6e0c7f36bb1f50ee5da4390f7
Author: Karsten Otto <ka...@akquinet.de>
AuthorDate: Tue May 17 15:30:45 2022 +0200
JAMES-3758 Webadmin task to delete old emails from the user INBOX
---
.../james/mailbox/store/StoreMailboxManager.java | 2 +-
.../WebadminMailboxTaskSerializationModule.java | 19 ++
.../james/webadmin/routes/MessagesRoutes.java | 29 +-
.../ExpireMailboxAdditionalInformationDTO.java | 118 +++++++
.../james/webadmin/service/ExpireMailboxDTO.java | 66 ++++
.../webadmin/service/ExpireMailboxService.java | 248 +++++++++++++++
.../james/webadmin/service/ExpireMailboxTask.java | 129 ++++++++
.../webadmin/routes/MessageRoutesExpireTest.java | 300 +++++++++++++++++
.../james/webadmin/routes/MessageRoutesTest.java | 3 +-
.../webadmin/service/ExpireMailboxServiceTest.java | 354 +++++++++++++++++++++
...ireMailboxTaskAdditionalInformationDTOTest.java | 43 +++
.../ExpireMailboxTaskSerializationTest.java | 55 ++++
.../json/expireMailbox.additionalInformation.json | 14 +
.../resources/json/expireMailbox.age.task.json | 9 +
.../resources/json/expireMailbox.header.task.json | 8 +
15 files changed, 1394 insertions(+), 3 deletions(-)
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index d8df5eec14..c3385d53af 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -293,7 +293,7 @@ public class StoreMailboxManager implements MailboxManager {
return createMessageManager(mailboxRow, session);
}).sneakyThrow())
.switchIfEmpty(Mono.fromCallable(() -> {
- LOGGER.info("Mailbox '{}' not found.", mailboxPath);
+ LOGGER.debug("Mailbox '{}' not found.", mailboxPath);
throw new MailboxNotFoundException(mailboxPath);
}));
}
diff --git a/server/container/guice/protocols/webadmin-mailbox/src/main/java/org/apache/james/modules/server/WebadminMailboxTaskSerializationModule.java b/server/container/guice/protocols/webadmin-mailbox/src/main/java/org/apache/james/modules/server/WebadminMailboxTaskSerializationModule.java
index 6db6e69d6f..0e6920e734 100644
--- a/server/container/guice/protocols/webadmin-mailbox/src/main/java/org/apache/james/modules/server/WebadminMailboxTaskSerializationModule.java
+++ b/server/container/guice/protocols/webadmin-mailbox/src/main/java/org/apache/james/modules/server/WebadminMailboxTaskSerializationModule.java
@@ -39,6 +39,9 @@ import org.apache.james.webadmin.service.EventDeadLettersRedeliverGroupTaskDTO;
import org.apache.james.webadmin.service.EventDeadLettersRedeliverOneTaskDTO;
import org.apache.james.webadmin.service.EventDeadLettersRedeliverService;
import org.apache.james.webadmin.service.EventDeadLettersRedeliveryTaskAdditionalInformationDTO;
+import org.apache.james.webadmin.service.ExpireMailboxAdditionalInformationDTO;
+import org.apache.james.webadmin.service.ExpireMailboxDTO;
+import org.apache.james.webadmin.service.ExpireMailboxService;
import org.apache.james.webadmin.service.SubscribeAllTaskAdditionalInformationDTO;
import org.apache.james.webadmin.service.SubscribeAllTaskDTO;
import org.apache.james.webadmin.service.UserMailboxesService;
@@ -83,6 +86,11 @@ public class WebadminMailboxTaskSerializationModule extends AbstractModule {
return ClearMailboxContentTaskDTO.module(userMailboxesService);
}
+ // Registers the task (de)serialization module returned by ExpireMailboxDTO.module(service)
+ // as one element of the @ProvidesIntoSet multibinding.
+ @ProvidesIntoSet
+ public TaskDTOModule<? extends Task, ? extends TaskDTO> expireMailboxTask(ExpireMailboxService service) {
+ return ExpireMailboxDTO.module(service);
+ }
+
@ProvidesIntoSet
public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> eventDeadLettersRedeliveryAdditionalInformationForAll() {
return EventDeadLettersRedeliveryTaskAdditionalInformationDTO.EventDeadLettersRedeliveryTaskAdditionalInformationForAll.module();
@@ -160,4 +168,15 @@ public class WebadminMailboxTaskSerializationModule extends AbstractModule {
return ClearMailboxContentTaskAdditionalInformationDTO.SERIALIZATION_MODULE;
}
+ // Registers the additional-information module of the expire-mailbox task in the unnamed set.
+ @ProvidesIntoSet
+ public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> expireMailboxAdditionalInformationDTO() {
+ return ExpireMailboxAdditionalInformationDTO.module();
+ }
+
+ // Same module as expireMailboxAdditionalInformationDTO(), but contributed to the set
+ // qualified with @Named(DTOModuleInjections.WEBADMIN_DTO).
+ @Named(DTOModuleInjections.WEBADMIN_DTO)
+ @ProvidesIntoSet
+ public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> webAdminExpireMailboxAdditionalInformationDTO() {
+ return ExpireMailboxAdditionalInformationDTO.module();
+ }
+
}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/routes/MessagesRoutes.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/routes/MessagesRoutes.java
index 462fad59ad..d61d8cba60 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/routes/MessagesRoutes.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/routes/MessagesRoutes.java
@@ -29,8 +29,12 @@ import javax.inject.Named;
import org.apache.james.mailbox.indexer.MessageIdReIndexer;
import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.task.Task;
import org.apache.james.task.TaskManager;
import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.service.ExpireMailboxService;
+import org.apache.james.webadmin.service.ExpireMailboxTask;
+import org.apache.james.webadmin.tasks.TaskFromRequest;
import org.apache.james.webadmin.tasks.TaskFromRequestRegistry;
import org.apache.james.webadmin.utils.ErrorResponder;
import org.apache.james.webadmin.utils.JsonTransformer;
@@ -48,17 +52,20 @@ public class MessagesRoutes implements Routes {
private final TaskManager taskManager;
private final MessageId.Factory messageIdFactory;
private final MessageIdReIndexer reIndexer;
+ private final ExpireMailboxService expireMailboxService;
private final JsonTransformer jsonTransformer;
private final Set<TaskFromRequestRegistry.TaskRegistration> allMessagesTaskRegistration;
public static final String ALL_MESSAGES_TASKS = "allMessagesTasks";
@Inject
- MessagesRoutes(TaskManager taskManager, MessageId.Factory messageIdFactory, MessageIdReIndexer reIndexer, JsonTransformer jsonTransformer,
+ MessagesRoutes(TaskManager taskManager, MessageId.Factory messageIdFactory, MessageIdReIndexer reIndexer,
+ ExpireMailboxService expireMailboxService, JsonTransformer jsonTransformer,
@Named(ALL_MESSAGES_TASKS) Set<TaskFromRequestRegistry.TaskRegistration> allMessagesTaskRegistration) {
this.taskManager = taskManager;
this.messageIdFactory = messageIdFactory;
this.reIndexer = reIndexer;
+ this.expireMailboxService = expireMailboxService;
this.jsonTransformer = jsonTransformer;
this.allMessagesTaskRegistration = allMessagesTaskRegistration;
}
@@ -70,11 +77,31 @@ public class MessagesRoutes implements Routes {
@Override
public void define(Service service) {
+ // DELETE on BASE_PATH: build an ExpireMailboxTask from the request and expose it as a route via the task manager.
+ TaskFromRequest expireMailboxTaskRequest = this::expireMailbox;
+ service.delete(BASE_PATH, expireMailboxTaskRequest.asRoute(taskManager), jsonTransformer);
service.post(MESSAGE_PATH, reIndexMessage(), jsonTransformer);
allMessagesOperations()
.ifPresent(route -> service.post(BASE_PATH, route, jsonTransformer));
}
+ /**
+ * Builds an {@link ExpireMailboxTask} from the query parameters of the request.
+ *
+ * Reads the "byExpiresHeader", "olderThan", "usersPerSecond" and "mailbox" query parameters
+ * (each may be absent) and hands them to ExpireMailboxService.RunningOptions.fromParams.
+ * Any IllegalArgumentException raised while building the options is translated into a
+ * 400 INVALID_ARGUMENT error response, keeping the original exception as cause.
+ */
+ private Task expireMailbox(Request request) {
+ try {
+ ExpireMailboxService.RunningOptions runningOptions = ExpireMailboxService.RunningOptions.fromParams(
+ Optional.ofNullable(request.queryParams("byExpiresHeader")),
+ Optional.ofNullable(request.queryParams("olderThan")),
+ Optional.ofNullable(request.queryParams("usersPerSecond")),
+ Optional.ofNullable(request.queryParams("mailbox")));
+ return new ExpireMailboxTask(expireMailboxService, runningOptions);
+ } catch (IllegalArgumentException e) {
+ throw ErrorResponder.builder()
+ .statusCode(HttpStatus.BAD_REQUEST_400)
+ .type(ErrorResponder.ErrorType.INVALID_ARGUMENT)
+ .message("Invalid arguments supplied in the user request")
+ .cause(e)
+ .haltError();
+ }
+ }
+
private Route reIndexMessage() {
return TaskFromRequestRegistry.builder()
.parameterName(TASK_PARAMETER)
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxAdditionalInformationDTO.java
new file mode 100644
index 0000000000..531f111d91
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxAdditionalInformationDTO.java
@@ -0,0 +1,118 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+import org.apache.james.webadmin.service.ExpireMailboxService.RunningOptions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * JSON DTO mirroring {@link ExpireMailboxTask.AdditionalInformation}: a timestamp, the task type name,
+ * the running options and the four counters (mailboxes expired / failed / processed, messages deleted).
+ */
+public class ExpireMailboxAdditionalInformationDTO implements AdditionalInformationDTO {
+
+ // Serialization module converting between the domain object and this DTO,
+ // registered under the type name of ExpireMailboxTask.TASK_TYPE.
+ public static AdditionalInformationDTOModule<ExpireMailboxTask.AdditionalInformation, ExpireMailboxAdditionalInformationDTO> module() {
+ return DTOModule.forDomainObject(ExpireMailboxTask.AdditionalInformation.class)
+ .convertToDTO(ExpireMailboxAdditionalInformationDTO.class)
+ .toDomainObjectConverter(ExpireMailboxAdditionalInformationDTO::toDomainObject)
+ .toDTOConverter(ExpireMailboxAdditionalInformationDTO::toDto)
+ .typeName(ExpireMailboxTask.TASK_TYPE.asString())
+ .withFactory(AdditionalInformationDTOModule::new);
+ }
+
+ // DTO -> domain object; the DTO's type string is not carried over.
+ private static ExpireMailboxTask.AdditionalInformation toDomainObject(ExpireMailboxAdditionalInformationDTO dto) {
+ return new ExpireMailboxTask.AdditionalInformation(
+ dto.getTimestamp(),
+ dto.getRunningOptions(),
+ dto.getMailboxesExpired(),
+ dto.getMailboxesFailed(),
+ dto.getMailboxesProcessed(),
+ dto.getMessagesDeleted());
+ }
+
+ // Domain object -> DTO; 'type' is the type name supplied by the module.
+ private static ExpireMailboxAdditionalInformationDTO toDto(ExpireMailboxTask.AdditionalInformation details, String type) {
+ return new ExpireMailboxAdditionalInformationDTO(
+ details.timestamp(),
+ type,
+ details.getRunningOptions(),
+ details.getMailboxesExpired(),
+ details.getMailboxesFailed(),
+ details.getMailboxesProcessed(),
+ details.getMessagesDeleted());
+ }
+
+ private final Instant timestamp;
+ private final String type;
+ private final RunningOptions runningOptions;
+ private final long mailboxesExpired;
+ private final long mailboxesFailed;
+ private final long mailboxesProcessed;
+ private final long messagesDeleted;
+
+ // Jackson entry point: each argument is bound to the JSON property of the same name.
+ @JsonCreator
+ public ExpireMailboxAdditionalInformationDTO(@JsonProperty("timestamp") Instant timestamp,
+ @JsonProperty("type") String type,
+ @JsonProperty("runningOptions") RunningOptions runningOptions,
+ @JsonProperty("mailboxesExpired") long mailboxesExpired,
+ @JsonProperty("mailboxesFailed") long mailboxesFailed,
+ @JsonProperty("mailboxesProcessed") long mailboxesProcessed,
+ @JsonProperty("messagesDeleted") long messagesDeleted) {
+ this.timestamp = timestamp;
+ this.type = type;
+ this.runningOptions = runningOptions;
+ this.mailboxesExpired = mailboxesExpired;
+ this.mailboxesFailed = mailboxesFailed;
+ this.mailboxesProcessed = mailboxesProcessed;
+ this.messagesDeleted = messagesDeleted;
+ }
+
+ public RunningOptions getRunningOptions() {
+ return runningOptions;
+ }
+
+ public long getMailboxesExpired() {
+ return mailboxesExpired;
+ }
+
+ public long getMailboxesFailed() {
+ return mailboxesFailed;
+ }
+
+ public long getMailboxesProcessed() {
+ return mailboxesProcessed;
+ }
+
+ public long getMessagesDeleted() {
+ return messagesDeleted;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxDTO.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxDTO.java
new file mode 100644
index 0000000000..ebd6ce77aa
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxDTO.java
@@ -0,0 +1,66 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.TaskDTO;
+import org.apache.james.server.task.json.dto.TaskDTOModule;
+import org.apache.james.webadmin.service.ExpireMailboxService.RunningOptions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * JSON DTO for {@link ExpireMailboxTask}: only the running options and the task type name are serialized.
+ */
+public class ExpireMailboxDTO implements TaskDTO {
+ // Serialization module for the task; the service is captured so that a deserialized
+ // DTO can be turned back into a runnable ExpireMailboxTask.
+ public static TaskDTOModule<ExpireMailboxTask, ExpireMailboxDTO> module(ExpireMailboxService service) {
+ return DTOModule.forDomainObject(ExpireMailboxTask.class)
+ .convertToDTO(ExpireMailboxDTO.class)
+ .toDomainObjectConverter(dto -> toDomainObject(dto, service))
+ .toDTOConverter(ExpireMailboxDTO::toDto)
+ .typeName(ExpireMailboxTask.TASK_TYPE.asString())
+ .withFactory(TaskDTOModule::new);
+ }
+
+ // DTO -> task: a new task is created from the service and the DTO's running options.
+ private static ExpireMailboxTask toDomainObject(ExpireMailboxDTO dto, ExpireMailboxService service) {
+ return new ExpireMailboxTask(service, dto.getRunningOptions());
+ }
+
+ // Task -> DTO; 'type' is the type name supplied by the module.
+ private static ExpireMailboxDTO toDto(ExpireMailboxTask details, String type) {
+ return new ExpireMailboxDTO(details.getRunningOptions(), type);
+ }
+
+ private final RunningOptions runningOptions;
+ private final String type;
+
+ // Jackson entry point: binds the "runningOptions" and "type" JSON properties.
+ @JsonCreator
+ public ExpireMailboxDTO(@JsonProperty("runningOptions") RunningOptions runningOptions,
+ @JsonProperty("type") String type) {
+ this.runningOptions = runningOptions;
+ this.type = type;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ public RunningOptions getRunningOptions() {
+ return runningOptions;
+ }
+}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
new file mode 100644
index 0000000000..7692f1a295
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
@@ -0,0 +1,248 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.exception.MailboxNotFoundException;
+import org.apache.james.mailbox.model.MailboxConstants;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.SearchQuery;
+import org.apache.james.mailbox.model.SearchQuery.DateResolution;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.DurationParser;
+import org.apache.james.util.ReactorUtils;
+import org.apache.james.util.streams.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+
+public class ExpireMailboxService {
+
+    /**
+     * Options of one expiration run: how many users to process per second, which mailbox
+     * of each user to inspect, and the expiration criterion (either the "Expires" header
+     * or a maximum age given as a duration string).
+     */
+    public static class RunningOptions {
+        public static final RunningOptions DEFAULT = new RunningOptions(1, MailboxConstants.INBOX, true, Optional.empty());
+
+        /**
+         * Builds the options from raw (typically HTTP query) parameters.
+         *
+         * @param byExpiresHeader presence alone selects expiration by header; the value is not inspected
+         * @param olderThan duration string selecting expiration by age
+         * @param usersPerSecond throttle, defaults to {@code DEFAULT.getUsersPerSecond()} when absent
+         * @param mailbox mailbox name, defaults to {@code DEFAULT.getMailbox()} when absent
+         * @throws IllegalArgumentException when both or neither of 'byExpiresHeader' and 'olderThan'
+         *         are given, when 'usersPerSecond' is not an integer or not strictly positive, or
+         *         (as propagated from the constructor) when 'olderThan' is rejected by the duration parser
+         */
+        public static RunningOptions fromParams(
+                Optional<String> byExpiresHeader, Optional<String> olderThan,
+                Optional<String> usersPerSecond, Optional<String> mailbox) {
+            // Exactly one of the two criteria must be selected.
+            if (byExpiresHeader.isPresent() == olderThan.isPresent()) {
+                throw new IllegalArgumentException("Must specify either 'olderThan' or 'byExpiresHeader' parameter");
+            }
+            return new RunningOptions(
+                parseUsersPerSecond(usersPerSecond),
+                mailbox.orElse(DEFAULT.getMailbox()), byExpiresHeader.isPresent(), olderThan);
+        }
+
+        // Parses the throttle on its own so that only a malformed 'usersPerSecond' is reported
+        // as such; number-format failures raised elsewhere keep their own message.
+        private static int parseUsersPerSecond(Optional<String> usersPerSecond) {
+            try {
+                return usersPerSecond.map(Integer::parseInt).orElse(DEFAULT.getUsersPerSecond());
+            } catch (NumberFormatException ex) {
+                throw new IllegalArgumentException("'usersPerSecond' must be numeric", ex);
+            }
+        }
+
+        private final int usersPerSecond;
+
+        private final String mailbox;
+
+        private final boolean byExpiresHeader;
+
+        private final Optional<String> olderThan;
+
+        // Parsed form of 'olderThan'; derived in the constructor and excluded from JSON.
+        @JsonIgnore
+        private final Optional<Duration> maxAgeDuration;
+
+        /**
+         * @param usersPerSecond must be strictly positive
+         * @param mailbox name of the mailbox to expire for each user
+         * @param byExpiresHeader whether expiration by header was requested
+         * @param olderThan optional duration string, parsed with DurationParser using
+         *        ChronoUnit.DAYS (presumably the unit applied when none is given - confirm
+         *        against DurationParser)
+         * @throws IllegalArgumentException when {@code usersPerSecond} is not strictly positive
+         */
+        @JsonCreator
+        public RunningOptions(@JsonProperty("usersPerSecond") int usersPerSecond,
+                              @JsonProperty("mailbox") String mailbox,
+                              @JsonProperty("byExpiresHeader") boolean byExpiresHeader,
+                              @JsonProperty("olderThan") Optional<String> olderThan) {
+            Preconditions.checkArgument(usersPerSecond > 0, "'usersPerSecond' must be strictly positive");
+            this.usersPerSecond = usersPerSecond;
+            this.mailbox = mailbox;
+            this.byExpiresHeader = byExpiresHeader;
+            this.olderThan = olderThan;
+            this.maxAgeDuration = olderThan.map(v -> DurationParser.parse(v, ChronoUnit.DAYS));
+        }
+
+        public int getUsersPerSecond() {
+            return usersPerSecond;
+        }
+
+        public String getMailbox() {
+            return mailbox;
+        }
+
+        public boolean getByExpiresHeader() {
+            return byExpiresHeader;
+        }
+
+        public Optional<String> getOlderThan() {
+            return olderThan;
+        }
+    }
+
+ /**
+ * Mutable progress counters of one expiration run, backed by AtomicLong fields
+ * that all start at zero.
+ */
+ public static class Context {
+ private final AtomicLong inboxesExpired;
+ private final AtomicLong inboxesFailed;
+ private final AtomicLong inboxesProcessed;
+ private final AtomicLong messagesDeleted;
+
+ public Context() {
+ this.inboxesExpired = new AtomicLong(0L);
+ this.inboxesFailed = new AtomicLong(0L);
+ this.inboxesProcessed = new AtomicLong(0L);
+ this.messagesDeleted = new AtomicLong(0L);
+ }
+
+ public long getInboxesExpired() {
+ return inboxesExpired.get();
+ }
+
+ public long getInboxesFailed() {
+ return inboxesFailed.get();
+ }
+
+ public long getInboxesProcessed() {
+ return inboxesProcessed.get();
+ }
+
+ public long getMessagesDeleted() {
+ return messagesDeleted.get();
+ }
+
+ public void incrementExpiredCount() {
+ inboxesExpired.incrementAndGet();
+ }
+
+ public void incrementFailedCount() {
+ inboxesFailed.incrementAndGet();
+ }
+
+ public void incrementProcessedCount() {
+ inboxesProcessed.incrementAndGet();
+ }
+
+ // Unlike the other counters, this one grows by an arbitrary amount per call.
+ public void incrementMessagesDeleted(long count) {
+ messagesDeleted.addAndGet(count);
+ }
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExpireMailboxService.class);
+
+ private final UsersRepository usersRepository;
+ private final MailboxManager mailboxManager;
+
+ // The users repository supplies the users to iterate over; the mailbox manager gives access to their mailboxes.
+ @Inject
+ public ExpireMailboxService(UsersRepository usersRepository, MailboxManager mailboxManager) {
+ this.usersRepository = usersRepository;
+ this.mailboxManager = mailboxManager;
+ }
+
+ /**
+ * Expires messages in the configured mailbox of every user listed by the users repository.
+ *
+ * The search criterion is built once: when the options carry a maximum age, messages whose
+ * internal date is before {@code now - maxAge} match; otherwise messages whose "Expires"
+ * header date is before {@code now} match. Users are processed at the configured
+ * users-per-second rate and the per-user results are folded with Task::combine,
+ * starting from COMPLETED.
+ *
+ * @param context counters updated while processing
+ * @param runningOptions throttle, mailbox name and expiration criterion
+ * @param now reference instant for the expiration criterion
+ * @return the combined result; PARTIAL when listing users throws UsersRepositoryException
+ */
+ public Mono<Result> expireMailboxes(Context context, RunningOptions runningOptions, Date now) {
+ try {
+ SearchQuery expiration = SearchQuery.of(
+ runningOptions.maxAgeDuration.map(maxAge -> {
+ Date limit = Date.from(now.toInstant().minus(maxAge));
+ return SearchQuery.internalDateBefore(limit, DateResolution.Second);
+ })
+ .orElse(
+ SearchQuery.headerDateBefore("Expires", now, DateResolution.Second)
+ )
+ );
+ return Iterators.toFlux(usersRepository.list())
+ .transform(ReactorUtils.<Username, Task.Result>throttle()
+ .elements(runningOptions.getUsersPerSecond())
+ .per(Duration.ofSeconds(1))
+ .forOperation(username ->
+ expireUserMailbox(context, username, runningOptions.getMailbox(), expiration)))
+ .reduce(Task.Result.COMPLETED, Task::combine);
+ } catch (UsersRepositoryException e) {
+ LOGGER.error("Error while accessing users from repository", e);
+ return Mono.just(Task.Result.PARTIAL);
+ }
+ }
+
+ /**
+ * Searches the given mailbox of one user for messages matching {@code expiration} and deletes them.
+ *
+ * On success the processed counter is incremented, and when at least one message was deleted
+ * the expired-mailbox and deleted-message counters are updated too. A missing mailbox yields
+ * an empty Mono, so no counter is touched and the result is still COMPLETED. Any other error
+ * is logged, counted as failed and processed, and reported as PARTIAL.
+ */
+ private Mono<Result> expireUserMailbox(Context context, Username username, String mailbox, SearchQuery expiration) {
+ MailboxSession session = mailboxManager.createSystemSession(username);
+ MailboxPath mailboxPath = MailboxPath.forUser(username, mailbox);
+ return Mono.from(mailboxManager.getMailboxReactive(mailboxPath, session))
+ // newly created users do not have mailboxes yet, just skip them
+ .onErrorResume(MailboxNotFoundException.class, ignore -> Mono.empty())
+ .flatMap(mgr -> searchMessagesReactive(mgr, session, expiration)
+ .flatMap(list -> deleteMessagesReactive(mgr, session, list)))
+ .doOnNext(expired -> {
+ if (expired > 0) {
+ context.incrementExpiredCount();
+ context.incrementMessagesDeleted(expired);
+ }
+ context.incrementProcessedCount();
+ })
+ .then(Mono.just(Task.Result.COMPLETED))
+ .onErrorResume(e -> {
+ LOGGER.warn("Failed to expire user mailbox {}", username, e);
+ context.incrementFailedCount();
+ context.incrementProcessedCount();
+ return Mono.just(Task.Result.PARTIAL);
+ });
+ }
+
+ // Collects the UIDs returned by the mailbox search into a list; a MailboxException thrown
+ // while starting the search is converted into an error signal instead of being thrown.
+ private Mono<List<MessageUid>> searchMessagesReactive(MessageManager mgr, MailboxSession session, SearchQuery expiration) {
+ try {
+ return Flux.from(mgr.search(expiration, session)).collectList();
+ } catch (MailboxException e) {
+ return Mono.error(e);
+ }
+ }
+
+    /**
+     * Deletes the given messages from the mailbox and emits how many UIDs were submitted.
+     *
+     * An empty list short-circuits to 0 without touching the mailbox. Otherwise the
+     * {@code mgr.delete} call runs on the bounded elastic scheduler, the replacement for
+     * the deprecated, unbounded {@code Schedulers.elastic()}.
+     *
+     * @return a Mono emitting {@code uids.size()}, or an error if the deletion throws
+     */
+    private Mono<Integer> deleteMessagesReactive(MessageManager mgr, MailboxSession session, List<MessageUid> uids) {
+        if (uids.isEmpty()) {
+            return Mono.just(0);
+        }
+        return Mono.fromCallable(
+            () -> {
+                mgr.delete(uids, session);
+                return uids.size();
+            })
+            .subscribeOn(Schedulers.boundedElastic());
+    }
+}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxTask.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxTask.java
new file mode 100644
index 0000000000..cc7814f070
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxTask.java
@@ -0,0 +1,129 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Date;
+import java.util.Optional;
+
+import javax.inject.Inject;
+
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskType;
+import org.apache.james.webadmin.service.ExpireMailboxService.Context;
+import org.apache.james.webadmin.service.ExpireMailboxService.RunningOptions;
+
+/**
+ * Task wrapping {@link ExpireMailboxService#expireMailboxes}: it owns a fresh Context for its
+ * counters and reports them, together with the running options, as additional information.
+ */
+public class ExpireMailboxTask implements Task {
+ public static final TaskType TASK_TYPE = TaskType.of("ExpireMailboxTask");
+
+ /**
+ * Immutable snapshot of the counters and options of a run at a given instant.
+ */
+ public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+
+ // Snapshots the current counter values of the context, timestamped with the system UTC clock.
+ public static ExpireMailboxTask.AdditionalInformation from(Context context,
+ RunningOptions runningOptions) {
+ return new ExpireMailboxTask.AdditionalInformation(
+ Clock.systemUTC().instant(),
+ runningOptions,
+ context.getInboxesExpired(),
+ context.getInboxesFailed(),
+ context.getInboxesProcessed(),
+ context.getMessagesDeleted());
+ }
+
+ private final Instant timestamp;
+ private final RunningOptions runningOptions;
+ private final long mailboxesExpired;
+ private final long mailboxesFailed;
+ private final long mailboxesProcessed;
+ private final long messagesDeleted;
+
+ public AdditionalInformation(Instant timestamp,
+ RunningOptions runningOptions,
+ long mailboxesExpired,
+ long mailboxesFailed,
+ long mailboxesProcessed,
+ long messagesDeleted) {
+ this.timestamp = timestamp;
+ this.runningOptions = runningOptions;
+ this.mailboxesExpired = mailboxesExpired;
+ this.mailboxesFailed = mailboxesFailed;
+ this.mailboxesProcessed = mailboxesProcessed;
+ this.messagesDeleted = messagesDeleted;
+ }
+
+ public RunningOptions getRunningOptions() {
+ return runningOptions;
+ }
+
+ public long getMailboxesExpired() {
+ return mailboxesExpired;
+ }
+
+ public long getMailboxesFailed() {
+ return mailboxesFailed;
+ }
+
+ public long getMailboxesProcessed() {
+ return mailboxesProcessed;
+ }
+
+ public long getMessagesDeleted() {
+ return messagesDeleted;
+ }
+
+ @Override
+ public Instant timestamp() {
+ return timestamp;
+ }
+ }
+
+ private final ExpireMailboxService expireMailboxService;
+ private final Context context;
+ private final RunningOptions runningOptions;
+
+ // Each task instance starts with its own zeroed Context.
+ @Inject
+ public ExpireMailboxTask(ExpireMailboxService expireMailboxService,
+ RunningOptions runningOptions) {
+ this.expireMailboxService = expireMailboxService;
+ this.context = new Context();
+ this.runningOptions = runningOptions;
+ }
+
+ // Runs the expiration with the current date as reference and blocks until it finishes.
+ @Override
+ public Result run() {
+ return expireMailboxService.expireMailboxes(context, runningOptions, new Date())
+ .block();
+ }
+
+ @Override
+ public TaskType type() {
+ return TASK_TYPE;
+ }
+
+ // Always present: a snapshot of the counters as they are at the time of the call.
+ @Override
+ public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+ return Optional.of(ExpireMailboxTask.AdditionalInformation.from(context, runningOptions));
+ }
+
+ public RunningOptions getRunningOptions() {
+ return runningOptions;
+ }
+}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesExpireTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesExpireTest.java
new file mode 100644
index 0000000000..d12a8efc41
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesExpireTest.java
@@ -0,0 +1,300 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.routes;
+
+import com.google.common.collect.ImmutableSet;
+import io.restassured.RestAssured;
+import org.apache.james.core.Username;
+import org.apache.james.domainlist.api.DomainList;
+import org.apache.james.json.DTOConverter;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
+import org.apache.james.mailbox.inmemory.InMemoryMessageId;
+import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.task.Hostname;
+import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.james.webadmin.WebAdminServer;
+import org.apache.james.webadmin.WebAdminUtils;
+import org.apache.james.webadmin.service.ExpireMailboxAdditionalInformationDTO;
+import org.apache.james.webadmin.service.ExpireMailboxService;
+import org.apache.james.webadmin.service.ExpireMailboxService.RunningOptions;
+import org.apache.james.webadmin.utils.ErrorResponder;
+import org.apache.james.webadmin.utils.JsonTransformer;
+import org.eclipse.jetty.http.HttpStatus;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.Date;
+
+import static io.restassured.RestAssured.given;
+import static io.restassured.RestAssured.when;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+class MessageRoutesExpireTest {
+ private static final DomainList NO_DOMAIN_LIST = null;
+ private static final Username USERNAME = Username.of("benwa");
+ private static final MailboxPath INBOX = MailboxPath.inbox(USERNAME);
+
+ private WebAdminServer webAdminServer;
+ private InMemoryMailboxManager mailboxManager;
+ private MemoryTaskManager taskManager;
+ private MemoryUsersRepository usersRepository;
+
+    /**
+     * Starts a web admin server exposing the task routes and the messages routes on top of
+     * an in-memory task manager, mailbox manager and users repository, then points
+     * RestAssured at that server.
+     */
+    @BeforeEach
+    void beforeEach() {
+        taskManager = new MemoryTaskManager(new Hostname("foo"));
+        mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
+        usersRepository = MemoryUsersRepository.withoutVirtualHosting(NO_DOMAIN_LIST);
+        JsonTransformer transformer = new JsonTransformer();
+
+        TasksRoutes tasksRoutes = new TasksRoutes(taskManager, transformer,
+            DTOConverter.of(ExpireMailboxAdditionalInformationDTO.module()));
+        // The third constructor argument is passed as null in this setup.
+        MessagesRoutes messagesRoutes = new MessagesRoutes(taskManager,
+            new InMemoryMessageId.Factory(),
+            null,
+            new ExpireMailboxService(usersRepository, mailboxManager),
+            transformer,
+            ImmutableSet.of());
+
+        webAdminServer = WebAdminUtils.createWebAdminServer(tasksRoutes, messagesRoutes)
+            .start();
+
+        RestAssured.requestSpecification = WebAdminUtils.buildRequestSpecification(webAdminServer).build();
+        RestAssured.enableLoggingOfRequestAndResponseIfValidationFails();
+    }
+
+ /**
+  * Destroys the web admin server, then stops the task manager, after each test.
+  */
+ @AfterEach
+ void tearDown() {
+ webAdminServer.destroy();
+ taskManager.stop();
+ }
+
+ @Nested
+ class ExpireMailbox {
+ @Nested
+ class Validation {
+            /**
+             * A DELETE without any selection parameter must be rejected with a 400.
+             */
+            @Test
+            void expireMailboxShouldFailWithNoOption() {
+                String requestPath = "/messages";
+                String expectedDetails = "Must specify either 'olderThan' or 'byExpiresHeader' parameter";
+
+                when()
+                    .delete(requestPath)
+                .then()
+                    .statusCode(HttpStatus.BAD_REQUEST_400)
+                    .body("statusCode", is(400))
+                    .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+                    .body("message", is("Invalid arguments supplied in the user request"))
+                    .body("details", is(expectedDetails));
+            }
+
+            /**
+             * Supplying both {@code byExpiresHeader} and {@code olderThan} must be rejected with a 400.
+             */
+            @Test
+            void expireMailboxShouldFailWithBothOptions() {
+                String requestPath = "/messages?byExpiresHeader&olderThan=30d";
+                String expectedDetails = "Must specify either 'olderThan' or 'byExpiresHeader' parameter";
+
+                when()
+                    .delete(requestPath)
+                .then()
+                    .statusCode(HttpStatus.BAD_REQUEST_400)
+                    .body("statusCode", is(400))
+                    .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+                    .body("message", is("Invalid arguments supplied in the user request"))
+                    .body("details", is(expectedDetails));
+            }
+
+            /**
+             * A non-parseable {@code olderThan} value must be rejected with a 400.
+             */
+            @Test
+            void expireMailboxShouldFailWithBadOlderThan() {
+                String requestPath = "/messages?olderThan=bad";
+                // NOTE(review): the expected detail names 'usersPerSecond' although the invalid
+                // parameter is 'olderThan'; presumably both share one error path in the route.
+                // Confirm against the route implementation before changing either side.
+                String expectedDetails = "'usersPerSecond' must be numeric";
+
+                when()
+                    .delete(requestPath)
+                .then()
+                    .statusCode(HttpStatus.BAD_REQUEST_400)
+                    .body("statusCode", is(400))
+                    .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+                    .body("message", is("Invalid arguments supplied in the user request"))
+                    .body("details", is(expectedDetails));
+            }
+
+            /**
+             * A negative {@code olderThan} duration must be rejected with a 400.
+             */
+            @Test
+            void expireMailboxShouldFailWithNegativeOlderThan() {
+                String requestPath = "/messages?olderThan=-30d";
+                String expectedDetails = "Duration amount should be positive";
+
+                when()
+                    .delete(requestPath)
+                .then()
+                    .statusCode(HttpStatus.BAD_REQUEST_400)
+                    .body("statusCode", is(400))
+                    .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+                    .body("message", is("Invalid arguments supplied in the user request"))
+                    .body("details", is(expectedDetails));
+            }
+
+            /**
+             * A non-numeric {@code usersPerSecond} value must be rejected with a 400.
+             */
+            @Test
+            void expireMailboxShouldFailWithBadUsersPerSeconds() {
+                String requestPath = "/messages?byExpiresHeader&usersPerSecond=bad";
+                String expectedDetails = "'usersPerSecond' must be numeric";
+
+                when()
+                    .delete(requestPath)
+                .then()
+                    .statusCode(HttpStatus.BAD_REQUEST_400)
+                    .body("statusCode", is(400))
+                    .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+                    .body("message", is("Invalid arguments supplied in the user request"))
+                    .body("details", is(expectedDetails));
+            }
+
+            /**
+             * A negative {@code usersPerSecond} value must be rejected with a 400.
+             */
+            @Test
+            void expireMailboxShouldFailWithNegativeUsersPerSeconds() {
+                String requestPath = "/messages?byExpiresHeader&usersPerSecond=-10";
+                String expectedDetails = "'usersPerSecond' must be strictly positive";
+
+                when()
+                    .delete(requestPath)
+                .then()
+                    .statusCode(HttpStatus.BAD_REQUEST_400)
+                    .body("statusCode", is(400))
+                    .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+                    .body("message", is("Invalid arguments supplied in the user request"))
+                    .body("details", is(expectedDetails));
+            }
+
+ }
+
+ @Nested
+ class TaskDetails {
+            /**
+             * With no user or mailbox provisioned, the task must still complete and report
+             * zero for every counter.
+             */
+            @Test
+            void expireMailboxShouldNotFailWhenNoMailsFound() {
+                String createdTaskId = when()
+                    .delete("/messages?byExpiresHeader")
+                    .jsonPath()
+                    .get("taskId");
+                String awaitPath = createdTaskId + "/await";
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(awaitPath)
+                .then()
+                    .body("type", Matchers.is("ExpireMailboxTask"))
+                    .body("status", is("completed"))
+                    .body("taskId", is(notNullValue()))
+                    .body("additionalInformation.type", is("ExpireMailboxTask"))
+                    .body("additionalInformation.runningOptions.usersPerSecond", is(RunningOptions.DEFAULT.getUsersPerSecond()))
+                    .body("additionalInformation.runningOptions.byExpiresHeader", is(true))
+                    .body("additionalInformation.runningOptions.olderThan", is(nullValue()))
+                    .body("additionalInformation.mailboxesExpired", is(0))
+                    .body("additionalInformation.mailboxesFailed", is(0))
+                    .body("additionalInformation.mailboxesProcessed", is(0))
+                    .body("additionalInformation.messagesDeleted", is(0));
+            }
+
+            /**
+             * With one message dated five seconds in the past and {@code olderThan=1s}, the task
+             * details must report one mailbox processed and expired and one message deleted.
+             */
+            @Test
+            void expireMailboxShouldReturnTaskDetailsWhenMail() throws Exception {
+                usersRepository.addUser(USERNAME, "secret");
+                MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+                mailboxManager.createMailbox(INBOX, session).get();
+                MessageManager inbox = mailboxManager.getMailbox(INBOX, session);
+                Date fiveSecondsAgo = new Date(System.currentTimeMillis() - 5000);
+                inbox.appendMessage(
+                    MessageManager.AppendCommand.builder()
+                        .withInternalDate(fiveSecondsAgo)
+                        .build("header: value\r\n\r\nbody"),
+                    session).getId();
+
+                String createdTaskId = when()
+                    .delete("/messages?olderThan=1s")
+                    .jsonPath()
+                    .get("taskId");
+                String awaitPath = createdTaskId + "/await";
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(awaitPath)
+                .then()
+                    .body("type", Matchers.is("ExpireMailboxTask"))
+                    .body("status", is("completed"))
+                    .body("taskId", is(notNullValue()))
+                    .body("additionalInformation.type", is("ExpireMailboxTask"))
+                    .body("additionalInformation.runningOptions.usersPerSecond", is(RunningOptions.DEFAULT.getUsersPerSecond()))
+                    .body("additionalInformation.runningOptions.byExpiresHeader", is(false))
+                    .body("additionalInformation.runningOptions.olderThan", is("1s"))
+                    .body("additionalInformation.mailboxesExpired", is(1))
+                    .body("additionalInformation.mailboxesFailed", is(0))
+                    .body("additionalInformation.mailboxesProcessed", is(1))
+                    .body("additionalInformation.messagesDeleted", is(1));
+            }
+ }
+
+ @Nested
+ class SideEffects {
+            /**
+             * A message dated five seconds in the past must be gone from the INBOX once the
+             * {@code olderThan=1s} task has completed.
+             */
+            @Test
+            void expireMailboxShouldExpireOldMail() throws Exception {
+                usersRepository.addUser(USERNAME, "secret");
+                MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+                mailboxManager.createMailbox(INBOX, session).get();
+                MessageManager inbox = mailboxManager.getMailbox(INBOX, session);
+                Date fiveSecondsAgo = new Date(System.currentTimeMillis() - 5000);
+                inbox.appendMessage(
+                    MessageManager.AppendCommand.builder()
+                        .withInternalDate(fiveSecondsAgo)
+                        .build("header: value\r\n\r\nbody"),
+                    session).getId();
+
+                String createdTaskId = when()
+                    .delete("/messages?olderThan=1s")
+                    .jsonPath()
+                    .get("taskId");
+                String awaitPath = createdTaskId + "/await";
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(awaitPath)
+                .then()
+                    .body("status", is("completed"))
+                    .body("additionalInformation.messagesDeleted", is(1));
+
+                // Re-fetch the mailbox to check the actual side effect.
+                MessageManager inboxAfterTask = mailboxManager.getMailbox(INBOX, session);
+                assertThat(inboxAfterTask.getMessageCount(session)).isEqualTo(0);
+            }
+
+            /**
+             * A message dated "now" must still be in the INBOX once the
+             * {@code olderThan=7d} task has completed.
+             */
+            @Test
+            void expireMailboxShouldNotExpireNewMail() throws Exception {
+                usersRepository.addUser(USERNAME, "secret");
+                MailboxSession session = mailboxManager.createSystemSession(USERNAME);
+                mailboxManager.createMailbox(INBOX, session).get();
+                MessageManager inbox = mailboxManager.getMailbox(INBOX, session);
+                Date rightNow = new Date(System.currentTimeMillis());
+                inbox.appendMessage(
+                    MessageManager.AppendCommand.builder()
+                        .withInternalDate(rightNow)
+                        .build("header: value\r\n\r\nbody"),
+                    session).getId();
+
+                String createdTaskId = when()
+                    .delete("/messages?olderThan=7d")
+                    .jsonPath()
+                    .get("taskId");
+                String awaitPath = createdTaskId + "/await";
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(awaitPath)
+                .then()
+                    .body("status", is("completed"))
+                    .body("additionalInformation.messagesDeleted", is(0));
+
+                // Re-fetch the mailbox to check that nothing was removed.
+                MessageManager inboxAfterTask = mailboxManager.getMailbox(INBOX, session);
+                assertThat(inboxAfterTask.getMessageCount(session)).isEqualTo(1);
+            }
+ }
+ }
+}
\ No newline at end of file
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesTest.java
index f86221fab1..1dd356c446 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesTest.java
@@ -77,7 +77,7 @@ class MessageRoutesTest {
void beforeEach() {
taskManager = new MemoryTaskManager(new Hostname("foo"));
mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
- searchIndex = mock(ListeningMessageSearchIndex.class);;
+ searchIndex = mock(ListeningMessageSearchIndex.class);
Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
ReIndexerPerformer reIndexerPerformer = new ReIndexerPerformer(
@@ -93,6 +93,7 @@ class MessageRoutesTest {
new MessagesRoutes(taskManager,
new InMemoryMessageId.Factory(),
new MessageIdReIndexerImpl(reIndexerPerformer),
+ null,
jsonTransformer,
ImmutableSet.of()))
.start();
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxServiceTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxServiceTest.java
new file mode 100644
index 0000000000..1bf6a94066
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxServiceTest.java
@@ -0,0 +1,354 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
+import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.Mailbox;
+import org.apache.james.mailbox.model.MailboxConstants;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.SearchQuery;
+import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
+import org.apache.james.mailbox.store.search.MessageSearchIndex;
+import org.apache.james.mailbox.store.search.SimpleMessageSearchIndex;
+import org.apache.james.mime4j.dom.Message;
+import org.apache.james.mime4j.field.Fields;
+import org.apache.james.task.Task;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+
+import java.nio.charset.StandardCharsets;
+import java.time.ZonedDateTime;
+import java.util.Collection;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ExpireMailboxServiceTest {
+
+    /**
+     * Search index decorator for tests: it can be told to fail a given number of
+     * upcoming searches with a {@link MailboxException}, and otherwise forwards
+     * every call to the configured delegate.
+     */
+    private static class FailingSearchIndex implements MessageSearchIndex {
+
+        private MessageSearchIndex delegate;
+
+        // Guarded by this instance's monitor: written by generateFailures(),
+        // read and decremented by handleFailure().
+        private int failuresRemaining = 0;
+
+        /**
+         * Sets the index that calls are forwarded to.
+         *
+         * @return this instance, so the call can be used as an expression
+         */
+        public MessageSearchIndex setDelegate(MessageSearchIndex delegate) {
+            this.delegate = delegate;
+            return this;
+        }
+
+        /**
+         * Makes the next {@code count} searches fail.
+         * Synchronized so the counter is guarded by the same monitor as in
+         * {@link #handleFailure()}.
+         */
+        public synchronized void generateFailures(int count) {
+            this.failuresRemaining = count;
+        }
+
+        /**
+         * Consumes one pending failure, if any, by throwing.
+         *
+         * @throws MailboxException when at least one failure is still pending
+         */
+        private synchronized void handleFailure() throws MailboxException {
+            if (failuresRemaining > 0) {
+                --failuresRemaining;
+                throw new MailboxException("search failed");
+            }
+        }
+
+        @Override
+        public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+            handleFailure();
+            return delegate.search(session, mailbox, searchQuery);
+        }
+
+        @Override
+        public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+            handleFailure();
+            return delegate.search(session, mailboxIds, searchQuery, limit);
+        }
+
+        // Capabilities are never subject to injected failures.
+        @Override
+        public EnumSet<MailboxManager.SearchCapabilities> getSupportedCapabilities(EnumSet<MailboxManager.MessageCapabilities> messageCapabilities) {
+            return delegate.getSupportedCapabilities(messageCapabilities);
+        }
+    }
+
+ private static final ExpireMailboxService.RunningOptions OLDER_THAN_1S = new ExpireMailboxService.RunningOptions(1, MailboxConstants.INBOX, false, Optional.of("1s"));
+
+ private final Username alice = Username.of("alice@example.org");
+ private final MailboxPath aliceInbox = MailboxPath.inbox(alice);
+
+ private UsersRepository usersRepository;
+
+ private MailboxSession aliceSession;
+ private MailboxManager mailboxManager;
+ private FailingSearchIndex searchIndex;
+
+ private ExpireMailboxService testee;
+
+    /**
+     * Builds in-memory mailbox resources whose search index is wrapped by the
+     * failure-injecting index, mocks the users repository, and creates the service under test.
+     */
+    @BeforeEach
+    public void setUp() {
+        usersRepository = mock(UsersRepository.class);
+        searchIndex = new FailingSearchIndex();
+
+        InMemoryIntegrationResources integrationResources = InMemoryIntegrationResources.builder()
+            .preProvisionnedFakeAuthenticator()
+            .fakeAuthorizator()
+            .inVmEventBus()
+            .defaultAnnotationLimits()
+            .defaultMessageParser()
+            .searchIndex(stage -> {
+                // The real index does the searching; the wrapper only injects failures.
+                MessageSearchIndex realIndex = new SimpleMessageSearchIndex(
+                    stage.getMapperFactory(), stage.getMapperFactory(), new DefaultTextExtractor(), stage.getAttachmentContentLoader());
+                return searchIndex.setDelegate(realIndex);
+            })
+            .noPreDeletionHooks()
+            .storeQuotaManager()
+            .build();
+
+        mailboxManager = integrationResources.getMailboxManager();
+        aliceSession = mailboxManager.createSystemSession(alice);
+
+        testee = new ExpireMailboxService(usersRepository, mailboxManager);
+    }
+
+ /**
+  * Converts the given date-time to a {@link Date} through its instant.
+  */
+ private static Date asDate(ZonedDateTime dateTime) {
+ return Date.from(dateTime.toInstant());
+ }
+
+    /**
+     * Appends a minimal test message whose Date header is the given date-time.
+     *
+     * @return the identifier of the appended message
+     */
+    private ComposedMessageId appendMessage(MessageManager messageManager, MailboxSession session, ZonedDateTime internalDate) throws Exception {
+        Message testMail = Message.Builder.of()
+            .setSubject("test")
+            .setBody("whatever", StandardCharsets.UTF_8)
+            .setDate(asDate(internalDate))
+            .build();
+        return appendMessage(messageManager, session, testMail);
+    }
+
+    /**
+     * Appends the given message, using the message's own date as its internal date.
+     *
+     * @return the identifier of the appended message
+     */
+    private ComposedMessageId appendMessage(MessageManager messageManager, MailboxSession session, Message mailContent) throws Exception {
+        MessageManager.AppendCommand command = MessageManager.AppendCommand.builder()
+            .withInternalDate(mailContent.getDate())
+            .build(mailContent);
+        return messageManager.appendMessage(command, session).getId();
+    }
+
+
+    /**
+     * When listing users throws, the run must end as PARTIAL with all counters at zero.
+     */
+    @Test
+    public void testIgnoresUserListFailure() throws Exception {
+        when(usersRepository.list()).thenThrow(new UsersRepositoryException("it is broken"));
+
+        ExpireMailboxService.Context ctx = new ExpireMailboxService.Context();
+        Date referenceDate = new Date();
+        Task.Result outcome = testee.expireMailboxes(ctx, OLDER_THAN_1S, referenceDate).block();
+
+        assertThat(outcome).isEqualTo(Task.Result.PARTIAL);
+        assertThat(ctx.getInboxesExpired()).isEqualTo(0);
+        assertThat(ctx.getInboxesFailed()).isEqualTo(0);
+        assertThat(ctx.getInboxesProcessed()).isEqualTo(0);
+        assertThat(ctx.getMessagesDeleted()).isEqualTo(0);
+    }
+
+    /**
+     * A user without the target mailbox is skipped: the run completes and nothing is counted.
+     */
+    @Test
+    public void testIgnoresMissingMailbox() throws Exception {
+        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+
+        // intentionally no mailbox creation
+
+        ExpireMailboxService.Context ctx = new ExpireMailboxService.Context();
+        Date referenceDate = new Date();
+        Task.Result outcome = testee.expireMailboxes(ctx, OLDER_THAN_1S, referenceDate).block();
+
+        assertThat(outcome).isEqualTo(Task.Result.COMPLETED);
+        assertThat(ctx.getInboxesExpired()).isEqualTo(0);
+        assertThat(ctx.getInboxesFailed()).isEqualTo(0);
+        assertThat(ctx.getInboxesProcessed()).isEqualTo(0); // skipped
+        assertThat(ctx.getMessagesDeleted()).isEqualTo(0);
+    }
+
+    /**
+     * An existing but empty mailbox counts as processed, but neither as expired nor as failed,
+     * and no message is deleted.
+     */
+    @Test
+    public void testIgnoresEmptyMailbox() throws Exception {
+        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+
+        mailboxManager.createMailbox(aliceInbox, aliceSession);
+
+        ExpireMailboxService.Context context = new ExpireMailboxService.Context();
+        Date now = new Date();
+        Task.Result result = testee.expireMailboxes(context, OLDER_THAN_1S, now).block();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(context.getInboxesExpired()).isEqualTo(0);
+        assertThat(context.getInboxesFailed()).isEqualTo(0);
+        assertThat(context.getInboxesProcessed()).isEqualTo(1);
+        // Each counter is asserted exactly once (a duplicated check was removed here).
+        assertThat(context.getMessagesDeleted()).isEqualTo(0);
+    }
+
+    /**
+     * When the search on the only mailbox fails, the run is PARTIAL and the mailbox is
+     * counted as both processed and failed.
+     */
+    @Test
+    public void testHandlesMailboxErrors() throws Exception {
+        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+
+        mailboxManager.createMailbox(aliceInbox, aliceSession);
+        MessageManager aliceMailbox = mailboxManager.getMailbox(aliceInbox, aliceSession);
+        appendMessage(aliceMailbox, aliceSession, ZonedDateTime.now());
+
+        // Make the next search throw.
+        searchIndex.generateFailures(1);
+
+        ExpireMailboxService.Context ctx = new ExpireMailboxService.Context();
+        Date referenceDate = new Date();
+        Task.Result outcome = testee.expireMailboxes(ctx, OLDER_THAN_1S, referenceDate).block();
+
+        assertThat(outcome).isEqualTo(Task.Result.PARTIAL);
+        assertThat(ctx.getInboxesExpired()).isEqualTo(0);
+        assertThat(ctx.getInboxesFailed()).isEqualTo(1);
+        assertThat(ctx.getInboxesProcessed()).isEqualTo(1);
+        assertThat(ctx.getMessagesDeleted()).isEqualTo(0);
+    }
+
+    /**
+     * A message dated five seconds in the past is deleted when expiring by age with a 1s threshold.
+     */
+    @Test
+    public void testExpiresMailboxByAge() throws Exception {
+        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+
+        mailboxManager.createMailbox(aliceInbox, aliceSession);
+        MessageManager aliceMailbox = mailboxManager.getMailbox(aliceInbox, aliceSession);
+        appendMessage(aliceMailbox, aliceSession, ZonedDateTime.now().minusSeconds(5));
+
+        ExpireMailboxService.Context ctx = new ExpireMailboxService.Context();
+        Date referenceDate = new Date();
+        Task.Result outcome = testee.expireMailboxes(ctx, OLDER_THAN_1S, referenceDate).block();
+
+        assertThat(outcome).isEqualTo(Task.Result.COMPLETED);
+        assertThat(ctx.getInboxesExpired()).isEqualTo(1);
+        assertThat(ctx.getInboxesFailed()).isEqualTo(0);
+        assertThat(ctx.getInboxesProcessed()).isEqualTo(1);
+        assertThat(ctx.getMessagesDeleted()).isEqualTo(1);
+
+        assertThat(aliceMailbox.getMessageCount(aliceSession)).isEqualTo(0);
+    }
+
+    /**
+     * A message whose Expires header lies before the reference date is deleted when
+     * expiring by header.
+     */
+    @Test
+    public void testExpiresMailboxByHeader() throws Exception {
+        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+
+        mailboxManager.createMailbox(aliceInbox, aliceSession);
+        MessageManager aliceMailbox = mailboxManager.getMailbox(aliceInbox, aliceSession);
+
+        // Timeline: sent -> expires 5s later -> evaluated 10s after expiry.
+        ZonedDateTime sentAt = ZonedDateTime.now();
+        ZonedDateTime expiresAt = sentAt.plusSeconds(5);
+        ZonedDateTime evaluatedAt = expiresAt.plusSeconds(10);
+
+        Message expiringMail = Message.Builder.of()
+            .setSubject("test")
+            .setBody("whatever", StandardCharsets.UTF_8)
+            .setDate(asDate(sentAt))
+            .setField(Fields.date("Expires", asDate(expiresAt), TimeZone.getTimeZone(expiresAt.getZone())))
+            .build();
+        appendMessage(aliceMailbox, aliceSession, expiringMail);
+
+        ExpireMailboxService.Context ctx = new ExpireMailboxService.Context();
+        ExpireMailboxService.RunningOptions byHeader = new ExpireMailboxService.RunningOptions(1, MailboxConstants.INBOX, true, Optional.empty());
+        Task.Result outcome = testee.expireMailboxes(ctx, byHeader, asDate(evaluatedAt)).block();
+
+        assertThat(outcome).isEqualTo(Task.Result.COMPLETED);
+        assertThat(ctx.getInboxesExpired()).isEqualTo(1);
+        assertThat(ctx.getInboxesFailed()).isEqualTo(0);
+        assertThat(ctx.getInboxesProcessed()).isEqualTo(1);
+        assertThat(ctx.getMessagesDeleted()).isEqualTo(1);
+
+        assertThat(aliceMailbox.getMessageCount(aliceSession)).isEqualTo(0);
+    }
+
+    /**
+     * Expiration also applies to a mailbox other than INBOX when its name is given in the options.
+     */
+    @Test
+    public void testExpiresNamedMailbox() throws Exception {
+        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+
+        String archiveName = "Archived";
+        MailboxPath archivePath = MailboxPath.forUser(alice, archiveName);
+        mailboxManager.createMailbox(archivePath, aliceSession);
+        MessageManager archiveMailbox = mailboxManager.getMailbox(archivePath, aliceSession);
+        appendMessage(archiveMailbox, aliceSession, ZonedDateTime.now().minusSeconds(5));
+
+        ExpireMailboxService.Context ctx = new ExpireMailboxService.Context();
+        ExpireMailboxService.RunningOptions archiveOptions = new ExpireMailboxService.RunningOptions(1, archiveName, false, Optional.of("1s"));
+        Date referenceDate = new Date();
+        Task.Result outcome = testee.expireMailboxes(ctx, archiveOptions, referenceDate).block();
+
+        assertThat(outcome).isEqualTo(Task.Result.COMPLETED);
+        assertThat(ctx.getInboxesExpired()).isEqualTo(1);
+        assertThat(ctx.getInboxesFailed()).isEqualTo(0);
+        assertThat(ctx.getInboxesProcessed()).isEqualTo(1);
+        assertThat(ctx.getMessagesDeleted()).isEqualTo(1);
+
+        assertThat(archiveMailbox.getMessageCount(aliceSession)).isEqualTo(0);
+    }
+
+    /**
+     * Naming a mailbox the user does not have completes the run without counting anything.
+     */
+    @Test
+    public void testExpiresNamedMailbox2() throws Exception {
+        when(usersRepository.list()).thenReturn(List.of(alice).iterator());
+
+        ExpireMailboxService.Context ctx = new ExpireMailboxService.Context();
+        ExpireMailboxService.RunningOptions missingMailboxOptions = new ExpireMailboxService.RunningOptions(1, "NoSuchMailbox", false, Optional.of("1s"));
+        Date referenceDate = new Date();
+        Task.Result outcome = testee.expireMailboxes(ctx, missingMailboxOptions, referenceDate).block();
+
+        assertThat(outcome).isEqualTo(Task.Result.COMPLETED);
+        assertThat(ctx.getInboxesExpired()).isEqualTo(0);
+        assertThat(ctx.getInboxesFailed()).isEqualTo(0);
+        assertThat(ctx.getInboxesProcessed()).isEqualTo(0);
+        assertThat(ctx.getMessagesDeleted()).isEqualTo(0);
+    }
+
+    /**
+     * One injected search failure must not stop the run: per the assertions below, alice's
+     * message survives (her mailbox failed) while bob's message is deleted.
+     */
+    @Test
+    public void testContinuesAfterFailure() throws Exception {
+        Username bob = Username.of("bob@example.org");
+        MailboxPath bobInbox = MailboxPath.inbox(bob);
+        MailboxSession bobSession = mailboxManager.createSystemSession(bob);
+
+        when(usersRepository.list()).thenReturn(List.of(alice, bob).iterator());
+
+        mailboxManager.createMailbox(aliceInbox, aliceSession);
+        MessageManager aliceMailbox = mailboxManager.getMailbox(aliceInbox, aliceSession);
+        appendMessage(aliceMailbox, aliceSession, ZonedDateTime.now().minusSeconds(5));
+
+        mailboxManager.createMailbox(bobInbox, bobSession);
+        MessageManager bobMailbox = mailboxManager.getMailbox(bobInbox, bobSession);
+        appendMessage(bobMailbox, bobSession, ZonedDateTime.now().minusSeconds(5));
+
+        // Only the first search performed by the service will throw.
+        searchIndex.generateFailures(1);
+
+        ExpireMailboxService.Context ctx = new ExpireMailboxService.Context();
+        Date referenceDate = new Date();
+        Task.Result outcome = testee.expireMailboxes(ctx, OLDER_THAN_1S, referenceDate).block();
+
+        assertThat(outcome).isEqualTo(Task.Result.PARTIAL);
+        assertThat(ctx.getInboxesExpired()).isEqualTo(1);
+        assertThat(ctx.getInboxesFailed()).isEqualTo(1);
+        assertThat(ctx.getInboxesProcessed()).isEqualTo(2);
+        assertThat(ctx.getMessagesDeleted()).isEqualTo(1);
+
+        assertThat(aliceMailbox.getMessageCount(aliceSession)).isEqualTo(1);
+        assertThat(bobMailbox.getMessageCount(bobSession)).isEqualTo(0);
+    }
+}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxTaskAdditionalInformationDTOTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxTaskAdditionalInformationDTOTest.java
new file mode 100644
index 0000000000..66a2027ec4
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxTaskAdditionalInformationDTOTest.java
@@ -0,0 +1,43 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import org.apache.james.JsonSerializationVerifier;
+import org.apache.james.mailbox.model.MailboxConstants;
+import org.apache.james.util.ClassLoaderUtils;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.Optional;
+
+public class ExpireMailboxTaskAdditionalInformationDTOTest {
+ private static final Instant INSTANT = Instant.parse("2007-12-03T10:15:30.00Z");
+
+ private static final ExpireMailboxTask.AdditionalInformation DOMAIN_OBJECT = new ExpireMailboxTask.AdditionalInformation(
+ INSTANT, new ExpireMailboxService.RunningOptions(1, MailboxConstants.INBOX, false, Optional.of("90d")), 5, 2, 10, 234);
+
+ @Test
+ void shouldMatchJsonSerializationContract() throws Exception {
+ JsonSerializationVerifier.dtoModule(ExpireMailboxAdditionalInformationDTO.module())
+ .bean(DOMAIN_OBJECT)
+ .json(ClassLoaderUtils.getSystemResourceAsString("json/expireMailbox.additionalInformation.json"))
+ .verify();
+ }
+}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxTaskSerializationTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxTaskSerializationTest.java
new file mode 100644
index 0000000000..7efa30747d
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/ExpireMailboxTaskSerializationTest.java
@@ -0,0 +1,55 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import org.apache.james.JsonSerializationVerifier;
+import org.apache.james.mailbox.model.MailboxConstants;
+import org.apache.james.util.ClassLoaderUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.mockito.Mockito.mock;
+
+public class ExpireMailboxTaskSerializationTest {
+ private ExpireMailboxService expireMailboxService;
+
+ @BeforeEach
+ void setUp() {
+ expireMailboxService = mock(ExpireMailboxService.class);
+ }
+
+ @Test
+ void shouldMatchJsonSerializationContractWithOlderThan() throws Exception {
+ JsonSerializationVerifier.dtoModule(ExpireMailboxDTO.module(expireMailboxService))
+ .bean(new ExpireMailboxTask(expireMailboxService, new ExpireMailboxService.RunningOptions(1, MailboxConstants.INBOX, false, Optional.of("90d"))))
+ .json(ClassLoaderUtils.getSystemResourceAsString("json/expireMailbox.age.task.json"))
+ .verify();
+ }
+
+ @Test
+ void shouldMatchJsonSerializationContractWithExpiresHeader() throws Exception {
+ JsonSerializationVerifier.dtoModule(ExpireMailboxDTO.module(expireMailboxService))
+ .bean(new ExpireMailboxTask(expireMailboxService, new ExpireMailboxService.RunningOptions(1, MailboxConstants.INBOX, true, Optional.empty())))
+ .json(ClassLoaderUtils.getSystemResourceAsString("json/expireMailbox.header.task.json"))
+ .verify();
+ }
+}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/resources/json/expireMailbox.additionalInformation.json b/server/protocols/webadmin/webadmin-mailbox/src/test/resources/json/expireMailbox.additionalInformation.json
new file mode 100644
index 0000000000..2a22a06a12
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/resources/json/expireMailbox.additionalInformation.json
@@ -0,0 +1,14 @@
+{
+ "mailboxesExpired": 5,
+ "mailboxesFailed": 2,
+ "mailboxesProcessed": 10,
+ "messagesDeleted": 234,
+ "runningOptions": {
+ "usersPerSecond": 1,
+ "mailbox": "INBOX",
+ "byExpiresHeader": false,
+ "olderThan": "90d"
+ },
+ "timestamp": "2007-12-03T10:15:30Z",
+ "type": "ExpireMailboxTask"
+}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/resources/json/expireMailbox.age.task.json b/server/protocols/webadmin/webadmin-mailbox/src/test/resources/json/expireMailbox.age.task.json
new file mode 100644
index 0000000000..c36ed0be9d
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/resources/json/expireMailbox.age.task.json
@@ -0,0 +1,9 @@
+{
+ "runningOptions": {
+ "usersPerSecond": 1,
+ "mailbox": "INBOX",
+ "byExpiresHeader": false,
+ "olderThan": "90d"
+ },
+ "type": "ExpireMailboxTask"
+}
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/resources/json/expireMailbox.header.task.json b/server/protocols/webadmin/webadmin-mailbox/src/test/resources/json/expireMailbox.header.task.json
new file mode 100644
index 0000000000..30efffbae4
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/resources/json/expireMailbox.header.task.json
@@ -0,0 +1,8 @@
+{
+ "runningOptions": {
+ "usersPerSecond": 1,
+ "mailbox": "INBOX",
+ "byExpiresHeader": true
+ },
+ "type": "ExpireMailboxTask"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org