You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/17 02:24:16 UTC

[james-project] 02/31: JAMES-3296 Add task to republish RabbitMQ MailQueue from Cassandra

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7bc6a36334b4765d373b7610f26a1f1d044fbe28
Author: RĂ©mi KOWALSKI <rk...@linagora.com>
AuthorDate: Tue Jul 7 17:32:23 2020 +0200

    JAMES-3296 Add task to republish RabbitMQ MailQueue from Cassandra
---
 pom.xml                                            |   5 +
 .../guice/cassandra-rabbitmq-guice/pom.xml         |   4 +
 .../james/CassandraRabbitMQJamesServerMain.java    |   3 +-
 server/container/guice/pom.xml                     |   6 +
 .../webadmin-rabbitmq-mailqueue}/pom.xml           |  25 ++-
 .../server/RabbitMailQueueRoutesModule.java        |  36 +++++
 .../RabbitMailQueueTaskSerializationModule.java    |  53 +++++++
 server/container/guice/rabbitmq/pom.xml            |   8 +
 ...dminServerTaskSerializationIntegrationTest.java |  23 ++-
 server/protocols/webadmin/pom.xml                  |   1 +
 .../protocols/webadmin/webadmin-mailqueue/pom.xml  |   2 +-
 .../pom.xml                                        |  18 ++-
 .../webadmin/routes/RabbitMQMailQueuesRoutes.java  | 174 +++++++++++++++++++++
 ...ProcessedMailsTaskAdditionalInformationDTO.java |  90 +++++++++++
 .../service/RepublishNotProcessedMailsTaskDTO.java |  85 ++++++++++
 .../service/RepublishNotprocessedMailsTask.java    | 107 +++++++++++++
 .../routes/RabbitMQMailQueuesRoutesTest.java       | 144 +++++++++++++++++
 .../RepublishNotprocessedMailsTaskTest.java        | 108 +++++++++++++
 src/site/markdown/server/manage-webadmin.md        |  35 +++++
 19 files changed, 903 insertions(+), 24 deletions(-)

diff --git a/pom.xml b/pom.xml
index f2de229..0e7b225 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1877,6 +1877,11 @@
             </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
+                <artifactId>james-server-webadmin-rabbitmq</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${james.groupId}</groupId>
                 <artifactId>james-server-webadmin-swagger</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/server/container/guice/cassandra-rabbitmq-guice/pom.xml b/server/container/guice/cassandra-rabbitmq-guice/pom.xml
index 1b02418..6edd29c 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/pom.xml
+++ b/server/container/guice/cassandra-rabbitmq-guice/pom.xml
@@ -181,6 +181,10 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-webadmin-rabbitmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>testing-base</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
index 2e40639..322acd3 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
@@ -29,6 +29,7 @@ import org.apache.james.modules.blobstore.BlobStoreModulesChooser;
 import org.apache.james.modules.event.RabbitMQEventBusModule;
 import org.apache.james.modules.rabbitmq.RabbitMQModule;
 import org.apache.james.modules.server.JMXServerModule;
+import org.apache.james.modules.server.RabbitMailQueueRoutesModule;
 
 import com.google.inject.Module;
 import com.google.inject.util.Modules;
