You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/01/26 01:13:46 UTC

[02/10] james-project git commit: JAMES-2294 Routes for mail reprocessing

JAMES-2294 Routes for mail reprocessing


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c73d5de8
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c73d5de8
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c73d5de8

Branch: refs/heads/master
Commit: c73d5de8df02bc638dbcfe5916e63163dd3b5111
Parents: e6fdeae
Author: benwa <bt...@linagora.com>
Authored: Thu Jan 25 10:57:41 2018 +0700
Committer: benwa <bt...@linagora.com>
Committed: Fri Jan 26 08:12:05 2018 +0700

----------------------------------------------------------------------
 .../webadmin/routes/MailRepositoriesRoutes.java | 139 +++-
 .../routes/MailRepositoriesRoutesTest.java      | 646 ++++++++++++++++++-
 2 files changed, 755 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/c73d5de8/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
index 1078206..a90323b 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.webadmin.routes;
 
+import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -32,6 +33,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 
 import org.apache.james.mailrepository.api.MailRepositoryStore;
+import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
@@ -42,6 +44,9 @@ import org.apache.james.webadmin.Routes;
 import org.apache.james.webadmin.dto.ExtendedMailRepositoryResponse;
 import org.apache.james.webadmin.dto.TaskIdDto;
 import org.apache.james.webadmin.service.MailRepositoryStoreService;
+import org.apache.james.webadmin.service.ReprocessingAllMailsTask;
+import org.apache.james.webadmin.service.ReprocessingOneMailTask;
+import org.apache.james.webadmin.service.ReprocessingService;
 import org.apache.james.webadmin.utils.ErrorResponder;
 import org.apache.james.webadmin.utils.ErrorResponder.ErrorType;
 import org.apache.james.webadmin.utils.JsonTransformer;
@@ -55,6 +60,7 @@ import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import io.swagger.jaxrs.PATCH;
 import spark.Request;
 import spark.Service;
 
