You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/17 02:24:16 UTC
[james-project] 02/31: JAMES-3296 Add task to republish RabbitMQ
MailQueue from Cassandra
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7bc6a36334b4765d373b7610f26a1f1d044fbe28
Author: RĂ©mi KOWALSKI <rk...@linagora.com>
AuthorDate: Tue Jul 7 17:32:23 2020 +0200
JAMES-3296 Add task to republish RabbitMQ MailQueue from Cassandra
---
pom.xml | 5 +
.../guice/cassandra-rabbitmq-guice/pom.xml | 4 +
.../james/CassandraRabbitMQJamesServerMain.java | 3 +-
server/container/guice/pom.xml | 6 +
.../webadmin-rabbitmq-mailqueue}/pom.xml | 25 ++-
.../server/RabbitMailQueueRoutesModule.java | 36 +++++
.../RabbitMailQueueTaskSerializationModule.java | 53 +++++++
server/container/guice/rabbitmq/pom.xml | 8 +
...dminServerTaskSerializationIntegrationTest.java | 23 ++-
server/protocols/webadmin/pom.xml | 1 +
.../protocols/webadmin/webadmin-mailqueue/pom.xml | 2 +-
.../pom.xml | 18 ++-
.../webadmin/routes/RabbitMQMailQueuesRoutes.java | 174 +++++++++++++++++++++
...ProcessedMailsTaskAdditionalInformationDTO.java | 90 +++++++++++
.../service/RepublishNotProcessedMailsTaskDTO.java | 85 ++++++++++
.../service/RepublishNotprocessedMailsTask.java | 107 +++++++++++++
.../routes/RabbitMQMailQueuesRoutesTest.java | 144 +++++++++++++++++
.../RepublishNotprocessedMailsTaskTest.java | 108 +++++++++++++
src/site/markdown/server/manage-webadmin.md | 35 +++++
19 files changed, 903 insertions(+), 24 deletions(-)
diff --git a/pom.xml b/pom.xml
index f2de229..0e7b225 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1877,6 +1877,11 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>james-server-webadmin-rabbitmq</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>james-server-webadmin-swagger</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/server/container/guice/cassandra-rabbitmq-guice/pom.xml b/server/container/guice/cassandra-rabbitmq-guice/pom.xml
index 1b02418..6edd29c 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/pom.xml
+++ b/server/container/guice/cassandra-rabbitmq-guice/pom.xml
@@ -181,6 +181,10 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>james-server-webadmin-rabbitmq</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>testing-base</artifactId>
<scope>test</scope>
</dependency>
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
index 2e40639..322acd3 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
@@ -29,6 +29,7 @@ import org.apache.james.modules.blobstore.BlobStoreModulesChooser;
import org.apache.james.modules.event.RabbitMQEventBusModule;
import org.apache.james.modules.rabbitmq.RabbitMQModule;
import org.apache.james.modules.server.JMXServerModule;
+import org.apache.james.modules.server.RabbitMailQueueRoutesModule;
import com.google.inject.Module;
import com.google.inject.util.Modules;
@@ -37,7 +38,7 @@ public class CassandraRabbitMQJamesServerMain implements JamesServerMain {
protected static final Module MODULES =
Modules
.override(Modules.combine(REQUIRE_TASK_MANAGER_MODULE, new DistributedTaskManagerModule()))
- .with(new RabbitMQModule(), new RabbitMQEventBusModule(), new DistributedTaskSerializationModule());
+ .with(new RabbitMQModule(), new RabbitMailQueueRoutesModule(), new RabbitMQEventBusModule(), new DistributedTaskSerializationModule());
public static void main(String[] args) throws Exception {
CassandraRabbitMQJamesConfiguration configuration = CassandraRabbitMQJamesConfiguration.builder()
diff --git a/server/container/guice/pom.xml b/server/container/guice/pom.xml
index bf8fe7f..cbae105 100644
--- a/server/container/guice/pom.xml
+++ b/server/container/guice/pom.xml
@@ -71,6 +71,7 @@
<module>protocols/webadmin-mailbox</module>
<module>protocols/webadmin-mailqueue</module>
<module>protocols/webadmin-mailrepository</module>
+ <module>protocols/webadmin-rabbitmq-mailqueue</module>
<module>protocols/webadmin-swagger</module>
<module>rabbitmq</module>
<module>testing</module>
@@ -200,6 +201,11 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>james-server-guice-webadmin-rabbitmq-mailqueue</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>james-server-guice-webadmin-mailrepository</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/server/container/guice/rabbitmq/pom.xml b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/pom.xml
similarity index 73%
copy from server/container/guice/rabbitmq/pom.xml
copy to server/container/guice/protocols/webadmin-rabbitmq-mailqueue/pom.xml
index b117e4f..9d625f4 100644
--- a/server/container/guice/rabbitmq/pom.xml
+++ b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/pom.xml
@@ -18,35 +18,29 @@
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
<modelVersion>4.0.0</modelVersion>
+
<parent>
- <groupId>org.apache.james</groupId>
<artifactId>james-server-guice</artifactId>
+ <groupId>org.apache.james</groupId>
<version>3.6.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
+ <relativePath>../../pom.xml</relativePath>
</parent>
- <artifactId>james-server-guice-rabbitmq</artifactId>
+ <artifactId>james-server-guice-webadmin-rabbitmq-mailqueue</artifactId>
- <name>Apache James :: Server :: Guice :: RabbitMQ</name>
- <description>Guice Module for RabbitMQ</description>
+ <name>Apache James :: Server :: Guice :: Webadmin :: RabbitMQ :: MailQueue</name>
+ <description>Webadmin rabbitMQ mailqueue modules for Guice implementation of James server</description>
<dependencies>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>apache-james-backends-rabbitmq</artifactId>
- </dependency>
- <dependency>
- <groupId>${james.groupId}</groupId>
- <artifactId>james-server-guice-configuration</artifactId>
+ <artifactId>james-server-webadmin-core</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>james-server-queue-api</artifactId>
- </dependency>
- <dependency>
- <groupId>${james.groupId}</groupId>
- <artifactId>james-server-queue-rabbitmq</artifactId>
+ <artifactId>james-server-webadmin-rabbitmq</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
@@ -58,5 +52,4 @@
<artifactId>guice</artifactId>
</dependency>
</dependencies>
-
</project>
diff --git a/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueRoutesModule.java b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueRoutesModule.java
new file mode 100644
index 0000000..dbffe76
--- /dev/null
+++ b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueRoutesModule.java
@@ -0,0 +1,36 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.modules.server;
+
+import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.routes.RabbitMQMailQueuesRoutes;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.multibindings.Multibinder;
+
+public class RabbitMailQueueRoutesModule extends AbstractModule {
+ @Override
+ protected void configure() {
+ install(new RabbitMailQueueTaskSerializationModule());
+
+ Multibinder<Routes> routesMultibinder = Multibinder.newSetBinder(binder(), Routes.class);
+ routesMultibinder.addBinding().to(RabbitMQMailQueuesRoutes.class);
+ }
+}
diff --git a/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueTaskSerializationModule.java b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueTaskSerializationModule.java
new file mode 100644
index 0000000..61d7048
--- /dev/null
+++ b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueTaskSerializationModule.java
@@ -0,0 +1,53 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.modules.server;
+
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+import org.apache.james.server.task.json.dto.TaskDTO;
+import org.apache.james.server.task.json.dto.TaskDTOModule;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.webadmin.dto.DTOModuleInjections;
+import org.apache.james.webadmin.service.RepublishNotProcessedMailsTaskAdditionalInformationDTO;
+import org.apache.james.webadmin.service.RepublishNotProcessedMailsTaskDTO;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.multibindings.ProvidesIntoSet;
+import com.google.inject.name.Named;
+
+public class RabbitMailQueueTaskSerializationModule extends AbstractModule {
+ @ProvidesIntoSet
+ public TaskDTOModule<? extends Task, ? extends TaskDTO> republishNotProcessedMailsTask(MailQueueFactory<RabbitMQMailQueue> mailQueueFactory) {
+ return RepublishNotProcessedMailsTaskDTO.module(mailQueueFactory);
+ }
+
+ @ProvidesIntoSet
+ public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> republishNotProcessedMailsAdditionalInformation() {
+ return RepublishNotProcessedMailsTaskAdditionalInformationDTO.module();
+ }
+
+ @Named(DTOModuleInjections.WEBADMIN_DTO)
+ @ProvidesIntoSet
+ public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> republishNotProcessedMailsAdditionalInformationWebAdmin() {
+ return RepublishNotProcessedMailsTaskAdditionalInformationDTO.module();
+ }
+}
diff --git a/server/container/guice/rabbitmq/pom.xml b/server/container/guice/rabbitmq/pom.xml
index b117e4f..5c4b0cc 100644
--- a/server/container/guice/rabbitmq/pom.xml
+++ b/server/container/guice/rabbitmq/pom.xml
@@ -42,6 +42,14 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>james-server-guice-webadmin-rabbitmq-mailqueue</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-webadmin-rabbitmq</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>james-server-queue-api</artifactId>
</dependency>
<dependency>
diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
index f0775df..3d5e3ed 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
@@ -797,4 +797,25 @@ class RabbitMQWebAdminServerTaskSerializationIntegrationTest {
.mailboxPath(MailboxPath.forUser(Username.of(USERNAME), "Important-mailbox"))
.build();
}
-}
\ No newline at end of file
+
+ @Test
+ void republishNotProcessedMailsOnSpoolShouldComplete() {
+ String taskId = with()
+ .basePath("/mailQueues/spool")
+ .queryParam("action", "RepublishNotProcessedMails")
+ .queryParam("olderThan", "2d")
+ .post()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("taskId", is(taskId))
+ .body("type", is("republish-not-processed-mails"))
+ .body("additionalInformation.nbRequeuedMails", is(0));
+ }
+}
diff --git a/server/protocols/webadmin/pom.xml b/server/protocols/webadmin/pom.xml
index 4600e0a..db7f387 100644
--- a/server/protocols/webadmin/pom.xml
+++ b/server/protocols/webadmin/pom.xml
@@ -42,6 +42,7 @@
<module>webadmin-mailbox-deleted-message-vault</module>
<module>webadmin-mailqueue</module>
<module>webadmin-mailrepository</module>
+ <module>webadmin-rabbitmq</module>
<module>webadmin-swagger</module>
</modules>
diff --git a/server/protocols/webadmin/webadmin-mailqueue/pom.xml b/server/protocols/webadmin/webadmin-mailqueue/pom.xml
index f119ef2..a35a339 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/pom.xml
+++ b/server/protocols/webadmin/webadmin-mailqueue/pom.xml
@@ -129,7 +129,7 @@
<version>v1</version>
</info>
<swaggerDirectory>${project.build.directory}</swaggerDirectory>
- <swaggerFileName>webadmin-mailbox</swaggerFileName>
+ <swaggerFileName>webadmin-mailqueue</swaggerFileName>
</apiSource>
</apiSources>
</configuration>
diff --git a/server/protocols/webadmin/webadmin-mailqueue/pom.xml b/server/protocols/webadmin/webadmin-rabbitmq/pom.xml
similarity index 88%
copy from server/protocols/webadmin/webadmin-mailqueue/pom.xml
copy to server/protocols/webadmin/webadmin-rabbitmq/pom.xml
index f119ef2..862deff 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/pom.xml
+++ b/server/protocols/webadmin/webadmin-rabbitmq/pom.xml
@@ -27,10 +27,10 @@
<relativePath>../../../pom.xml</relativePath>
</parent>
- <artifactId>james-server-webadmin-mailqueue</artifactId>
+ <artifactId>james-server-webadmin-rabbitmq</artifactId>
<packaging>jar</packaging>
- <name>Apache James :: Server :: Web Admin :: MailQueue</name>
+ <name>Apache James :: Server :: Web Admin :: RabbitMQ</name>
<dependencies>
<dependency>
@@ -56,7 +56,11 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>james-server-queue-memory</artifactId>
+ <artifactId>james-server-queue-rabbitmq</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -65,7 +69,7 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>james-server-task-memory</artifactId>
+ <artifactId>james-server-task-distributed</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -80,6 +84,10 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>james-server-webadmin-mailqueue</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>metrics-tests</artifactId>
<scope>test</scope>
</dependency>
@@ -129,7 +137,7 @@
<version>v1</version>
</info>
<swaggerDirectory>${project.build.directory}</swaggerDirectory>
- <swaggerFileName>webadmin-mailbox</swaggerFileName>
+ <swaggerFileName>webadmin-rabbitmq</swaggerFileName>
</apiSource>
</apiSources>
</configuration>
diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutes.java b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutes.java
new file mode 100644
index 0000000..c366df6
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutes.java
@@ -0,0 +1,174 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.routes;
+
+import static org.apache.james.webadmin.Constants.SEPARATOR;
+import static org.apache.james.webadmin.routes.MailQueueRoutes.BASE_URL;
+import static org.apache.james.webadmin.routes.MailQueueRoutes.MAIL_QUEUE_NAME;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import javax.inject.Inject;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskManager;
+import org.apache.james.util.DurationParser;
+import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.service.RepublishNotprocessedMailsTask;
+import org.apache.james.webadmin.tasks.TaskFromRequest;
+import org.apache.james.webadmin.tasks.TaskFromRequestRegistry;
+import org.apache.james.webadmin.tasks.TaskRegistrationKey;
+import org.apache.james.webadmin.utils.ErrorResponder;
+import org.apache.james.webadmin.utils.JsonTransformer;
+import org.eclipse.jetty.http.HttpStatus;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import spark.Request;
+import spark.Service;
+
+@Api(tags = "MailQueues")
+@Path(BASE_URL)
+@Produces("application/json")
+public class RabbitMQMailQueuesRoutes implements Routes {
+
+ private static final TaskRegistrationKey REPUBLISH_NOT_PROCESSED_MAILS_REGISTRATION_KEY = TaskRegistrationKey.of("RepublishNotProcessedMails");
+
+ private final MailQueueFactory<RabbitMQMailQueue> mailQueueFactory;
+ private final JsonTransformer jsonTransformer;
+ private final TaskManager taskManager;
+ private final Clock clock;
+
+ @Inject
+ @SuppressWarnings("unchecked")
+ @VisibleForTesting
+ RabbitMQMailQueuesRoutes(MailQueueFactory<RabbitMQMailQueue> mailQueueFactory,
+ Clock clock, JsonTransformer jsonTransformer, TaskManager taskManager) {
+ this.mailQueueFactory = mailQueueFactory;
+ this.clock = clock;
+ this.jsonTransformer = jsonTransformer;
+ this.taskManager = taskManager;
+ }
+
+ @Override
+ public String getBasePath() {
+ return BASE_URL;
+ }
+
+ @Override
+ public void define(Service service) {
+ republishNotProcessedMails(service);
+ }
+
+
+ @POST
+ @Path("/{mailQueueName}")
+ @ApiImplicitParams({
+ @ApiImplicitParam(required = true, dataType = "string", name = "mailQueueName", paramType = "path"),
+ @ApiImplicitParam(
+ required = true,
+ dataType = "String",
+ name = "action",
+ paramType = "query",
+ example = "?action=RepublishNotProcessedMails",
+ value = "Specify the action to perform on a RabbitMQ mail queue."),
+ @ApiImplicitParam(
+ required = true,
+ dataType = "String",
+ name = "olderThan",
+ paramType = "query",
+ example = "?olderThan=1w",
+ value = "Specify the messages minimum age to republish")
+ })
+ @ApiOperation(
+ value = "republish the not processed mails of the RabbitMQ MailQueue using the cassandra mail queue view"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = HttpStatus.CREATED_201, message = "OK, the task for rebuilding the queue is created"),
+ @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Invalid request for rebuilding the mail queue."),
+ @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = "Internal server error - Something went bad on the server side.")
+ })
+ public void republishNotProcessedMails(Service service) {
+ TaskFromRequest taskFromRequest = this::republishNotProcessedMails;
+ service.post(BASE_URL + SEPARATOR + MAIL_QUEUE_NAME,
+ TaskFromRequestRegistry.builder()
+ .register(REPUBLISH_NOT_PROCESSED_MAILS_REGISTRATION_KEY, this::republishNotProcessedMails)
+ .buildAsRoute(taskManager),
+ jsonTransformer);
+ }
+
+ private Task republishNotProcessedMails(Request request) {
+ RabbitMQMailQueue mailQueue = getMailQueue(MailQueueName.of(request.params(MAIL_QUEUE_NAME)));
+ return new RepublishNotprocessedMailsTask(mailQueue, getOlderThan(request));
+ }
+
+
+ private RabbitMQMailQueue getMailQueue(MailQueueName mailQueueName) {
+ return mailQueueFactory.getQueue(mailQueueName)
+ .orElseThrow(
+ () -> ErrorResponder.builder()
+ .message("%s can not be found", mailQueueName)
+ .statusCode(HttpStatus.NOT_FOUND_404)
+ .type(ErrorResponder.ErrorType.NOT_FOUND)
+ .haltError());
+ }
+
+ private Instant getOlderThan(Request req) {
+ try {
+ Duration olderThan = Optional.ofNullable(req.queryParams("olderThan"))
+ .filter(Predicate.not(String::isEmpty))
+ .map(rawString -> DurationParser.parse(rawString, ChronoUnit.DAYS))
+ .orElseThrow();
+
+ return clock.instant().minus(olderThan);
+ } catch (NoSuchElementException e) {
+ throw ErrorResponder.builder()
+ .message("Missing olderThan")
+ .statusCode(HttpStatus.BAD_REQUEST_400)
+ .type(ErrorResponder.ErrorType.INVALID_ARGUMENT)
+ .haltError();
+ } catch (Exception e) {
+ throw ErrorResponder.builder()
+ .statusCode(HttpStatus.BAD_REQUEST_400)
+ .cause(e)
+ .type(ErrorResponder.ErrorType.INVALID_ARGUMENT)
+ .message("Invalid olderThan")
+ .haltError();
+ }
+ }
+}
diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskAdditionalInformationDTO.java
new file mode 100644
index 0000000..3621530
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskAdditionalInformationDTO.java
@@ -0,0 +1,90 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.webadmin.service;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class RepublishNotProcessedMailsTaskAdditionalInformationDTO implements AdditionalInformationDTO {
+
+ public static AdditionalInformationDTOModule<RepublishNotprocessedMailsTask.AdditionalInformation, RepublishNotProcessedMailsTaskAdditionalInformationDTO> module() {
+ return DTOModule.forDomainObject(RepublishNotprocessedMailsTask.AdditionalInformation.class)
+ .convertToDTO(RepublishNotProcessedMailsTaskAdditionalInformationDTO.class)
+ .toDomainObjectConverter(dto -> new RepublishNotprocessedMailsTask.AdditionalInformation(
+ MailQueueName.of(dto.mailQueue),
+ dto.olderThan,
+ dto.nbRequeuedMails,
+ dto.timestamp))
+ .toDTOConverter((details, type) -> new RepublishNotProcessedMailsTaskAdditionalInformationDTO(
+ type,
+ details.getMailQueue().asString(),
+ details.getOlderThan(),
+ details.getNbRequeuedMails(),
+ details.timestamp()))
+ .typeName(RepublishNotprocessedMailsTask.TYPE.asString())
+ .withFactory(AdditionalInformationDTOModule::new);
+ }
+
+ private final String type;
+ private final String mailQueue;
+
+ private final long nbRequeuedMails;
+ private final Instant olderThan;
+ private final Instant timestamp;
+
+ public RepublishNotProcessedMailsTaskAdditionalInformationDTO(@JsonProperty("type") String type,
+ @JsonProperty("mailQueue") String mailQueue,
+ @JsonProperty("olderThan") Instant olderThan,
+ @JsonProperty("nbRequeuedMails") long nbRequeuedMails,
+ @JsonProperty("timestamp") Instant timestamp) {
+ this.type = type;
+ this.mailQueue = mailQueue;
+ this.olderThan = olderThan;
+ this.nbRequeuedMails = nbRequeuedMails;
+ this.timestamp = timestamp;
+ }
+
+ public String getMailQueue() {
+ return mailQueue;
+ }
+
+ public long getNbRequeuedMails() {
+ return nbRequeuedMails;
+ }
+
+ public Instant getOlderThan() {
+ return olderThan;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+}
diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskDTO.java b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskDTO.java
new file mode 100644
index 0000000..5502b35
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskDTO.java
@@ -0,0 +1,85 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.webadmin.service;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.server.task.json.dto.TaskDTO;
+import org.apache.james.server.task.json.dto.TaskDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class RepublishNotProcessedMailsTaskDTO implements TaskDTO {
+
+ public static class UnknownMailQueueException extends RuntimeException {
+ public UnknownMailQueueException(MailQueueName mailQueueName) {
+ super("Unknown mail queue " + mailQueueName.asString());
+ }
+ }
+
+ public static TaskDTOModule<RepublishNotprocessedMailsTask, RepublishNotProcessedMailsTaskDTO> module(MailQueueFactory<RabbitMQMailQueue> mailQueueFactory) {
+ return DTOModule
+ .forDomainObject(RepublishNotprocessedMailsTask.class)
+ .convertToDTO(RepublishNotProcessedMailsTaskDTO.class)
+ .toDomainObjectConverter(dto -> dto.fromDTO(mailQueueFactory))
+ .toDTOConverter(RepublishNotProcessedMailsTaskDTO::toDTO)
+ .typeName(RepublishNotprocessedMailsTask.TYPE.asString())
+ .withFactory(TaskDTOModule::new);
+ }
+
+ public static RepublishNotProcessedMailsTaskDTO toDTO(RepublishNotprocessedMailsTask domainObject, String typeName) {
+ return new RepublishNotProcessedMailsTaskDTO(typeName, domainObject.getMailQueue().asString(), domainObject.getOlderThan());
+ }
+
+ private final String type;
+ private final String mailQueue;
+ private final Instant olderThan;
+
+ public RepublishNotProcessedMailsTaskDTO(@JsonProperty("type") String type, @JsonProperty("mailQueue") String mailQueue, @JsonProperty("olderThan") Instant olderThan) {
+ this.type = type;
+ this.mailQueue = mailQueue;
+ this.olderThan = olderThan;
+ }
+
+ public RepublishNotprocessedMailsTask fromDTO(MailQueueFactory<RabbitMQMailQueue> mailQueueFactory) {
+ MailQueueName requestedMailQueueName = MailQueueName.of(mailQueue);
+ RabbitMQMailQueue queue = mailQueueFactory
+ .getQueue(requestedMailQueueName)
+ .orElseThrow(() -> new UnknownMailQueueException(requestedMailQueueName));
+
+ return new RepublishNotprocessedMailsTask(queue, olderThan);
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ public Instant getOlderThan() {
+ return olderThan;
+ }
+
+ public String getMailQueue() {
+ return mailQueue;
+ }
+}
diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTask.java b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTask.java
new file mode 100644
index 0000000..0428335
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTask.java
@@ -0,0 +1,107 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskType;
+
+
+public class RepublishNotprocessedMailsTask implements Task {
+
+ public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+
+ private final Instant timestamp;
+ private final long nbRequeuedMails;
+ private final MailQueueName mailQueue;
+ private final Instant olderThan;
+
+ public AdditionalInformation(MailQueueName mailQueue, Instant olderThan, long nbRequeuedMails, Instant timestamp) {
+ this.mailQueue = mailQueue;
+ this.olderThan = olderThan;
+ this.timestamp = timestamp;
+ this.nbRequeuedMails = nbRequeuedMails;
+ }
+
+ @Override
+ public Instant timestamp() {
+ return timestamp;
+ }
+
+ public Instant getOlderThan() {
+ return olderThan;
+ }
+
+ public MailQueueName getMailQueue() {
+ return mailQueue;
+ }
+
+ public long getNbRequeuedMails() {
+ return nbRequeuedMails;
+ }
+ }
+
+ public static final TaskType TYPE = TaskType.of("republish-not-processed-mails");
+
+ private final Instant olderThan;
+ private final RabbitMQMailQueue mailQueue;
+ private final AtomicInteger nbRequeuedMails;
+
+ public RepublishNotprocessedMailsTask(RabbitMQMailQueue mailQueue, Instant olderThan) {
+ this.olderThan = olderThan;
+ this.mailQueue = mailQueue;
+ this.nbRequeuedMails = new AtomicInteger(0);
+ }
+
+ @Override
+ public Result run() {
+ mailQueue.republishNotProcessedMails(olderThan)
+ .doOnNext(mailName -> nbRequeuedMails.getAndIncrement())
+ .then()
+ .block();
+
+ return Result.COMPLETED;
+ }
+
+ @Override
+ public TaskType type() {
+ return TYPE;
+ }
+
+ @Override
+ public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+ return Optional.of(new AdditionalInformation(mailQueue.getName(), olderThan, nbRequeuedMails.get(), Clock.systemUTC().instant()));
+ }
+
+ public Instant getOlderThan() {
+ return olderThan;
+ }
+
+ public MailQueueName getMailQueue() {
+ return mailQueue.getName();
+ }
+}
diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutesTest.java b/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutesTest.java
new file mode 100644
index 0000000..4daaa08
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutesTest.java
@@ -0,0 +1,144 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.routes;
+
+import static io.restassured.RestAssured.given;
+import static io.restassured.config.EncoderConfig.encoderConfig;
+import static io.restassured.config.RestAssuredConfig.newConfig;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasSize;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Optional;
+
+import org.apache.james.json.DTOConverter;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory;
+import org.apache.james.task.Hostname;
+import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.task.TaskManager;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.apache.james.webadmin.WebAdminServer;
+import org.apache.james.webadmin.WebAdminUtils;
+import org.apache.james.webadmin.utils.JsonTransformer;
+import org.eclipse.jetty.http.HttpStatus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.restassured.RestAssured;
+import io.restassured.builder.RequestSpecBuilder;
+import io.restassured.http.ContentType;
+import io.restassured.specification.RequestSpecification;
+
+class RabbitMQMailQueuesRoutesTest {
+ final static ZonedDateTime DATE = ZonedDateTime.parse("2015-10-30T14:12:00Z");
+
+ WebAdminServer webAdminServer;
+ MailQueueFactory mailQueueFactory;
+ Clock clock;
+
+ WebAdminServer createServer(MailQueueFactory mailQueueFactory) {
+ TaskManager taskManager = new MemoryTaskManager(new Hostname("foo"));
+ JsonTransformer jsonTransformer = new JsonTransformer();
+ clock = UpdatableTickingClock.fixed(DATE.toInstant(), ZoneOffset.UTC);
+ return WebAdminUtils.createWebAdminServer(
+ new RabbitMQMailQueuesRoutes(mailQueueFactory, clock, jsonTransformer, taskManager),
+ new TasksRoutes(taskManager, jsonTransformer, DTOConverter.of()))
+ .start();
+ }
+
+ RequestSpecification buildRequestSpecification(WebAdminServer server) {
+ return new RequestSpecBuilder()
+ .setContentType(ContentType.JSON)
+ .setAccept(ContentType.JSON)
+ .setBasePath("/")
+ .setPort(server.getPort().getValue())
+ .setConfig(newConfig().encoderConfig(encoderConfig().defaultContentCharset(StandardCharsets.UTF_8)))
+ .build();
+ }
+
+ @BeforeEach
+ void setUp() {
+ mailQueueFactory = mock(RabbitMQMailQueueFactory.class);
+ webAdminServer = createServer(mailQueueFactory);
+ RestAssured.requestSpecification = buildRequestSpecification(webAdminServer);
+ RestAssured.enableLoggingOfRequestAndResponseIfValidationFails();
+ }
+
+ @AfterEach
+ void tearDown() {
+ webAdminServer.destroy();
+ }
+
+ @Test
+ void triggeringARepublishNotProcessedMailsShouldCreateATask() {
+ when(mailQueueFactory.getQueue(any())).thenReturn(Optional.of(mock(RabbitMQMailQueue.class)));
+ given()
+ .queryParam("action", "RepublishNotProcessedMails")
+ .queryParam("olderThan", "1d")
+ .when()
+ .post(MailQueueRoutes.BASE_URL + "/spooler")
+ .then()
+ .statusCode(HttpStatus.CREATED_201);
+
+ given()
+ .when()
+ .get("/tasks")
+ .then()
+ .statusCode(HttpStatus.OK_200)
+ .body("", hasSize(1));
+ }
+
+ @Test
+ void triggeringARepublishNotProcessedMailsWhenTheQueueHasNotBeenInitializedShouldFail() {
+ when(mailQueueFactory.getQueue(any())).thenReturn(Optional.empty());
+ given()
+ .queryParam("action", "RepublishNotProcessedMails")
+ .queryParam("olderThan", "1d")
+ .when()
+ .post(MailQueueRoutes.BASE_URL + "/spooler")
+ .then()
+ .statusCode(HttpStatus.NOT_FOUND_404)
+ .body("message", containsString("MailQueueName{value=spooler} can not be found"));
+ }
+
+ @Test
+ void triggeringARepublishNotProcessedMailsWithAnInvalidOlderThanShouldFail() {
+ when(mailQueueFactory.getQueue(any())).thenReturn(Optional.of(mock(RabbitMQMailQueue.class)));
+ given()
+ .queryParam("action", "RepublishNotProcessedMails")
+ .queryParam("olderThan", "invalidValue")
+ .when()
+ .post(MailQueueRoutes.BASE_URL + "/spooler")
+ .then()
+ .statusCode(HttpStatus.BAD_REQUEST_400)
+ .body("message", containsString("Invalid olderThan"))
+ .body("details", containsString("Supplied value do not follow the unit format (number optionally suffixed with a string representing the unit"));
+ }
+
+}
diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTaskTest.java b/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTaskTest.java
new file mode 100644
index 0000000..ac35da1
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTaskTest.java
@@ -0,0 +1,108 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ***************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.util.Optional;
+
+import org.apache.james.JsonSerializationVerifier;
+import org.apache.james.json.JsonGenericSerializer;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory;
+import org.junit.jupiter.api.Test;
+
+class RepublishNotprocessedMailsTaskTest {
+ private static final Instant OLDER_THAN = Instant.parse("2018-11-13T12:00:55Z");
+ private static final Instant NOW = Instant.now();
+ private static final long NB_REQUEUED_MAILS = 12;
+ private static final String SERIALIZED = "{\"type\": \"republish-not-processed-mails\",\"mailQueue\":\"anyQueue\", \"olderThan\": \"" + OLDER_THAN + "\"}";
+ private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION = "{\"type\": \"republish-not-processed-mails\",\"mailQueue\":\"anyQueue\", \"olderThan\": \"" + OLDER_THAN + "\" ,\"nbRequeuedMails\":12,\"timestamp\":\"" + NOW.toString() + "\"}";
+ private static final MailQueueName QUEUE_NAME = MailQueueName.of("anyQueue");
+
+ @Test
+ void taskShouldBeSerializable() throws Exception {
+ RabbitMQMailQueueFactory mockedQueueFactory = mock(RabbitMQMailQueueFactory.class);
+ RabbitMQMailQueue mockedQueue = mock(RabbitMQMailQueue.class);
+
+ when(mockedQueue.getName()).thenReturn(QUEUE_NAME);
+ when(mockedQueueFactory.getQueue(QUEUE_NAME)).thenReturn(Optional.of(mockedQueue));
+
+ RepublishNotprocessedMailsTask task = new RepublishNotprocessedMailsTask(mockedQueue, OLDER_THAN);
+
+ JsonSerializationVerifier.dtoModule(RepublishNotProcessedMailsTaskDTO.module(mockedQueueFactory))
+ .bean(task)
+ .json(SERIALIZED)
+ .verify();
+ }
+
+ @Test
+ void taskShouldBeDeserializable() throws Exception {
+ RabbitMQMailQueueFactory mockedQueueFactory = mock(RabbitMQMailQueueFactory.class);
+ RabbitMQMailQueue mockedQueue = mock(RabbitMQMailQueue.class);
+
+ when(mockedQueue.getName()).thenReturn(QUEUE_NAME);
+ when(mockedQueueFactory.getQueue(QUEUE_NAME)).thenReturn(Optional.of(mockedQueue));
+
+ RepublishNotprocessedMailsTask task = new RepublishNotprocessedMailsTask(mockedQueue, OLDER_THAN);
+ JsonSerializationVerifier.dtoModule(RepublishNotProcessedMailsTaskDTO.module(mockedQueueFactory))
+ .bean(task)
+ .json(SERIALIZED)
+ .verify();
+ }
+
+ @Test
+ void taskDeserializationFromUnknownQueueNameShouldThrow() {
+ RabbitMQMailQueueFactory mockedQueueFactory = mock(RabbitMQMailQueueFactory.class);
+ RabbitMQMailQueue mockedQueue = mock(RabbitMQMailQueue.class);
+
+ when(mockedQueue.getName()).thenReturn(QUEUE_NAME);
+ when(mockedQueueFactory.getQueue(QUEUE_NAME)).thenReturn(Optional.empty());
+
+ RepublishNotprocessedMailsTask task = new RepublishNotprocessedMailsTask(mockedQueue, OLDER_THAN);
+ assertThatThrownBy(() -> JsonSerializationVerifier.dtoModule(RepublishNotProcessedMailsTaskDTO.module(mockedQueueFactory))
+ .bean(task)
+ .json(SERIALIZED)
+ .verify())
+ .isInstanceOf(RepublishNotProcessedMailsTaskDTO.UnknownMailQueueException.class);
+ }
+
+ @Test
+ void additionalInformationShouldBeSerializable() throws Exception {
+ RepublishNotprocessedMailsTask.AdditionalInformation details = new RepublishNotprocessedMailsTask.AdditionalInformation(QUEUE_NAME, OLDER_THAN, NB_REQUEUED_MAILS, NOW);
+ JsonSerializationVerifier.dtoModule(RepublishNotProcessedMailsTaskAdditionalInformationDTO.module())
+ .bean(details)
+ .json(SERIALIZED_TASK_ADDITIONAL_INFORMATION)
+ .verify();
+ }
+
+ @Test
+ void additionalInformationShouldBeDeserializable() throws Exception {
+ RepublishNotprocessedMailsTask.AdditionalInformation details = new RepublishNotprocessedMailsTask.AdditionalInformation(QUEUE_NAME, OLDER_THAN, NB_REQUEUED_MAILS, NOW);
+ RepublishNotprocessedMailsTask.AdditionalInformation deserialized = JsonGenericSerializer.forModules(RepublishNotProcessedMailsTaskAdditionalInformationDTO.module())
+ .withoutNestedType()
+ .deserialize(SERIALIZED_TASK_ADDITIONAL_INFORMATION);
+ assertThat(deserialized).isEqualToComparingFieldByField(details);
+ }
+}
diff --git a/src/site/markdown/server/manage-webadmin.md b/src/site/markdown/server/manage-webadmin.md
index 707417d..ff18829 100644
--- a/src/site/markdown/server/manage-webadmin.md
+++ b/src/site/markdown/server/manage-webadmin.md
@@ -2900,6 +2900,7 @@ The scheduled task will have the following type `reprocessing-one` and the follo
- [Deleting mails from a mail queue](#Deleting_mails_from_a_mail_queue)
- [Clearing a mail queue](#Clearing_a_mail_queue)
- [Flushing mails from a mail queue](#Flushing_mails_from_a_mail_queue)
+ - [RabbitMQ republishing a mail queue from cassandra](#RabbitMQ_republishing_a_mail_queue_from_cassandra)
### Listing mail queues
@@ -3048,6 +3049,40 @@ Response codes:
- 204: Success (No content)
- 400: Invalid request
- 404: The mail queue does not exist
+
+### RabbitMQ republishing a mail queue from cassandra
+
+```
+curl -XPOST 'http://ip:port/mailQueues/{mailQueueName}?action=RepublishNotProcessedMails&olderThan=1d'
+```
+
+This method is specific to the distributed flavor of James, which relies on Cassandra and RabbitMQ for implementing a mail queue.
+In case of a RabbitMQ crash resulting in a loss of messages, this task can be launched to repopulate the
+`mailQueueName` queue in RabbitMQ using the information stored in Cassandra.
+
+The `olderThan` parameter is mandatory. It filters the mails to be restored, by taking into account only
+the mails older than the given value.
+The expected value should be expressed in the following format: `Nunit`.
+`N` should be strictly positive.
+`unit` could be either in the short form (`h`, `d`, `w`, etc.), or in the long form (`day`, `week`, `month`, etc.).
+
+Examples:
+
+ - `5h`
+ - `7d`
+ - `1y`
+
+Response codes:
+
+ - 201: Task created
+ - 400: Invalid request
+
+ The response body contains the id of the republishing task.
+ ```
+ {
+ "taskId": "a650a66a-5984-431e-bdad-f1baad885856"
+ }
+ ```
## Administrating DLP Configuration
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org