@@ -37,7 +38,7 @@ public class CassandraRabbitMQJamesServerMain implements JamesServerMain {
     protected static final Module MODULES =
         Modules
             .override(Modules.combine(REQUIRE_TASK_MANAGER_MODULE, new DistributedTaskManagerModule()))
-            .with(new RabbitMQModule(), new RabbitMQEventBusModule(), new DistributedTaskSerializationModule());
+            .with(new RabbitMQModule(), new RabbitMailQueueRoutesModule(), new RabbitMQEventBusModule(), new DistributedTaskSerializationModule());
 
     public static void main(String[] args) throws Exception {
         CassandraRabbitMQJamesConfiguration configuration = CassandraRabbitMQJamesConfiguration.builder()
diff --git a/server/container/guice/pom.xml b/server/container/guice/pom.xml
index bf8fe7f..cbae105 100644
--- a/server/container/guice/pom.xml
+++ b/server/container/guice/pom.xml
@@ -71,6 +71,7 @@
         <module>protocols/webadmin-mailbox</module>
         <module>protocols/webadmin-mailqueue</module>
         <module>protocols/webadmin-mailrepository</module>
+        <module>protocols/webadmin-rabbitmq-mailqueue</module>
         <module>protocols/webadmin-swagger</module>
         <module>rabbitmq</module>
         <module>testing</module>
@@ -200,6 +201,11 @@
             </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
+                <artifactId>james-server-guice-webadmin-rabbitmq-mailqueue</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${james.groupId}</groupId>
                 <artifactId>james-server-guice-webadmin-mailrepository</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/server/container/guice/rabbitmq/pom.xml b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/pom.xml
similarity index 73%
copy from server/container/guice/rabbitmq/pom.xml
copy to server/container/guice/protocols/webadmin-rabbitmq-mailqueue/pom.xml
index b117e4f..9d625f4 100644
--- a/server/container/guice/rabbitmq/pom.xml
+++ b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/pom.xml
@@ -18,35 +18,29 @@
     under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
     <modelVersion>4.0.0</modelVersion>
+    
     <parent>
-        <groupId>org.apache.james</groupId>
         <artifactId>james-server-guice</artifactId>
+        <groupId>org.apache.james</groupId>
         <version>3.6.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
+        <relativePath>../../pom.xml</relativePath>
     </parent>
 
-    <artifactId>james-server-guice-rabbitmq</artifactId>
+    <artifactId>james-server-guice-webadmin-rabbitmq-mailqueue</artifactId>
 
-    <name>Apache James :: Server :: Guice :: RabbitMQ</name>
-    <description>Guice Module for RabbitMQ</description>
+    <name>Apache James :: Server :: Guice :: Webadmin :: RabbitMQ :: MailQueue</name>
+    <description>Webadmin rabbitMQ mailqueue modules for Guice implementation of James server</description>
 
     <dependencies>
         <dependency>
             <groupId>${james.groupId}</groupId>
-            <artifactId>apache-james-backends-rabbitmq</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>${james.groupId}</groupId>
-            <artifactId>james-server-guice-configuration</artifactId>
+            <artifactId>james-server-webadmin-core</artifactId>
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
-            <artifactId>james-server-queue-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>${james.groupId}</groupId>
-            <artifactId>james-server-queue-rabbitmq</artifactId>
+            <artifactId>james-server-webadmin-rabbitmq</artifactId>
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
@@ -58,5 +52,4 @@
             <artifactId>guice</artifactId>
         </dependency>
     </dependencies>
-
 </project>
diff --git a/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueRoutesModule.java b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueRoutesModule.java
new file mode 100644
index 0000000..dbffe76
--- /dev/null
+++ b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueRoutesModule.java
@@ -0,0 +1,36 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.server;
+
+import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.routes.RabbitMQMailQueuesRoutes;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.multibindings.Multibinder;
+
+public class RabbitMailQueueRoutesModule extends AbstractModule {
+    @Override
+    protected void configure() {
+        install(new RabbitMailQueueTaskSerializationModule());
+
+        Multibinder<Routes> routesMultibinder = Multibinder.newSetBinder(binder(), Routes.class);
+        routesMultibinder.addBinding().to(RabbitMQMailQueuesRoutes.class);
+    }
+}
diff --git a/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueTaskSerializationModule.java b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueTaskSerializationModule.java
new file mode 100644
index 0000000..61d7048
--- /dev/null
+++ b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueTaskSerializationModule.java
@@ -0,0 +1,53 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.modules.server;
+
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+import org.apache.james.server.task.json.dto.TaskDTO;
+import org.apache.james.server.task.json.dto.TaskDTOModule;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.webadmin.dto.DTOModuleInjections;
+import org.apache.james.webadmin.service.RepublishNotProcessedMailsTaskAdditionalInformationDTO;
+import org.apache.james.webadmin.service.RepublishNotProcessedMailsTaskDTO;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.multibindings.ProvidesIntoSet;
+import com.google.inject.name.Named;
+
+public class RabbitMailQueueTaskSerializationModule extends AbstractModule {
+    @ProvidesIntoSet
+    public TaskDTOModule<? extends Task, ? extends TaskDTO> republishNotProcessedMailsTask(MailQueueFactory<RabbitMQMailQueue> mailQueueFactory) {
+        return RepublishNotProcessedMailsTaskDTO.module(mailQueueFactory);
+    }
+
+    @ProvidesIntoSet
+    public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> republishNotProcessedMailsAdditionalInformation() {
+        return RepublishNotProcessedMailsTaskAdditionalInformationDTO.module();
+    }
+
+    @Named(DTOModuleInjections.WEBADMIN_DTO)
+    @ProvidesIntoSet
+    public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> republishNotProcessedMailsAdditionalInformationWebAdmin() {
+        return RepublishNotProcessedMailsTaskAdditionalInformationDTO.module();
+    }
+}
diff --git a/server/container/guice/rabbitmq/pom.xml b/server/container/guice/rabbitmq/pom.xml
index b117e4f..5c4b0cc 100644
--- a/server/container/guice/rabbitmq/pom.xml
+++ b/server/container/guice/rabbitmq/pom.xml
@@ -42,6 +42,14 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-guice-webadmin-rabbitmq-mailqueue</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-webadmin-rabbitmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-queue-api</artifactId>
         </dependency>
         <dependency>
diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
index f0775df..3d5e3ed 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
@@ -797,4 +797,25 @@ class RabbitMQWebAdminServerTaskSerializationIntegrationTest {
             .mailboxPath(MailboxPath.forUser(Username.of(USERNAME), "Important-mailbox"))
             .build();
     }
-}
\ No newline at end of file
+
+    @Test
+    void republishNotProcessedMailsOnSpoolShouldComplete() {
+        String taskId = with()
+            .basePath("/mailQueues/spool")
+            .queryParam("action", "RepublishNotProcessedMails")
+            .queryParam("olderThan", "2d")
+        .post()
+            .jsonPath()
+        .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await")
+        .then()
+            .body("status", is("completed"))
+            .body("taskId", is(taskId))
+            .body("type", is("republish-not-processed-mails"))
+            .body("additionalInformation.nbRequeuedMails", is(0));
+    }
+}
diff --git a/server/protocols/webadmin/pom.xml b/server/protocols/webadmin/pom.xml
index 4600e0a..db7f387 100644
--- a/server/protocols/webadmin/pom.xml
+++ b/server/protocols/webadmin/pom.xml
@@ -42,6 +42,7 @@
         <module>webadmin-mailbox-deleted-message-vault</module>
         <module>webadmin-mailqueue</module>
         <module>webadmin-mailrepository</module>