@@ -67,13 +73,15 @@ public class MailRepositoriesRoutes implements Routes {
 
     private final JsonTransformer jsonTransformer;
     private final MailRepositoryStoreService repositoryStoreService;
+    private final ReprocessingService reprocessingService;
     private final TaskManager taskManager;
     private Service service;
 
     @Inject
-    public MailRepositoriesRoutes(MailRepositoryStoreService repositoryStoreService, JsonTransformer jsonTransformer, TaskManager taskManager) {
+    public MailRepositoriesRoutes(MailRepositoryStoreService repositoryStoreService, JsonTransformer jsonTransformer, ReprocessingService reprocessingService, TaskManager taskManager) {
         this.repositoryStoreService = repositoryStoreService;
         this.jsonTransformer = jsonTransformer;
+        this.reprocessingService = reprocessingService;
         this.taskManager = taskManager;
     }
 
@@ -92,6 +100,10 @@ public class MailRepositoriesRoutes implements Routes {
         defineDeleteMail();
 
         defineDeleteAll();
+
+        defineReprocessAll();
+
+        defineReprocessOne();
     }
 
     @GET
@@ -168,7 +180,7 @@ public class MailRepositoriesRoutes implements Routes {
     })
     public void defineGetMail() {
         service.get(MAIL_REPOSITORIES + "/:encodedUrl/mails/:mailKey", (request, response) -> {
-            String url = URLDecoder.decode(request.params("encodedUrl"), StandardCharsets.UTF_8.displayName());
+            String url = decodedRepositoryUrl(request);
             String mailKey = request.params("mailKey");
             try {
                 return repositoryStoreService.retrieveMail(url, mailKey)
@@ -228,7 +240,7 @@ public class MailRepositoriesRoutes implements Routes {
     })
     public void defineDeleteMail() {
         service.delete(MAIL_REPOSITORIES + "/:encodedUrl/mails/:mailKey", (request, response) -> {
-            String url = URLDecoder.decode(request.params("encodedUrl"), StandardCharsets.UTF_8.displayName());
+            String url = decodedRepositoryUrl(request);
             String mailKey = request.params("mailKey");
             try {
                 response.status(HttpStatus.NO_CONTENT_204);
@@ -255,7 +267,7 @@ public class MailRepositoriesRoutes implements Routes {
     })
     public void defineDeleteAll() {
         service.delete(MAIL_REPOSITORIES + "/:encodedUrl/mails", (request, response) -> {
-            String url = URLDecoder.decode(request.params("encodedUrl"), StandardCharsets.UTF_8.displayName());
+            String url = decodedRepositoryUrl(request);
             try {
                 Task task = repositoryStoreService.createClearMailRepositoryTask(url);
                 TaskId taskId = taskManager.submit(task);
@@ -271,6 +283,125 @@ public class MailRepositoriesRoutes implements Routes {
         }, jsonTransformer);
     }
 
+    @PATCH
+    @Path("/{encodedUrl}/mails")
+    @ApiOperation(value = "Reprocessing all mails in that mailRepository")
+    @ApiImplicitParams({
+        @ApiImplicitParam(
+            required = true,
+            name = "action",
+            paramType = "query", // Swagger only documents path, query, body, header or form as valid values
+            dataType = "String",
+            defaultValue = "none",
+            example = "?action=reprocess",
+            value = "Compulsory. Only supported value is `reprocess`"),
+        @ApiImplicitParam(
+            required = false,
+            name = "queue",
+            paramType = "query",
+            dataType = "String",
+            defaultValue = "spool",
+            example = "?queue=outgoing",
+            value = "Indicates in which queue the mails stored in the repository should be re-enqueued"),
+        @ApiImplicitParam(
+            required = false,
+            paramType = "query",
+            name = "processor",
+            dataType = "String",
+            defaultValue = "absent",
+            example = "?processor=transport",
+            value = "If present, modifies the state property of the mail to allow their processing by a specific mail container processor.")
+    })
+    @ApiResponses(value = {
+        @ApiResponse(code = HttpStatus.CREATED_201, message = "Task is created", response = TaskIdDto.class),
+        @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = "Internal server error - Something went bad on the server side."),
+        @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Bad request - unknown action")
+    })
+    public void defineReprocessAll() { // registers the PATCH route reprocessing a whole repository
+        service.patch(MAIL_REPOSITORIES + "/:encodedUrl/mails", (request, response) -> {
+            Task task = toAllMailReprocessingTask(request); // validates the request before anything is submitted
+            TaskId taskId = taskManager.submit(task);
+            return TaskIdDto.respond(response, taskId);
+        }, jsonTransformer);
+    }
+
+    private Task toAllMailReprocessingTask(Request request) throws UnsupportedEncodingException, MailRepositoryStore.MailRepositoryStoreException, MessagingException {
+        String repositoryUrl = decodedRepositoryUrl(request);
+        enforceActionParameter(request); // halts with a 400 unless action=reprocess
+        Optional<String> processorName = Optional.ofNullable(request.queryParams("processor"));
+        String queueName = Optional.ofNullable(request.queryParams("queue")).orElse(MailQueueFactory.SPOOL); // spool when no queue is given
+
+        long initialCount = repositoryStoreService.size(repositoryUrl).orElse(0L); // primitive: no boxed local needed; size is read when the task is built
+        return new ReprocessingAllMailsTask(reprocessingService, initialCount, repositoryUrl, queueName, processorName);
+    }
+
+    @PATCH
+    @Path("/{encodedUrl}/mails/{key}")
+    @ApiOperation(value = "Reprocessing a single mail in that mailRepository")
+    @ApiImplicitParams({
+        @ApiImplicitParam(
+            required = true,
+            name = "action",
+            paramType = "query", // Swagger only documents path, query, body, header or form as valid values
+            dataType = "String",
+            defaultValue = "none",
+            example = "?action=reprocess",
+            value = "Compulsory. Only supported value is `reprocess`"),
+        @ApiImplicitParam(
+            required = false,
+            name = "queue",
+            paramType = "query",
+            dataType = "String",
+            defaultValue = "spool",
+            example = "?queue=outgoing",
+            value = "Indicates in which queue the mails stored in the repository should be re-enqueued"),
+        @ApiImplicitParam(
+            required = false,
+            paramType = "query",
+            name = "processor",
+            dataType = "String",
+            defaultValue = "absent",
+            example = "?processor=transport",
+            value = "If present, modifies the state property of the mail to allow their processing by a specific mail container processor.")
+    })
+    @ApiResponses(value = {
+        @ApiResponse(code = HttpStatus.CREATED_201, message = "Task is created", response = TaskIdDto.class),
+        @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = "Internal server error - Something went bad on the server side."),
+        @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Bad request - unknown action")
+    })
+    public void defineReprocessOne() { // registers the PATCH route reprocessing a single mail
+        service.patch(MAIL_REPOSITORIES + "/:encodedUrl/mails/:key", (request, response) -> {
+            Task task = toOneMailReprocessingTask(request); // validates the request before anything is submitted
+            TaskId taskId = taskManager.submit(task);
+            return TaskIdDto.respond(response, taskId);
+        }, jsonTransformer);
+    }
+
+    private Task toOneMailReprocessingTask(Request request) throws UnsupportedEncodingException {
+        String repositoryUrl = decodedRepositoryUrl(request);
+        String mailKey = request.params("key");
+        enforceActionParameter(request); // halts with a 400 unless action=reprocess
+        String queueName = Optional.ofNullable(request.queryParams("queue")).orElse(MailQueueFactory.SPOOL); // spool when no queue is given
+        Optional<String> processorName = Optional.ofNullable(request.queryParams("processor"));
+
+        return new ReprocessingOneMailTask(reprocessingService, repositoryUrl, queueName, mailKey, processorName);
+    }
+
+    private void enforceActionParameter(Request request) { // rejects any request whose action is not exactly `reprocess`
+        if ("reprocess".equals(request.queryParams("action"))) {
+            return;
+        }
+        throw ErrorResponder.builder() // also reached when the parameter is absent
+            .statusCode(HttpStatus.BAD_REQUEST_400)
+            .type(ErrorType.INVALID_ARGUMENT)
+            .message("action query parameter is mandatory. The only supported value is `reprocess`")
+            .haltError();
+    }
+
+    private String decodedRepositoryUrl(Request request) throws UnsupportedEncodingException { // URL-decodes the :encodedUrl path parameter
+        return URLDecoder.decode(request.params("encodedUrl"), StandardCharsets.UTF_8.name()); // name() is the canonical charset name; displayName() is the human-readable one
+    }
+
     private Optional<Integer> assertPositiveInteger(Request request, String parameterName) {
         try {
             return Optional.ofNullable(request.queryParams(parameterName))

http://git-wip-us.apache.org/repos/asf/james-project/blob/c73d5de8/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
index cbad2b4..a6dd4de 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
@@ -41,13 +41,21 @@ import java.util.Optional;
 import org.apache.james.mailrepository.api.MailRepositoryStore;
 import org.apache.james.mailrepository.memory.MemoryMailRepository;
 import org.apache.james.metrics.api.NoopMetricFactory;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
+import org.apache.james.queue.memory.MemoryMailQueueFactory;
 import org.apache.james.task.MemoryTaskManager;
 import org.apache.james.webadmin.WebAdminServer;
 import org.apache.james.webadmin.WebAdminUtils;
 import org.apache.james.webadmin.service.ClearMailRepositoryTask;
 import org.apache.james.webadmin.service.MailRepositoryStoreService;
+import org.apache.james.webadmin.service.ReprocessingAllMailsTask;
+import org.apache.james.webadmin.service.ReprocessingOneMailTask;
+import org.apache.james.webadmin.service.ReprocessingService;
 import org.apache.james.webadmin.utils.ErrorResponder;
 import org.apache.james.webadmin.utils.JsonTransformer;
+import org.apache.mailet.Mail;
 import org.apache.mailet.base.test.FakeMail;
 import org.eclipse.jetty.http.HttpStatus;
 import org.junit.After;
@@ -63,9 +71,14 @@ public class MailRepositoriesRoutesTest {
     public static final String URL_MY_REPO = "url://myRepo";
     public static final String URL_ESCAPED_MY_REPO = "url%3A%2F%2FmyRepo";
     public static final String MY_REPO_MAILS = "url%3A%2F%2FmyRepo/mails";
+    public static final String CUSTOM_QUEUE = "customQueue";
+    public static final String NAME_1 = "name1";
+    public static final String NAME_2 = "name2";
     private WebAdminServer webAdminServer;
     private MailRepositoryStore mailRepositoryStore;
     private MemoryMailRepository mailRepository;
+    private ManageableMailQueue spoolQueue;
+    private ManageableMailQueue customQueue;
 
     @Before
     public void setUp() throws Exception {
@@ -74,10 +87,18 @@ public class MailRepositoriesRoutesTest {
 
         MemoryTaskManager taskManager = new MemoryTaskManager();
         JsonTransformer jsonTransformer = new JsonTransformer();
+        MailQueueFactory<ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
+        spoolQueue = queueFactory.createQueue(MailQueueFactory.SPOOL);
+        customQueue = queueFactory.createQueue(CUSTOM_QUEUE);
+
+        MailRepositoryStoreService repositoryStoreService = new MailRepositoryStoreService(mailRepositoryStore);
+
+        ReprocessingService reprocessingService = new ReprocessingService(queueFactory, repositoryStoreService);
+
         webAdminServer = WebAdminUtils.createWebAdminServer(
                 new NoopMetricFactory(),
-                new MailRepositoriesRoutes(new MailRepositoryStoreService(mailRepositoryStore),
-                    jsonTransformer, taskManager),
+                new MailRepositoriesRoutes(repositoryStoreService,
+                    jsonTransformer, reprocessingService, taskManager),
             new TasksRoutes(taskManager, jsonTransformer));
         webAdminServer.configure(NO_CONFIGURATION);
         webAdminServer.await();
@@ -289,10 +310,10 @@ public class MailRepositoriesRoutesTest {
         when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
 
         mailRepository.store(FakeMail.builder()
-            .name("name1")
+            .name(NAME_1)
             .build());
         mailRepository.store(FakeMail.builder()
-            .name("name2")
+            .name(NAME_2)
             .build());
 
         when()
@@ -300,7 +321,7 @@ public class MailRepositoriesRoutesTest {
         .then()
             .statusCode(HttpStatus.OK_200)
             .body("", hasSize(2))
-            .body("mailKey", containsInAnyOrder("name1", "name2"));
+            .body("mailKey", containsInAnyOrder(NAME_1, NAME_2));
     }
 
     @Test
@@ -354,7 +375,7 @@ public class MailRepositoriesRoutesTest {
         when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
 
         mailRepository.store(FakeMail.builder()
-            .name("name1")
+            .name(NAME_1)
             .build());
 
         given()
@@ -369,7 +390,7 @@ public class MailRepositoriesRoutesTest {
     public void retrievingAMailShouldDisplayItsInformation() throws Exception {
         when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
 
-        String name = "name1";
+        String name = NAME_1;
         String sender = "sender@domain";
         String recipient1 = "recipient1@domain";
         String recipient2 = "recipient2@domain";
@@ -398,16 +419,15 @@ public class MailRepositoriesRoutesTest {
     public void retrievingAMailShouldNotFailWhenOnlyNameProperty() throws Exception {
         when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
 
-        String name = "name1";
         mailRepository.store(FakeMail.builder()
-            .name(name)
+            .name(NAME_1)
             .build());
 
         when()
-            .get(URL_ESCAPED_MY_REPO + "/mails/" + name)
+            .get(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1)
         .then()
             .statusCode(HttpStatus.OK_200)
-            .body("name", is(name))
+            .body("name", is(NAME_1))
             .body("sender", isEmptyOrNullString())
             .body("state", isEmptyOrNullString())
             .body("error", isEmptyOrNullString())
@@ -432,37 +452,34 @@ public class MailRepositoriesRoutesTest {
     public void deletingAMailShouldRemoveIt() throws Exception {
         when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
 
-        String name1 = "name1";
-        String name2 = "name2";
         mailRepository.store(FakeMail.builder()
-            .name(name1)
+            .name(NAME_1)
             .build());
         mailRepository.store(FakeMail.builder()
-            .name(name2)
+            .name(NAME_2)
             .build());
 
         given()
-            .delete(URL_ESCAPED_MY_REPO + "/mails/" + name1);
+            .delete(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1);
 
         when()
             .get(URL_ESCAPED_MY_REPO + "/mails")
             .then()
             .statusCode(HttpStatus.OK_200)
             .body("", hasSize(1))
-            .body("mailKey", contains(name2));
+            .body("mailKey", contains(NAME_2));
     }
 
     @Test
     public void deletingAMailShouldReturnOkWhenExist() throws Exception {
         when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
 
-        String name1 = "name1";
         mailRepository.store(FakeMail.builder()
-            .name(name1)
+            .name(NAME_1)
             .build());
 
         when()
-            .delete(URL_ESCAPED_MY_REPO + "/mails/" + name1)
+            .delete(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1)
         .then()
             .statusCode(HttpStatus.NO_CONTENT_204);
     }
@@ -493,13 +510,11 @@ public class MailRepositoriesRoutesTest {
     public void clearTaskShouldHaveDetails() throws Exception {
         when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
 
-        String name1 = "name1";
-        String name2 = "name2";
         mailRepository.store(FakeMail.builder()
-            .name(name1)
+            .name(NAME_1)
             .build());
         mailRepository.store(FakeMail.builder()
-            .name(name2)
+            .name(NAME_2)
             .build());
 
         String taskId = with()
@@ -528,10 +543,10 @@ public class MailRepositoriesRoutesTest {
         when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
 
         mailRepository.store(FakeMail.builder()
-            .name("name1")
+            .name(NAME_1)
             .build());
         mailRepository.store(FakeMail.builder()
-            .name("name2")
+            .name(NAME_2)
             .build());
 
         String taskId = with()
@@ -576,4 +591,583 @@ public class MailRepositoriesRoutesTest {
             .body("message", is(URL_MY_REPO + "does not exist"));
     }
 
+    @Test
+    public void reprocessingAllTaskShouldCreateATask() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+
+        when()
+            .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess")
+        .then()
+            .statusCode(HttpStatus.CREATED_201)
+            .header("Location", is(notNullValue()))
+            .body("taskId", is(notNullValue()));
+    }
+
+    @Test
+    public void reprocessingAllTaskShouldRejectInvalidActions() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+
+        when()
+            .patch(URL_ESCAPED_MY_REPO + "/mails?action=invalid")
+        .then()
+            .statusCode(HttpStatus.BAD_REQUEST_400)
+            .body("statusCode", is(400))
+            .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+            .body("message", is("action query parameter is mandatory. The only supported value is `reprocess`"));
+    }
+
+    @Test
+    public void reprocessingAllTaskShouldRequireAction() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+
+        when()
+            .patch(URL_ESCAPED_MY_REPO + "/mails")
+        .then()
+            .statusCode(HttpStatus.BAD_REQUEST_400)
+            .body("statusCode", is(400))
+            .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+            .body("message", is("action query parameter is mandatory. The only supported value is `reprocess`"));
+    }
+
+    @Test
+    public void reprocessingAllTaskShouldIncludeDetailsWhenDefaultValues() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .build());
+
+        String taskId = with()
+            .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess")
+            .jsonPath()
+            .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await")
+        .then()
+            .body("status", is("completed"))
+            .body("taskId", is(notNullValue()))
+            .body("type", is(ReprocessingAllMailsTask.TYPE))
+            .body("additionalInformation.repositoryUrl", is(URL_MY_REPO))
+            .body("additionalInformation.initialCount", is(2))
+            .body("additionalInformation.remainingCount", is(0))
+            .body("additionalInformation.targetProcessor", isEmptyOrNullString())
+            .body("additionalInformation.targetQueue", is(MailQueueFactory.SPOOL))
+            .body("startedDate", is(notNullValue()))
+            .body("submitDate", is(notNullValue()))
+            .body("completedDate", is(notNullValue()));
+    }
+
+    @Test
+    public void reprocessingAllTaskShouldIncludeDetails() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        // Two mails are stored so the task is expected to report initialCount = 2
+        // and, once the awaited task has completed, remainingCount = 0.
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .build());
+
+        String transport = "transport";
+        String taskId = with()
+            .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess" +
+                "&queue=" + CUSTOM_QUEUE +
+                "&processor=" + transport)
+            .jsonPath()
+            .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await")
+        .then()
+            .body("status", is("completed"))
+            .body("taskId", is(notNullValue()))
+            .body("type", is(ReprocessingAllMailsTask.TYPE))
+            .body("additionalInformation.repositoryUrl", is(URL_MY_REPO))
+            .body("additionalInformation.initialCount", is(2))
+            .body("additionalInformation.remainingCount", is(0))
+            .body("additionalInformation.targetProcessor", is(transport))
+            .body("additionalInformation.targetQueue", is(CUSTOM_QUEUE))
+            .body("startedDate", is(notNullValue()))
+            .body("submitDate", is(notNullValue()))
+            .body("completedDate", is(notNullValue()));
+    }
+
+    @Test
+    public void reprocessingAllTaskShouldClearMailRepository() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        // Both mails stored here are expected to have left the repository
+        // by the time the awaited task returns.
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .build());
+
+        String transport = "transport";
+        String taskId = with()
+            .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess" +
+                "&queue=" + CUSTOM_QUEUE +
+                "&processor=" + transport)
+            .jsonPath()
+            .get("taskId");
+
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(taskId + "/await");
+
+        assertThat(mailRepository.list()).isEmpty();
+    }
+
+    @Test
+    public void reprocessingAllTaskShouldEnqueueMailsOnDefaultQueue() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .build());
+
+        String taskId = with()
+            .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess")
+            .jsonPath()
+            .get("taskId");
+
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(taskId + "/await");
+
+        assertThat(spoolQueue.browse())
+            .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+            .extracting(Mail::getName)
+            .containsOnly(NAME_1, NAME_2);
+    }
+
+    @Test
+    public void reprocessingAllTaskShouldPreserveStateWhenProcessorIsNotSpecified() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        String state1 = "state1";
+        String state2 = "state2";
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .state(state1)
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .state(state2)
+            .build());
+
+        String taskId = with()
+            .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess")
+            .jsonPath()
+            .get("taskId");
+
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(taskId + "/await");
+
+        assertThat(spoolQueue.browse())
+            .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+            .extracting(Mail::getState)
+            .containsOnly(state1, state2);
+    }
+
+    @Test
+    public void reprocessingAllTaskShouldOverWriteStateWhenProcessorSpecified() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        String state1 = "state1";
+        String state2 = "state2";
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .state(state1)
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .state(state2)
+            .build());
+
+        String transport = "transport";
+        String taskId = with()
+            .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess"
+                + "&processor=" + transport)
+            .jsonPath()
+            .get("taskId");
+
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(taskId + "/await");
+
+        assertThat(spoolQueue.browse())
+            .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+            .extracting(Mail::getState)
+            .containsOnly(transport, transport);
+    }
+
+    @Test
+    public void reprocessingAllTaskShouldEnqueueMailsOnSpecifiedQueue() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .build());
+
+        String taskId = with()
+            .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess"
+                + "&queue=" + CUSTOM_QUEUE)
+            .jsonPath()
+            .get("taskId");
+
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(taskId + "/await");
+
+        assertThat(customQueue.browse())
+            .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+            .extracting(Mail::getName)
+            .containsOnly(NAME_1, NAME_2);
+    }
+
+    @Test
+    public void reprocessingOneTaskShouldCreateATask() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .build());
+
+        when()
+            .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess") // targets the key of the mail stored above
+        .then()
+            .statusCode(HttpStatus.CREATED_201)
+            .header("Location", is(notNullValue()))
+            .body("taskId", is(notNullValue()));
+    }
+
+    @Test
+    public void reprocessingOneTaskShouldRejectInvalidActions() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .build());
+
+        when()
+            .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=invalid")
+        .then()
+            .statusCode(HttpStatus.BAD_REQUEST_400)
+            .body("statusCode", is(400))
+            .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+            .body("message", is("action query parameter is mandatory. The only supported value is `reprocess`"));
+    }
+
+    /**
+     * A PATCH on a single mail without any {@code action} query parameter must be
+     * answered with a 400 error of type INVALID_ARGUMENT and the explanatory message.
+     */
+    @Test
+    public void reprocessingOneTaskShouldRequireAction() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+
+        FakeMail storedMail = FakeMail.builder()
+            .name(NAME_1)
+            .build();
+        mailRepository.store(storedMail);
+
+        String pathWithoutAction = URL_ESCAPED_MY_REPO + "/mails/" + NAME_1;
+        String expectedMessage = "action query parameter is mandatory. The only supported value is `reprocess`";
+
+        when()
+            .patch(pathWithoutAction)
+        .then()
+            .statusCode(HttpStatus.BAD_REQUEST_400)
+            .body("statusCode", is(400))
+            .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+            .body("message", is(expectedMessage));
+    }
+
+    /**
+     * When neither {@code queue} nor {@code processor} is supplied, the awaited task
+     * must be reported as completed, with the repository URL, the mail key, an
+     * empty/null target processor and the spool queue as additional information.
+     */
+    @Test
+    public void reprocessingOneTaskShouldIncludeDetailsWhenDefaultValues() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        // Store the fixtures under the same constants the request and the assertions use,
+        // instead of duplicating their values in local literals.
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .build());
+
+        String taskId = with()
+            .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess")
+            .jsonPath()
+            .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await")
+        .then()
+            .body("status", is("completed"))
+            .body("taskId", is(notNullValue()))
+            .body("type", is(ReprocessingOneMailTask.TYPE))
+            .body("additionalInformation.repositoryUrl", is(URL_MY_REPO))
+            .body("additionalInformation.mailKey", is(NAME_1))
+            .body("additionalInformation.targetProcessor", isEmptyOrNullString())
+            .body("additionalInformation.targetQueue", is(MailQueueFactory.SPOOL))
+            .body("startedDate", is(notNullValue()))
+            .body("submitDate", is(notNullValue()))
+            .body("completedDate", is(notNullValue()));
+    }
+
+    /**
+     * When both {@code queue} and {@code processor} are supplied, the awaited task
+     * must be reported as completed and echo the repository URL, the mail key, the
+     * requested processor and the requested queue as additional information.
+     */
+    @Test
+    public void reprocessingOneTaskShouldIncludeDetails() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        // Store the fixtures under the same constants the request and the assertions use,
+        // instead of duplicating their values in local literals.
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .build());
+
+        String transport = "transport";
+        String taskId = with()
+            .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess" +
+                "&queue=" + CUSTOM_QUEUE +
+                "&processor=" + transport)
+            .jsonPath()
+            .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await")
+        .then()
+            .body("status", is("completed"))
+            .body("taskId", is(notNullValue()))
+            .body("type", is(ReprocessingOneMailTask.TYPE))
+            .body("additionalInformation.repositoryUrl", is(URL_MY_REPO))
+            .body("additionalInformation.mailKey", is(NAME_1))
+            .body("additionalInformation.targetProcessor", is(transport))
+            .body("additionalInformation.targetQueue", is(CUSTOM_QUEUE))
+            .body("startedDate", is(notNullValue()))
+            .body("submitDate", is(notNullValue()))
+            .body("completedDate", is(notNullValue()));
+    }
+
+    /**
+     * Once the reprocessing task of one mail has been awaited, the repository
+     * listing must only contain the other mail.
+     */
+    @Test
+    public void reprocessingOneTaskShouldRemoveMailFromRepository() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        // Store the fixtures under the same constants the request and the assertion use,
+        // instead of duplicating their values in local literals.
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .build());
+
+        String transport = "transport";
+        String taskId = with()
+            .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess" +
+                "&queue=" + CUSTOM_QUEUE +
+                "&processor=" + transport)
+            .jsonPath()
+            .get("taskId");
+
+        // Block until the task is over before inspecting the repository.
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(taskId + "/await");
+
+        assertThat(mailRepository.list())
+            .containsOnly(NAME_2);
+    }
+
+    /**
+     * Without a {@code queue} parameter, after awaiting the task the spool queue
+     * must only hold the mail that was asked to be reprocessed.
+     */
+    @Test
+    public void reprocessingOneTaskShouldEnqueueMailsOnDefaultQueue() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        FakeMail firstMail = FakeMail.builder()
+            .name(NAME_1)
+            .build();
+        mailRepository.store(firstMail);
+        FakeMail secondMail = FakeMail.builder()
+            .name(NAME_2)
+            .build();
+        mailRepository.store(secondMail);
+
+        String reprocessPath = URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess";
+        String taskId = with()
+            .patch(reprocessPath)
+            .jsonPath()
+            .get("taskId");
+
+        // Block until the task is over before browsing the queue.
+        String awaitPath = taskId + "/await";
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(awaitPath);
+
+        assertThat(spoolQueue.browse())
+            .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+            .extracting(Mail::getName)
+            .containsOnly(NAME_1);
+    }
+
+    /**
+     * Without a {@code processor} parameter, the mail found in the spool queue after
+     * awaiting the task must still carry the state it was stored with.
+     */
+    @Test
+    public void reprocessingOneTaskShouldPreserveStateWhenProcessorIsNotSpecified() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        String firstState = "state1";
+        String secondState = "state2";
+        FakeMail firstMail = FakeMail.builder()
+            .name(NAME_1)
+            .state(firstState)
+            .build();
+        mailRepository.store(firstMail);
+        FakeMail secondMail = FakeMail.builder()
+            .name(NAME_2)
+            .state(secondState)
+            .build();
+        mailRepository.store(secondMail);
+
+        String reprocessPath = URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess";
+        String taskId = with()
+            .patch(reprocessPath)
+            .jsonPath()
+            .get("taskId");
+
+        // Block until the task is over before browsing the queue.
+        String awaitPath = taskId + "/await";
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(awaitPath);
+
+        assertThat(spoolQueue.browse())
+            .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+            .extracting(Mail::getState)
+            .containsOnly(firstState);
+    }
+
+    /**
+     * With a {@code processor} parameter, the mail found in the spool queue after
+     * awaiting the task must carry the requested processor as its state.
+     */
+    @Test
+    public void reprocessingOneTaskShouldOverWriteStateWhenProcessorSpecified() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        String firstState = "state1";
+        String secondState = "state2";
+        FakeMail firstMail = FakeMail.builder()
+            .name(NAME_1)
+            .state(firstState)
+            .build();
+        mailRepository.store(firstMail);
+        FakeMail secondMail = FakeMail.builder()
+            .name(NAME_2)
+            .state(secondState)
+            .build();
+        mailRepository.store(secondMail);
+
+        String targetProcessor = "transport";
+        String reprocessPath = URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess"
+            + "&processor=" + targetProcessor;
+        String taskId = with()
+            .patch(reprocessPath)
+            .jsonPath()
+            .get("taskId");
+
+        // Block until the task is over before browsing the queue.
+        String awaitPath = taskId + "/await";
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(awaitPath);
+
+        assertThat(spoolQueue.browse())
+            .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+            .extracting(Mail::getState)
+            .containsOnly(targetProcessor);
+    }
+
+    /**
+     * With a {@code queue} parameter, after awaiting the task the custom queue
+     * must only hold the mail that was asked to be reprocessed.
+     */
+    @Test
+    public void reprocessingOneTaskShouldEnqueueMailsOnSpecifiedQueue() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        FakeMail firstMail = FakeMail.builder()
+            .name(NAME_1)
+            .build();
+        mailRepository.store(firstMail);
+        FakeMail secondMail = FakeMail.builder()
+            .name(NAME_2)
+            .build();
+        mailRepository.store(secondMail);
+
+        String reprocessPath = URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess"
+            + "&queue=" + CUSTOM_QUEUE;
+        String taskId = with()
+            .patch(reprocessPath)
+            .jsonPath()
+            .get("taskId");
+
+        // Block until the task is over before browsing the queue.
+        String awaitPath = taskId + "/await";
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(awaitPath);
+
+        assertThat(customQueue.browse())
+            .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+            .extracting(Mail::getName)
+            .containsOnly(NAME_1);
+    }
+
+    /**
+     * Requesting the reprocessing of a key that was never stored must leave the
+     * targeted custom queue empty once the task has been awaited.
+     */
+    @Test
+    public void reprocessingOneTaskShouldNotEnqueueUnknownMailKey() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        FakeMail firstMail = FakeMail.builder()
+            .name(NAME_1)
+            .build();
+        mailRepository.store(firstMail);
+        FakeMail secondMail = FakeMail.builder()
+            .name(NAME_2)
+            .build();
+        mailRepository.store(secondMail);
+
+        String unknownKey = "unknown";
+        String reprocessPath = URL_ESCAPED_MY_REPO + "/mails/" + unknownKey + "?action=reprocess"
+            + "&queue=" + CUSTOM_QUEUE;
+        String taskId = with()
+            .patch(reprocessPath)
+            .jsonPath()
+            .get("taskId");
+
+        // Block until the task is over before browsing the queue.
+        String awaitPath = taskId + "/await";
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(awaitPath);
+
+        assertThat(customQueue.browse())
+            .isEmpty();
+    }
+
+    /**
+     * Requesting the reprocessing of a key that was never stored must leave both
+     * stored mails in the repository once the task has been awaited.
+     */
+    @Test
+    public void reprocessingOneTaskShouldNotRemoveMailFromRepositoryWhenUnknownMailKey() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        FakeMail firstMail = FakeMail.builder()
+            .name(NAME_1)
+            .build();
+        mailRepository.store(firstMail);
+        FakeMail secondMail = FakeMail.builder()
+            .name(NAME_2)
+            .build();
+        mailRepository.store(secondMail);
+
+        String unknownKey = "unknown";
+        String reprocessPath = URL_ESCAPED_MY_REPO + "/mails/" + unknownKey + "?action=reprocess"
+            + "&queue=" + CUSTOM_QUEUE;
+        String taskId = with()
+            .patch(reprocessPath)
+            .jsonPath()
+            .get("taskId");
+
+        // Block until the task is over before inspecting the repository.
+        String awaitPath = taskId + "/await";
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(awaitPath);
+
+        assertThat(mailRepository.size())
+            .isEqualTo(2);
+    }
+
+    /**
+     * Requesting the reprocessing of a key that was never stored must yield a task
+     * whose awaited status is {@code failed}.
+     */
+    @Test
+    public void reprocessingOneTaskShouldFailWhenUnknownMailKey() throws Exception {
+        when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+        FakeMail firstMail = FakeMail.builder()
+            .name(NAME_1)
+            .build();
+        mailRepository.store(firstMail);
+        FakeMail secondMail = FakeMail.builder()
+            .name(NAME_2)
+            .build();
+        mailRepository.store(secondMail);
+
+        String unknownKey = "unknown";
+        String reprocessPath = URL_ESCAPED_MY_REPO + "/mails/" + unknownKey + "?action=reprocess"
+            + "&queue=" + CUSTOM_QUEUE;
+        String taskId = with()
+            .patch(reprocessPath)
+            .jsonPath()
+            .get("taskId");
+
+        String awaitPath = taskId + "/await";
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(awaitPath)
+        .then()
+            .body("status", is("failed"));
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org