+        <module>webadmin-rabbitmq</module>
         <module>webadmin-swagger</module>
     </modules>
 
diff --git a/server/protocols/webadmin/webadmin-mailqueue/pom.xml b/server/protocols/webadmin/webadmin-mailqueue/pom.xml
index f119ef2..a35a339 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/pom.xml
+++ b/server/protocols/webadmin/webadmin-mailqueue/pom.xml
@@ -129,7 +129,7 @@
                                 <version>v1</version>
                             </info>
                             <swaggerDirectory>${project.build.directory}</swaggerDirectory>
-                            <swaggerFileName>webadmin-mailbox</swaggerFileName>
+                            <swaggerFileName>webadmin-mailqueue</swaggerFileName>
                         </apiSource>
                     </apiSources>
                 </configuration>
diff --git a/server/protocols/webadmin/webadmin-mailqueue/pom.xml b/server/protocols/webadmin/webadmin-rabbitmq/pom.xml
similarity index 88%
copy from server/protocols/webadmin/webadmin-mailqueue/pom.xml
copy to server/protocols/webadmin/webadmin-rabbitmq/pom.xml
index f119ef2..862deff 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/pom.xml
+++ b/server/protocols/webadmin/webadmin-rabbitmq/pom.xml
@@ -27,10 +27,10 @@
         <relativePath>../../../pom.xml</relativePath>
     </parent>
 
-    <artifactId>james-server-webadmin-mailqueue</artifactId>
+    <artifactId>james-server-webadmin-rabbitmq</artifactId>
     <packaging>jar</packaging>
 
-    <name>Apache James :: Server :: Web Admin :: MailQueue</name>
+    <name>Apache James :: Server :: Web Admin :: RabbitMQ</name>
 
     <dependencies>
         <dependency>
@@ -56,7 +56,11 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
-            <artifactId>james-server-queue-memory</artifactId>
+            <artifactId>james-server-queue-rabbitmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-testing</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -65,7 +69,7 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
-            <artifactId>james-server-task-memory</artifactId>
+            <artifactId>james-server-task-distributed</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -80,6 +84,10 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-webadmin-mailqueue</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>metrics-tests</artifactId>
             <scope>test</scope>
         </dependency>
@@ -129,7 +137,7 @@
                                 <version>v1</version>
                             </info>
                             <swaggerDirectory>${project.build.directory}</swaggerDirectory>
-                            <swaggerFileName>webadmin-mailbox</swaggerFileName>
+                            <swaggerFileName>webadmin-rabbitmq</swaggerFileName>
                         </apiSource>
                     </apiSources>
                 </configuration>
diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutes.java b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutes.java
new file mode 100644
index 0000000..c366df6
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutes.java
@@ -0,0 +1,174 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.routes;
+
+import static org.apache.james.webadmin.Constants.SEPARATOR;
+import static org.apache.james.webadmin.routes.MailQueueRoutes.BASE_URL;
+import static org.apache.james.webadmin.routes.MailQueueRoutes.MAIL_QUEUE_NAME;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import javax.inject.Inject;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskManager;
+import org.apache.james.util.DurationParser;
+import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.service.RepublishNotprocessedMailsTask;
+import org.apache.james.webadmin.tasks.TaskFromRequest;
+import org.apache.james.webadmin.tasks.TaskFromRequestRegistry;
+import org.apache.james.webadmin.tasks.TaskRegistrationKey;
+import org.apache.james.webadmin.utils.ErrorResponder;
+import org.apache.james.webadmin.utils.JsonTransformer;
+import org.eclipse.jetty.http.HttpStatus;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import spark.Request;
+import spark.Service;
+
+@Api(tags = "MailQueues")
+@Path(BASE_URL)
+@Produces("application/json")
+public class RabbitMQMailQueuesRoutes implements Routes {
+
+    private static final TaskRegistrationKey REPUBLISH_NOT_PROCESSED_MAILS_REGISTRATION_KEY = TaskRegistrationKey.of("RepublishNotProcessedMails");
+
+    private final MailQueueFactory<RabbitMQMailQueue> mailQueueFactory;
+    private final JsonTransformer jsonTransformer;
+    private final TaskManager taskManager;
+    private final Clock clock;
+
+    @Inject
+    @SuppressWarnings("unchecked")
+    @VisibleForTesting
+    RabbitMQMailQueuesRoutes(MailQueueFactory<RabbitMQMailQueue> mailQueueFactory,
+                             Clock clock, JsonTransformer jsonTransformer, TaskManager taskManager) {
+        this.mailQueueFactory = mailQueueFactory;
+        this.clock = clock;
+        this.jsonTransformer = jsonTransformer;
+        this.taskManager = taskManager;
+    }
+
+    @Override
+    public String getBasePath() {
+        return BASE_URL;
+    }
+
+    @Override
+    public void define(Service service) {
+        republishNotProcessedMails(service);
+    }
+
+
+    @POST
+    @Path("/{mailQueueName}")
+    @ApiImplicitParams({
+        @ApiImplicitParam(required = true, dataType = "string", name = "mailQueueName", paramType = "path"),
+        @ApiImplicitParam(
+            required = true,
+            dataType = "String",
+            name = "action",
+            paramType = "query",
+            example = "?action=RepublishNotProcessedMails",
+            value = "Specify the action to perform on a RabbitMQ mail queue."),
+        @ApiImplicitParam(
+            required = true,
+            dataType = "String",
+            name = "olderThan",
+            paramType = "query",
+            example = "?olderThan=1w",
+            value = "Specify the messages minimum age to republish")
+    })
+    @ApiOperation(
+        value = "republish the not processed mails of the RabbitMQ MailQueue using the cassandra mail queue view"
+    )
+    @ApiResponses(value = {
+        @ApiResponse(code = HttpStatus.CREATED_201, message = "OK, the task for rebuilding the queue is created"),
+        @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Invalid request for rebuilding the mail queue."),
+        @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = "Internal server error - Something went bad on the server side.")
+    })
+    public void republishNotProcessedMails(Service service) {
+        TaskFromRequest taskFromRequest = this::republishNotProcessedMails;
+        service.post(BASE_URL + SEPARATOR + MAIL_QUEUE_NAME,
+            TaskFromRequestRegistry.builder()
+                .register(REPUBLISH_NOT_PROCESSED_MAILS_REGISTRATION_KEY, this::republishNotProcessedMails)
+                .buildAsRoute(taskManager),
+            jsonTransformer);
+    }
+
+    private Task republishNotProcessedMails(Request request) {
+        RabbitMQMailQueue mailQueue = getMailQueue(MailQueueName.of(request.params(MAIL_QUEUE_NAME)));
+        return new RepublishNotprocessedMailsTask(mailQueue, getOlderThan(request));
+    }
+
+
+    private RabbitMQMailQueue getMailQueue(MailQueueName mailQueueName) {
+        return mailQueueFactory.getQueue(mailQueueName)
+            .orElseThrow(
+                () -> ErrorResponder.builder()
+                    .message("%s can not be found", mailQueueName)
+                    .statusCode(HttpStatus.NOT_FOUND_404)
+                    .type(ErrorResponder.ErrorType.NOT_FOUND)
+                    .haltError());
+    }
+
+    private Instant getOlderThan(Request req) {
+        try {
+            Duration olderThan =  Optional.ofNullable(req.queryParams("olderThan"))
+                .filter(Predicate.not(String::isEmpty))
+                .map(rawString -> DurationParser.parse(rawString, ChronoUnit.DAYS))
+                .orElseThrow();
+
+            return clock.instant().minus(olderThan);
+        } catch (NoSuchElementException e) {
+            throw ErrorResponder.builder()
+                .message("Missing olderThan")
+                .statusCode(HttpStatus.BAD_REQUEST_400)
+                .type(ErrorResponder.ErrorType.INVALID_ARGUMENT)
+                .haltError();
+        } catch (Exception e) {
+            throw ErrorResponder.builder()
+                .statusCode(HttpStatus.BAD_REQUEST_400)
+                .cause(e)
+                .type(ErrorResponder.ErrorType.INVALID_ARGUMENT)
+                .message("Invalid olderThan")
+                .haltError();
+        }
+    }
+}
diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskAdditionalInformationDTO.java
new file mode 100644
index 0000000..3621530
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskAdditionalInformationDTO.java
@@ -0,0 +1,90 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.webadmin.service;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class RepublishNotProcessedMailsTaskAdditionalInformationDTO implements AdditionalInformationDTO {
+
+    public static AdditionalInformationDTOModule<RepublishNotprocessedMailsTask.AdditionalInformation, RepublishNotProcessedMailsTaskAdditionalInformationDTO> module() {
+        return DTOModule.forDomainObject(RepublishNotprocessedMailsTask.AdditionalInformation.class)
+            .convertToDTO(RepublishNotProcessedMailsTaskAdditionalInformationDTO.class)
+            .toDomainObjectConverter(dto -> new RepublishNotprocessedMailsTask.AdditionalInformation(
+                MailQueueName.of(dto.mailQueue),
+                dto.olderThan,
+                dto.nbRequeuedMails,
+                dto.timestamp))
+            .toDTOConverter((details, type) -> new RepublishNotProcessedMailsTaskAdditionalInformationDTO(
+                type,
+                details.getMailQueue().asString(),
+                details.getOlderThan(),
+                details.getNbRequeuedMails(),
+                details.timestamp()))
+            .typeName(RepublishNotprocessedMailsTask.TYPE.asString())
+            .withFactory(AdditionalInformationDTOModule::new);
+    }
+
+    private final String type;
+    private final String mailQueue;
+
+    private final long nbRequeuedMails;
+    private final Instant olderThan;
+    private final Instant timestamp;
+
+    public RepublishNotProcessedMailsTaskAdditionalInformationDTO(@JsonProperty("type") String type,
+                                                                  @JsonProperty("mailQueue") String mailQueue,
+                                                                  @JsonProperty("olderThan") Instant olderThan,
+                                                                  @JsonProperty("nbRequeuedMails") long nbRequeuedMails,
+                                                                  @JsonProperty("timestamp") Instant timestamp) {
+        this.type = type;
+        this.mailQueue = mailQueue;
+        this.olderThan = olderThan;
+        this.nbRequeuedMails = nbRequeuedMails;
+        this.timestamp = timestamp;
+    }
+
+    public String getMailQueue() {
+        return mailQueue;
+    }
+
+    public long getNbRequeuedMails() {
+        return nbRequeuedMails;
+    }
+
+    public Instant getOlderThan() {
+        return olderThan;
+    }
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public Instant getTimestamp() {
+        return timestamp;
+    }
+}
diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskDTO.java b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskDTO.java
new file mode 100644
index 0000000..5502b35
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskDTO.java
@@ -0,0 +1,85 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.webadmin.service;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.server.task.json.dto.TaskDTO;
+import org.apache.james.server.task.json.dto.TaskDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class RepublishNotProcessedMailsTaskDTO implements TaskDTO {
+
+    public static class UnknownMailQueueException extends RuntimeException {
+        public UnknownMailQueueException(MailQueueName mailQueueName) {
+            super("Unknown mail queue " + mailQueueName.asString());
+        }
+    }
+
+    public static TaskDTOModule<RepublishNotprocessedMailsTask, RepublishNotProcessedMailsTaskDTO> module(MailQueueFactory<RabbitMQMailQueue> mailQueueFactory) {
+        return DTOModule
+            .forDomainObject(RepublishNotprocessedMailsTask.class)
+            .convertToDTO(RepublishNotProcessedMailsTaskDTO.class)
+            .toDomainObjectConverter(dto -> dto.fromDTO(mailQueueFactory))
+            .toDTOConverter(RepublishNotProcessedMailsTaskDTO::toDTO)
+            .typeName(RepublishNotprocessedMailsTask.TYPE.asString())
+            .withFactory(TaskDTOModule::new);
+    }
+
+    public static RepublishNotProcessedMailsTaskDTO toDTO(RepublishNotprocessedMailsTask domainObject, String typeName) {
+        return new RepublishNotProcessedMailsTaskDTO(typeName, domainObject.getMailQueue().asString(), domainObject.getOlderThan());
+    }
+
+    private final String type;
+    private final String mailQueue;
+    private final Instant olderThan;
+
+    public RepublishNotProcessedMailsTaskDTO(@JsonProperty("type") String type, @JsonProperty("mailQueue") String mailQueue, @JsonProperty("olderThan") Instant olderThan) {
+        this.type = type;
+        this.mailQueue = mailQueue;
+        this.olderThan = olderThan;
+    }
+
+    public RepublishNotprocessedMailsTask fromDTO(MailQueueFactory<RabbitMQMailQueue> mailQueueFactory) {
+        MailQueueName requestedMailQueueName = MailQueueName.of(mailQueue);
+        RabbitMQMailQueue queue = mailQueueFactory
+            .getQueue(requestedMailQueueName)
+            .orElseThrow(() -> new UnknownMailQueueException(requestedMailQueueName));
+
+        return new RepublishNotprocessedMailsTask(queue, olderThan);
+    }
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    public Instant getOlderThan() {
+        return olderThan;
+    }
+
+    public String getMailQueue() {
+        return mailQueue;
+    }
+}
diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTask.java b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTask.java
new file mode 100644
index 0000000..0428335
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTask.java
@@ -0,0 +1,107 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskType;
+
+
+public class RepublishNotprocessedMailsTask implements Task {
+
+    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+
+        private final Instant timestamp;
+        private final long nbRequeuedMails;
+        private final MailQueueName mailQueue;
+        private final Instant olderThan;
+
+        public AdditionalInformation(MailQueueName mailQueue, Instant olderThan, long nbRequeuedMails, Instant timestamp) {
+            this.mailQueue = mailQueue;
+            this.olderThan = olderThan;
+            this.timestamp = timestamp;
+            this.nbRequeuedMails = nbRequeuedMails;
+        }
+
+        @Override
+        public Instant timestamp() {
+            return timestamp;
+        }
+
+        public Instant getOlderThan() {
+            return olderThan;
+        }
+
+        public MailQueueName getMailQueue() {
+            return mailQueue;
+        }
+
+        public long getNbRequeuedMails() {
+            return nbRequeuedMails;
+        }
+    }
+
+    public static final TaskType TYPE = TaskType.of("republish-not-processed-mails");
+
+    private final Instant olderThan;
+    private final RabbitMQMailQueue mailQueue;
+    private final AtomicInteger nbRequeuedMails;
+
+    public RepublishNotprocessedMailsTask(RabbitMQMailQueue mailQueue, Instant olderThan) {
+        this.olderThan = olderThan;
+        this.mailQueue = mailQueue;
+        this.nbRequeuedMails = new AtomicInteger(0);
+    }
+
+    @Override
+    public Result run() {
+        mailQueue.republishNotProcessedMails(olderThan)
+            .doOnNext(mailName -> nbRequeuedMails.getAndIncrement())
+            .then()
+            .block();
+
+        return Result.COMPLETED;
+    }
+
+    @Override
+    public TaskType type() {
+        return TYPE;
+    }
+
+    @Override
+    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+        return Optional.of(new AdditionalInformation(mailQueue.getName(), olderThan, nbRequeuedMails.get(), Clock.systemUTC().instant()));
+    }
+
+    public Instant getOlderThan() {
+        return olderThan;
+    }
+
+    public MailQueueName getMailQueue() {
+        return mailQueue.getName();
+    }
+}
diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutesTest.java b/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutesTest.java
new file mode 100644
index 0000000..4daaa08
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutesTest.java
@@ -0,0 +1,144 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.routes;
+
+import static io.restassured.RestAssured.given;
+import static io.restassured.config.EncoderConfig.encoderConfig;
+import static io.restassured.config.RestAssuredConfig.newConfig;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasSize;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Optional;
+
+import org.apache.james.json.DTOConverter;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory;
+import org.apache.james.task.Hostname;
+import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.task.TaskManager;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.apache.james.webadmin.WebAdminServer;
+import org.apache.james.webadmin.WebAdminUtils;
+import org.apache.james.webadmin.utils.JsonTransformer;
+import org.eclipse.jetty.http.HttpStatus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.restassured.RestAssured;
+import io.restassured.builder.RequestSpecBuilder;
+import io.restassured.http.ContentType;
+import io.restassured.specification.RequestSpecification;
+
+class RabbitMQMailQueuesRoutesTest {
+    final static ZonedDateTime DATE = ZonedDateTime.parse("2015-10-30T14:12:00Z");
+
+    WebAdminServer webAdminServer;
+    MailQueueFactory mailQueueFactory;
+    Clock clock;
+
+    WebAdminServer createServer(MailQueueFactory mailQueueFactory) {
+        TaskManager taskManager = new MemoryTaskManager(new Hostname("foo"));
+        JsonTransformer jsonTransformer = new JsonTransformer();
+        clock = UpdatableTickingClock.fixed(DATE.toInstant(), ZoneOffset.UTC);
+        return WebAdminUtils.createWebAdminServer(
+                new RabbitMQMailQueuesRoutes(mailQueueFactory, clock, jsonTransformer, taskManager),
+                new TasksRoutes(taskManager, jsonTransformer, DTOConverter.of()))
+            .start();
+    }
+
+    RequestSpecification buildRequestSpecification(WebAdminServer server) {
+        return new RequestSpecBuilder()
+            .setContentType(ContentType.JSON)
+            .setAccept(ContentType.JSON)
+            .setBasePath("/")
+            .setPort(server.getPort().getValue())
+            .setConfig(newConfig().encoderConfig(encoderConfig().defaultContentCharset(StandardCharsets.UTF_8)))
+            .build();
+    }
+
+    @BeforeEach
+    void setUp() {
+        mailQueueFactory = mock(RabbitMQMailQueueFactory.class);
+        webAdminServer = createServer(mailQueueFactory);
+        RestAssured.requestSpecification = buildRequestSpecification(webAdminServer);
+        RestAssured.enableLoggingOfRequestAndResponseIfValidationFails();
+    }
+
+    @AfterEach
+    void tearDown() {
+        webAdminServer.destroy();
+    }
+
+    @Test
+    void triggeringARepublishNotProcessedMailsShouldCreateATask() {
+        when(mailQueueFactory.getQueue(any())).thenReturn(Optional.of(mock(RabbitMQMailQueue.class)));
+        given()
+            .queryParam("action", "RepublishNotProcessedMails")
+            .queryParam("olderThan", "1d")
+        .when()
+            .post(MailQueueRoutes.BASE_URL + "/spooler")
+        .then()
+            .statusCode(HttpStatus.CREATED_201);
+
+        given()
+        .when()
+            .get("/tasks")
+        .then()
+            .statusCode(HttpStatus.OK_200)
+            .body("", hasSize(1));
+    }
+
+    @Test
+    void triggeringARepublishNotProcessedMailsWhenTheQueueHasNotBeenInitializedShouldFail() {
+        when(mailQueueFactory.getQueue(any())).thenReturn(Optional.empty());
+        given()
+            .queryParam("action", "RepublishNotProcessedMails")
+            .queryParam("olderThan", "1d")
+        .when()
+            .post(MailQueueRoutes.BASE_URL + "/spooler")
+        .then()
+            .statusCode(HttpStatus.NOT_FOUND_404)
+            .body("message", containsString("MailQueueName{value=spooler} can not be found"));
+    }
+
+    @Test
+    void triggeringARepublishNotProcessedMailsWithAnInvalidOlderThanShouldFail() {
+        when(mailQueueFactory.getQueue(any())).thenReturn(Optional.of(mock(RabbitMQMailQueue.class)));
+        given()
+            .queryParam("action", "RepublishNotProcessedMails")
+            .queryParam("olderThan", "invalidValue")
+        .when()
+            .post(MailQueueRoutes.BASE_URL + "/spooler")
+        .then()
+            .statusCode(HttpStatus.BAD_REQUEST_400)
+            .body("message", containsString("Invalid olderThan"))
+            .body("details", containsString("Supplied value do not follow the unit format (number optionally suffixed with a string representing the unit"));
+    }
+
+}
diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTaskTest.java b/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTaskTest.java
new file mode 100644
index 0000000..ac35da1
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTaskTest.java
@@ -0,0 +1,108 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ***************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.util.Optional;
+
+import org.apache.james.JsonSerializationVerifier;
+import org.apache.james.json.JsonGenericSerializer;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory;
+import org.junit.jupiter.api.Test;
+
+class RepublishNotprocessedMailsTaskTest {
+    private static final Instant OLDER_THAN = Instant.parse("2018-11-13T12:00:55Z");
+    private static final Instant NOW = Instant.now();
+    private static final long NB_REQUEUED_MAILS = 12;
+    private static final String SERIALIZED = "{\"type\": \"republish-not-processed-mails\",\"mailQueue\":\"anyQueue\", \"olderThan\": \"" + OLDER_THAN + "\"}";
+    private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION = "{\"type\": \"republish-not-processed-mails\",\"mailQueue\":\"anyQueue\", \"olderThan\": \"" + OLDER_THAN + "\" ,\"nbRequeuedMails\":12,\"timestamp\":\"" + NOW.toString() + "\"}";
+    private static final MailQueueName QUEUE_NAME = MailQueueName.of("anyQueue");
+
+    @Test
+    void taskShouldBeSerializable() throws Exception {
+        RabbitMQMailQueueFactory mockedQueueFactory = mock(RabbitMQMailQueueFactory.class);
+        RabbitMQMailQueue mockedQueue = mock(RabbitMQMailQueue.class);
+
+        when(mockedQueue.getName()).thenReturn(QUEUE_NAME);
+        when(mockedQueueFactory.getQueue(QUEUE_NAME)).thenReturn(Optional.of(mockedQueue));
+
+        RepublishNotprocessedMailsTask task = new RepublishNotprocessedMailsTask(mockedQueue, OLDER_THAN);
+
+        JsonSerializationVerifier.dtoModule(RepublishNotProcessedMailsTaskDTO.module(mockedQueueFactory))
+            .bean(task)
+            .json(SERIALIZED)
+            .verify();
+    }
+
+    @Test
+    void taskShouldBeDeserializable() throws Exception {
+        RabbitMQMailQueueFactory mockedQueueFactory = mock(RabbitMQMailQueueFactory.class);
+        RabbitMQMailQueue mockedQueue = mock(RabbitMQMailQueue.class);
+
+        when(mockedQueue.getName()).thenReturn(QUEUE_NAME);
+        when(mockedQueueFactory.getQueue(QUEUE_NAME)).thenReturn(Optional.of(mockedQueue));
+
+        RepublishNotprocessedMailsTask task = new RepublishNotprocessedMailsTask(mockedQueue, OLDER_THAN);
+        JsonSerializationVerifier.dtoModule(RepublishNotProcessedMailsTaskDTO.module(mockedQueueFactory))
+            .bean(task)
+            .json(SERIALIZED)
+            .verify();
+    }
+
+    @Test
+    void taskDeserializationFromUnknownQueueNameShouldThrow() {
+        RabbitMQMailQueueFactory mockedQueueFactory = mock(RabbitMQMailQueueFactory.class);
+        RabbitMQMailQueue mockedQueue = mock(RabbitMQMailQueue.class);
+
+        when(mockedQueue.getName()).thenReturn(QUEUE_NAME);
+        when(mockedQueueFactory.getQueue(QUEUE_NAME)).thenReturn(Optional.empty());
+
+        RepublishNotprocessedMailsTask task = new RepublishNotprocessedMailsTask(mockedQueue, OLDER_THAN);
+        assertThatThrownBy(() -> JsonSerializationVerifier.dtoModule(RepublishNotProcessedMailsTaskDTO.module(mockedQueueFactory))
+            .bean(task)
+            .json(SERIALIZED)
+            .verify())
+            .isInstanceOf(RepublishNotProcessedMailsTaskDTO.UnknownMailQueueException.class);
+    }
+
+    @Test
+    void additionalInformationShouldBeSerializable() throws Exception {
+        RepublishNotprocessedMailsTask.AdditionalInformation details = new RepublishNotprocessedMailsTask.AdditionalInformation(QUEUE_NAME, OLDER_THAN, NB_REQUEUED_MAILS, NOW);
+        JsonSerializationVerifier.dtoModule(RepublishNotProcessedMailsTaskAdditionalInformationDTO.module())
+            .bean(details)
+            .json(SERIALIZED_TASK_ADDITIONAL_INFORMATION)
+            .verify();
+    }
+
+    @Test
+    void additionalInformationShouldBeDeserializable() throws Exception {
+        RepublishNotprocessedMailsTask.AdditionalInformation details = new RepublishNotprocessedMailsTask.AdditionalInformation(QUEUE_NAME, OLDER_THAN, NB_REQUEUED_MAILS, NOW);
+        RepublishNotprocessedMailsTask.AdditionalInformation deserialized = JsonGenericSerializer.forModules(RepublishNotProcessedMailsTaskAdditionalInformationDTO.module())
+            .withoutNestedType()
+            .deserialize(SERIALIZED_TASK_ADDITIONAL_INFORMATION);
+        assertThat(deserialized).isEqualToComparingFieldByField(details);
+    }
+}
diff --git a/src/site/markdown/server/manage-webadmin.md b/src/site/markdown/server/manage-webadmin.md
index 707417d..ff18829 100644
--- a/src/site/markdown/server/manage-webadmin.md
+++ b/src/site/markdown/server/manage-webadmin.md
@@ -2900,6 +2900,7 @@ The scheduled task will have the following type `reprocessing-one` and the follo
  - [Deleting mails from a mail queue](#Deleting_mails_from_a_mail_queue)
  - [Clearing a mail queue](#Clearing_a_mail_queue)
  - [Flushing mails from a mail queue](#Flushing_mails_from_a_mail_queue)
+ - [RabbitMQ republishing a mail queue from cassandra](#RabbitMQ_republishing_a_mail_queue_from_cassandra)
 
 ### Listing mail queues
 
@@ -3048,6 +3049,40 @@ Response codes:
  - 204: Success (No content)
  - 400: Invalid request
  - 404: The mail queue does not exist
+ 
+### RabbitMQ republishing a mail queue from cassandra
+
+```
+curl -XPOST 'http://ip:port/mailQueues/{mailQueueName}?action=RepublishNotProcessedMails&olderThan=1d'
+```
+
+This method is specific to the distributed flavor of James, which relies on Cassandra and RabbitMQ for implementing a mail queue.
+In case of a RabbitMQ crash resulting in a loss of messages, this task can be launched to repopulate the
+`mailQueueName` queue in RabbitMQ using the information stored in Cassandra.
+
+The `olderThan` parameter is mandatory. It filters the mails to be restored, by taking into account only
+the mails older than the given value.
+The expected value should be expressed in the following format: `Nunit`.
+`N` should be strictly positive.
+`unit` could be either in the short form (`h`, `d`, `w`, etc.), or in the long form (`day`, `week`, `month`, etc.).
+
+Examples:
+
+ - `5h`
+ - `7d`
+ - `1y`
+
+Response codes:
+
+ - 201: Task created
+ - 400: Invalid request
+
+ The response body contains the id of the republishing task.
+ ```
+ {
+     "taskId": "a650a66a-5984-431e-bdad-f1baad885856"
+ }
+ ```
 
 ## Administrating DLP Configuration
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org