You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/01/26 01:13:46 UTC
[02/10] james-project git commit: JAMES-2294 Routes for mail
reprocessing
JAMES-2294 Routes for mail reprocessing
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c73d5de8
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c73d5de8
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c73d5de8
Branch: refs/heads/master
Commit: c73d5de8df02bc638dbcfe5916e63163dd3b5111
Parents: e6fdeae
Author: benwa <bt...@linagora.com>
Authored: Thu Jan 25 10:57:41 2018 +0700
Committer: benwa <bt...@linagora.com>
Committed: Fri Jan 26 08:12:05 2018 +0700
----------------------------------------------------------------------
.../webadmin/routes/MailRepositoriesRoutes.java | 139 +++-
.../routes/MailRepositoriesRoutesTest.java | 646 ++++++++++++++++++-
2 files changed, 755 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/c73d5de8/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
index 1078206..a90323b 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
@@ -19,6 +19,7 @@
package org.apache.james.webadmin.routes;
+import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -32,6 +33,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import org.apache.james.mailrepository.api.MailRepositoryStore;
+import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.task.Task;
import org.apache.james.task.TaskId;
import org.apache.james.task.TaskManager;
@@ -42,6 +44,9 @@ import org.apache.james.webadmin.Routes;
import org.apache.james.webadmin.dto.ExtendedMailRepositoryResponse;
import org.apache.james.webadmin.dto.TaskIdDto;
import org.apache.james.webadmin.service.MailRepositoryStoreService;
+import org.apache.james.webadmin.service.ReprocessingAllMailsTask;
+import org.apache.james.webadmin.service.ReprocessingOneMailTask;
+import org.apache.james.webadmin.service.ReprocessingService;
import org.apache.james.webadmin.utils.ErrorResponder;
import org.apache.james.webadmin.utils.ErrorResponder.ErrorType;
import org.apache.james.webadmin.utils.JsonTransformer;
@@ -55,6 +60,7 @@ import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import io.swagger.jaxrs.PATCH;
import spark.Request;
import spark.Service;
@@ -67,13 +73,15 @@ public class MailRepositoriesRoutes implements Routes {
private final JsonTransformer jsonTransformer;
private final MailRepositoryStoreService repositoryStoreService;
+ private final ReprocessingService reprocessingService;
private final TaskManager taskManager;
private Service service;
@Inject
- public MailRepositoriesRoutes(MailRepositoryStoreService repositoryStoreService, JsonTransformer jsonTransformer, TaskManager taskManager) {
+ public MailRepositoriesRoutes(MailRepositoryStoreService repositoryStoreService, JsonTransformer jsonTransformer, ReprocessingService reprocessingService, TaskManager taskManager) {
this.repositoryStoreService = repositoryStoreService;
this.jsonTransformer = jsonTransformer;
+ this.reprocessingService = reprocessingService;
this.taskManager = taskManager;
}
@@ -92,6 +100,10 @@ public class MailRepositoriesRoutes implements Routes {
defineDeleteMail();
defineDeleteAll();
+
+ defineReprocessAll();
+
+ defineReprocessOne();
}
@GET
@@ -168,7 +180,7 @@ public class MailRepositoriesRoutes implements Routes {
})
public void defineGetMail() {
service.get(MAIL_REPOSITORIES + "/:encodedUrl/mails/:mailKey", (request, response) -> {
- String url = URLDecoder.decode(request.params("encodedUrl"), StandardCharsets.UTF_8.displayName());
+ String url = decodedRepositoryUrl(request);
String mailKey = request.params("mailKey");
try {
return repositoryStoreService.retrieveMail(url, mailKey)
@@ -228,7 +240,7 @@ public class MailRepositoriesRoutes implements Routes {
})
public void defineDeleteMail() {
service.delete(MAIL_REPOSITORIES + "/:encodedUrl/mails/:mailKey", (request, response) -> {
- String url = URLDecoder.decode(request.params("encodedUrl"), StandardCharsets.UTF_8.displayName());
+ String url = decodedRepositoryUrl(request);
String mailKey = request.params("mailKey");
try {
response.status(HttpStatus.NO_CONTENT_204);
@@ -255,7 +267,7 @@ public class MailRepositoriesRoutes implements Routes {
})
public void defineDeleteAll() {
service.delete(MAIL_REPOSITORIES + "/:encodedUrl/mails", (request, response) -> {
- String url = URLDecoder.decode(request.params("encodedUrl"), StandardCharsets.UTF_8.displayName());
+ String url = decodedRepositoryUrl(request);
try {
Task task = repositoryStoreService.createClearMailRepositoryTask(url);
TaskId taskId = taskManager.submit(task);
@@ -271,6 +283,125 @@ public class MailRepositoriesRoutes implements Routes {
}, jsonTransformer);
}
+ /**
+  * Registers the PATCH route {@code MAIL_REPOSITORIES + "/:encodedUrl/mails"}: the request is
+  * turned into a task reprocessing all mails of the repository, the task is submitted to the
+  * task manager and the response is built from the resulting task id.
+  */
+ @PATCH
+ @Path("/{encodedUrl}/mails")
+ @ApiOperation(value = "Reprocessing all mails in that mailRepository")
+ @ApiImplicitParams({
+     // paramType has to be one of the Swagger location keywords (path, query, body, header, form)
+     @ApiImplicitParam(
+         required = true,
+         name = "action",
+         paramType = "query",
+         dataType = "String",
+         defaultValue = "none",
+         example = "?action=reprocess",
+         value = "Compulsory. Only supported value is `reprocess`"),
+     @ApiImplicitParam(
+         required = false,
+         name = "queue",
+         paramType = "query",
+         dataType = "String",
+         defaultValue = "spool",
+         example = "?queue=outgoing",
+         value = "Indicates in which queue the mails stored in the repository should be re-enqueued"),
+     @ApiImplicitParam(
+         required = false,
+         name = "processor",
+         paramType = "query",
+         dataType = "String",
+         defaultValue = "absent",
+         example = "?processor=transport",
+         value = "If present, modifies the state property of the mail to allow their processing by a specific mail container processor.")
+ })
+ @ApiResponses(value = {
+     @ApiResponse(code = HttpStatus.CREATED_201, message = "Task is created", response = TaskIdDto.class),
+     @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = "Internal server error - Something went bad on the server side."),
+     @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Bad request - unknown action")
+ })
+ public void defineReprocessAll() {
+     service.patch(MAIL_REPOSITORIES + "/:encodedUrl/mails", (request, response) -> {
+         Task task = toAllMailReprocessingTask(request);
+         TaskId taskId = taskManager.submit(task);
+         return TaskIdDto.respond(response, taskId);
+     }, jsonTransformer);
+ }
+
+ /**
+  * Builds the task reprocessing all mails of the repository designated by the request.
+  * The repository URL is decoded first, then the {@code action} parameter is checked; the
+  * target queue falls back to {@code MailQueueFactory.SPOOL} when no {@code queue} query
+  * parameter is given, and the initial size falls back to 0 when the service reports none.
+  */
+ private Task toAllMailReprocessingTask(Request request) throws UnsupportedEncodingException, MailRepositoryStore.MailRepositoryStoreException, MessagingException {
+     String repositoryUrl = decodedRepositoryUrl(request);
+     enforceActionParameter(request);
+
+     String requestedQueue = request.queryParams("queue");
+     String queueName = Optional.ofNullable(requestedQueue).orElse(MailQueueFactory.SPOOL);
+     Optional<String> processor = Optional.ofNullable(request.queryParams("processor"));
+     Long initialSize = repositoryStoreService.size(repositoryUrl).orElse(0L);
+
+     return new ReprocessingAllMailsTask(reprocessingService, initialSize, repositoryUrl, queueName, processor);
+ }
+
+ /**
+  * Registers the PATCH route {@code MAIL_REPOSITORIES + "/:encodedUrl/mails/:key"}: the request
+  * is turned into a task reprocessing a single mail, the task is submitted to the task manager
+  * and the response is built from the resulting task id.
+  */
+ @PATCH
+ @Path("/{encodedUrl}/mails/{key}")
+ @ApiOperation(value = "Reprocessing a single mail in that mailRepository")
+ @ApiImplicitParams({
+     // paramType has to be one of the Swagger location keywords (path, query, body, header, form)
+     @ApiImplicitParam(
+         required = true,
+         name = "action",
+         paramType = "query",
+         dataType = "String",
+         defaultValue = "none",
+         example = "?action=reprocess",
+         value = "Compulsory. Only supported value is `reprocess`"),
+     @ApiImplicitParam(
+         required = false,
+         name = "queue",
+         paramType = "query",
+         dataType = "String",
+         defaultValue = "spool",
+         example = "?queue=outgoing",
+         value = "Indicates in which queue the mails stored in the repository should be re-enqueued"),
+     @ApiImplicitParam(
+         required = false,
+         name = "processor",
+         paramType = "query",
+         dataType = "String",
+         defaultValue = "absent",
+         example = "?processor=transport",
+         value = "If present, modifies the state property of the mail to allow their processing by a specific mail container processor.")
+ })
+ @ApiResponses(value = {
+     @ApiResponse(code = HttpStatus.CREATED_201, message = "Task is created", response = TaskIdDto.class),
+     @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = "Internal server error - Something went bad on the server side."),
+     @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Bad request - unknown action")
+ })
+ public void defineReprocessOne() {
+     service.patch(MAIL_REPOSITORIES + "/:encodedUrl/mails/:key", (request, response) -> {
+         Task task = toOneMailReprocessingTask(request);
+         TaskId taskId = taskManager.submit(task);
+         return TaskIdDto.respond(response, taskId);
+     }, jsonTransformer);
+ }
+
+ /**
+  * Builds the task reprocessing the single mail designated by the {@code key} path parameter.
+  * The repository URL is decoded first, then the {@code action} parameter is checked; the
+  * target queue falls back to {@code MailQueueFactory.SPOOL} when no {@code queue} query
+  * parameter is given.
+  */
+ private Task toOneMailReprocessingTask(Request request) throws UnsupportedEncodingException {
+     String repositoryUrl = decodedRepositoryUrl(request);
+     String mailKey = request.params("key");
+     enforceActionParameter(request);
+
+     String requestedQueue = request.queryParams("queue");
+     String queueName = Optional.ofNullable(requestedQueue).orElse(MailQueueFactory.SPOOL);
+     Optional<String> processor = Optional.ofNullable(request.queryParams("processor"));
+
+     return new ReprocessingOneMailTask(reprocessingService, repositoryUrl, queueName, mailKey, processor);
+ }
+
+ /**
+  * Lets the request through only when its {@code action} query parameter equals
+  * {@code reprocess}; any other value, or no value at all, ends in a 400 halt error.
+  */
+ private void enforceActionParameter(Request request) {
+     boolean reprocessRequested = "reprocess".equals(request.queryParams("action"));
+     if (reprocessRequested) {
+         return;
+     }
+     throw ErrorResponder.builder()
+         .statusCode(HttpStatus.BAD_REQUEST_400)
+         .type(ErrorType.INVALID_ARGUMENT)
+         .message("action query parameter is mandatory. The only supported value is `reprocess`")
+         .haltError();
+ }
+
+ /**
+  * Returns the {@code encodedUrl} path parameter after URL-decoding it as UTF-8.
+  *
+  * @throws UnsupportedEncodingException declared by {@link URLDecoder#decode(String, String)}
+  */
+ private String decodedRepositoryUrl(Request request) throws UnsupportedEncodingException {
+     // name() is the canonical charset name; displayName() is the human-readable, locale-dependent one
+     return URLDecoder.decode(request.params("encodedUrl"), StandardCharsets.UTF_8.name());
+ }
+
private Optional<Integer> assertPositiveInteger(Request request, String parameterName) {
try {
return Optional.ofNullable(request.queryParams(parameterName))
http://git-wip-us.apache.org/repos/asf/james-project/blob/c73d5de8/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
index cbad2b4..a6dd4de 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
@@ -41,13 +41,21 @@ import java.util.Optional;
import org.apache.james.mailrepository.api.MailRepositoryStore;
import org.apache.james.mailrepository.memory.MemoryMailRepository;
import org.apache.james.metrics.api.NoopMetricFactory;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
+import org.apache.james.queue.memory.MemoryMailQueueFactory;
import org.apache.james.task.MemoryTaskManager;
import org.apache.james.webadmin.WebAdminServer;
import org.apache.james.webadmin.WebAdminUtils;
import org.apache.james.webadmin.service.ClearMailRepositoryTask;
import org.apache.james.webadmin.service.MailRepositoryStoreService;
+import org.apache.james.webadmin.service.ReprocessingAllMailsTask;
+import org.apache.james.webadmin.service.ReprocessingOneMailTask;
+import org.apache.james.webadmin.service.ReprocessingService;
import org.apache.james.webadmin.utils.ErrorResponder;
import org.apache.james.webadmin.utils.JsonTransformer;
+import org.apache.mailet.Mail;
import org.apache.mailet.base.test.FakeMail;
import org.eclipse.jetty.http.HttpStatus;
import org.junit.After;
@@ -63,9 +71,14 @@ public class MailRepositoriesRoutesTest {
public static final String URL_MY_REPO = "url://myRepo";
public static final String URL_ESCAPED_MY_REPO = "url%3A%2F%2FmyRepo";
public static final String MY_REPO_MAILS = "url%3A%2F%2FmyRepo/mails";
+ public static final String CUSTOM_QUEUE = "customQueue";
+ public static final String NAME_1 = "name1";
+ public static final String NAME_2 = "name2";
private WebAdminServer webAdminServer;
private MailRepositoryStore mailRepositoryStore;
private MemoryMailRepository mailRepository;
+ private ManageableMailQueue spoolQueue;
+ private ManageableMailQueue customQueue;
@Before
public void setUp() throws Exception {
@@ -74,10 +87,18 @@ public class MailRepositoriesRoutesTest {
MemoryTaskManager taskManager = new MemoryTaskManager();
JsonTransformer jsonTransformer = new JsonTransformer();
+ MailQueueFactory<ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
+ spoolQueue = queueFactory.createQueue(MailQueueFactory.SPOOL);
+ customQueue = queueFactory.createQueue(CUSTOM_QUEUE);
+
+ MailRepositoryStoreService repositoryStoreService = new MailRepositoryStoreService(mailRepositoryStore);
+
+ ReprocessingService reprocessingService = new ReprocessingService(queueFactory, repositoryStoreService);
+
webAdminServer = WebAdminUtils.createWebAdminServer(
new NoopMetricFactory(),
- new MailRepositoriesRoutes(new MailRepositoryStoreService(mailRepositoryStore),
- jsonTransformer, taskManager),
+ new MailRepositoriesRoutes(repositoryStoreService,
+ jsonTransformer, reprocessingService, taskManager),
new TasksRoutes(taskManager, jsonTransformer));
webAdminServer.configure(NO_CONFIGURATION);
webAdminServer.await();
@@ -289,10 +310,10 @@ public class MailRepositoriesRoutesTest {
when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
mailRepository.store(FakeMail.builder()
- .name("name1")
+ .name(NAME_1)
.build());
mailRepository.store(FakeMail.builder()
- .name("name2")
+ .name(NAME_2)
.build());
when()
@@ -300,7 +321,7 @@ public class MailRepositoriesRoutesTest {
.then()
.statusCode(HttpStatus.OK_200)
.body("", hasSize(2))
- .body("mailKey", containsInAnyOrder("name1", "name2"));
+ .body("mailKey", containsInAnyOrder(NAME_1, NAME_2));
}
@Test
@@ -354,7 +375,7 @@ public class MailRepositoriesRoutesTest {
when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
mailRepository.store(FakeMail.builder()
- .name("name1")
+ .name(NAME_1)
.build());
given()
@@ -369,7 +390,7 @@ public class MailRepositoriesRoutesTest {
public void retrievingAMailShouldDisplayItsInformation() throws Exception {
when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
- String name = "name1";
+ String name = NAME_1;
String sender = "sender@domain";
String recipient1 = "recipient1@domain";
String recipient2 = "recipient2@domain";
@@ -398,16 +419,15 @@ public class MailRepositoriesRoutesTest {
public void retrievingAMailShouldNotFailWhenOnlyNameProperty() throws Exception {
when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
- String name = "name1";
mailRepository.store(FakeMail.builder()
- .name(name)
+ .name(NAME_1)
.build());
when()
- .get(URL_ESCAPED_MY_REPO + "/mails/" + name)
+ .get(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1)
.then()
.statusCode(HttpStatus.OK_200)
- .body("name", is(name))
+ .body("name", is(NAME_1))
.body("sender", isEmptyOrNullString())
.body("state", isEmptyOrNullString())
.body("error", isEmptyOrNullString())
@@ -432,37 +452,34 @@ public class MailRepositoriesRoutesTest {
public void deletingAMailShouldRemoveIt() throws Exception {
when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
- String name1 = "name1";
- String name2 = "name2";
mailRepository.store(FakeMail.builder()
- .name(name1)
+ .name(NAME_1)
.build());
mailRepository.store(FakeMail.builder()
- .name(name2)
+ .name(NAME_2)
.build());
given()
- .delete(URL_ESCAPED_MY_REPO + "/mails/" + name1);
+ .delete(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1);
when()
.get(URL_ESCAPED_MY_REPO + "/mails")
.then()
.statusCode(HttpStatus.OK_200)
.body("", hasSize(1))
- .body("mailKey", contains(name2));
+ .body("mailKey", contains(NAME_2));
}
@Test
public void deletingAMailShouldReturnOkWhenExist() throws Exception {
when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
- String name1 = "name1";
mailRepository.store(FakeMail.builder()
- .name(name1)
+ .name(NAME_1)
.build());
when()
- .delete(URL_ESCAPED_MY_REPO + "/mails/" + name1)
+ .delete(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1)
.then()
.statusCode(HttpStatus.NO_CONTENT_204);
}
@@ -493,13 +510,11 @@ public class MailRepositoriesRoutesTest {
public void clearTaskShouldHaveDetails() throws Exception {
when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
- String name1 = "name1";
- String name2 = "name2";
mailRepository.store(FakeMail.builder()
- .name(name1)
+ .name(NAME_1)
.build());
mailRepository.store(FakeMail.builder()
- .name(name2)
+ .name(NAME_2)
.build());
String taskId = with()
@@ -528,10 +543,10 @@ public class MailRepositoriesRoutesTest {
when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
mailRepository.store(FakeMail.builder()
- .name("name1")
+ .name(NAME_1)
.build());
mailRepository.store(FakeMail.builder()
- .name("name2")
+ .name(NAME_2)
.build());
String taskId = with()
@@ -576,4 +591,583 @@ public class MailRepositoriesRoutesTest {
.body("message", is(URL_MY_REPO + "does not exist"));
}
+ // PATCH with action=reprocess on the mails of a repository answers 201 with a task id
+ @Test
+ public void reprocessingAllTaskShouldCreateATask() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     String reprocessAllPath = MY_REPO_MAILS + "?action=reprocess";
+
+     when()
+         .patch(reprocessAllPath)
+     .then()
+         .statusCode(HttpStatus.CREATED_201)
+         .header("Location", is(notNullValue()))
+         .body("taskId", is(notNullValue()));
+ }
+
+ // An action other than `reprocess` is answered with a 400 INVALID_ARGUMENT error
+ @Test
+ public void reprocessingAllTaskShouldRejectInvalidActions() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     String expectedMessage = "action query parameter is mandatory. The only supported value is `reprocess`";
+
+     when()
+         .patch(MY_REPO_MAILS + "?action=invalid")
+     .then()
+         .statusCode(HttpStatus.BAD_REQUEST_400)
+         .body("statusCode", is(400))
+         .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+         .body("message", is(expectedMessage));
+ }
+
+ // A PATCH without any action query parameter is answered with a 400 INVALID_ARGUMENT error
+ @Test
+ public void reprocessingAllTaskShouldRequireAction() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     String expectedMessage = "action query parameter is mandatory. The only supported value is `reprocess`";
+
+     when()
+         .patch(MY_REPO_MAILS)
+     .then()
+         .statusCode(HttpStatus.BAD_REQUEST_400)
+         .body("statusCode", is(400))
+         .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+         .body("message", is(expectedMessage));
+ }
+
+ // Without queue/processor parameters the task details expose the spool queue and no processor
+ @Test
+ public void reprocessingAllTaskShouldIncludeDetailsWhenDefaultValues() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     for (String mailName : new String[] {NAME_1, NAME_2}) {
+         mailRepository.store(FakeMail.builder()
+             .name(mailName)
+             .build());
+     }
+
+     String reprocessAllPath = MY_REPO_MAILS + "?action=reprocess";
+     String taskId = with()
+         .patch(reprocessAllPath)
+         .jsonPath()
+         .get("taskId");
+
+     given()
+         .basePath(TasksRoutes.BASE)
+     .when()
+         .get(taskId + "/await")
+     .then()
+         .body("status", is("completed"))
+         .body("taskId", is(notNullValue()))
+         .body("type", is(ReprocessingAllMailsTask.TYPE))
+         .body("additionalInformation.repositoryUrl", is(URL_MY_REPO))
+         .body("additionalInformation.initialCount", is(2))
+         .body("additionalInformation.remainingCount", is(0))
+         .body("additionalInformation.targetProcessor", isEmptyOrNullString())
+         .body("additionalInformation.targetQueue", is(MailQueueFactory.SPOOL))
+         .body("startedDate", is(notNullValue()))
+         .body("submitDate", is(notNullValue()))
+         .body("completedDate", is(notNullValue()));
+ }
+
+ // With explicit queue and processor parameters, the task details must report both of them
+ // together with the initial and remaining mail counts.
+ @Test
+ public void reprocessingAllTaskShouldIncludeDetails() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     // Shared NAME_1/NAME_2 constants replace the locals that redeclared the same literals
+     mailRepository.store(FakeMail.builder()
+         .name(NAME_1)
+         .build());
+     mailRepository.store(FakeMail.builder()
+         .name(NAME_2)
+         .build());
+
+     String transport = "transport";
+     String taskId = with()
+         .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess" +
+             "&queue=" + CUSTOM_QUEUE +
+             "&processor=" + transport)
+         .jsonPath()
+         .get("taskId");
+
+     given()
+         .basePath(TasksRoutes.BASE)
+     .when()
+         .get(taskId + "/await")
+     .then()
+         .body("status", is("completed"))
+         .body("taskId", is(notNullValue()))
+         .body("type", is(ReprocessingAllMailsTask.TYPE))
+         .body("additionalInformation.repositoryUrl", is(URL_MY_REPO))
+         .body("additionalInformation.initialCount", is(2))
+         .body("additionalInformation.remainingCount", is(0))
+         .body("additionalInformation.targetProcessor", is(transport))
+         .body("additionalInformation.targetQueue", is(CUSTOM_QUEUE))
+         .body("startedDate", is(notNullValue()))
+         .body("submitDate", is(notNullValue()))
+         .body("completedDate", is(notNullValue()));
+ }
+
+ // Once the reprocessing task has been awaited, the repository must not list any mail
+ @Test
+ public void reprocessingAllTaskShouldClearMailRepository() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     // Shared NAME_1/NAME_2 constants replace the locals that redeclared the same literals
+     mailRepository.store(FakeMail.builder()
+         .name(NAME_1)
+         .build());
+     mailRepository.store(FakeMail.builder()
+         .name(NAME_2)
+         .build());
+
+     String transport = "transport";
+     String taskId = with()
+         .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess" +
+             "&queue=" + CUSTOM_QUEUE +
+             "&processor=" + transport)
+         .jsonPath()
+         .get("taskId");
+
+     with()
+         .basePath(TasksRoutes.BASE)
+         .get(taskId + "/await");
+
+     assertThat(mailRepository.list()).isEmpty();
+ }
+
+ // Without a queue parameter both stored mails are expected in the spool queue
+ @Test
+ public void reprocessingAllTaskShouldEnqueueMailsOnDefaultQueue() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     for (String mailName : new String[] {NAME_1, NAME_2}) {
+         mailRepository.store(FakeMail.builder()
+             .name(mailName)
+             .build());
+     }
+
+     String reprocessAllPath = MY_REPO_MAILS + "?action=reprocess";
+     String taskId = with()
+         .patch(reprocessAllPath)
+         .jsonPath()
+         .get("taskId");
+
+     with()
+         .basePath(TasksRoutes.BASE)
+         .get(taskId + "/await");
+
+     assertThat(spoolQueue.browse())
+         .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+         .extracting(Mail::getName)
+         .containsOnly(NAME_1, NAME_2);
+ }
+
+ // Without a processor parameter the enqueued mails are expected to keep their stored state
+ @Test
+ public void reprocessingAllTaskShouldPreserveStateWhenProcessorIsNotSpecified() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     String firstState = "state1";
+     String secondState = "state2";
+     FakeMail firstMail = FakeMail.builder()
+         .name(NAME_1)
+         .state(firstState)
+         .build();
+     mailRepository.store(firstMail);
+     FakeMail secondMail = FakeMail.builder()
+         .name(NAME_2)
+         .state(secondState)
+         .build();
+     mailRepository.store(secondMail);
+
+     String taskId = with()
+         .patch(MY_REPO_MAILS + "?action=reprocess")
+         .jsonPath()
+         .get("taskId");
+
+     with()
+         .basePath(TasksRoutes.BASE)
+         .get(taskId + "/await");
+
+     assertThat(spoolQueue.browse())
+         .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+         .extracting(Mail::getState)
+         .containsOnly(firstState, secondState);
+ }
+
+ // With a processor parameter the enqueued mails are expected to carry that processor as state
+ @Test
+ public void reprocessingAllTaskShouldOverWriteStateWhenProcessorSpecified() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     String firstState = "state1";
+     String secondState = "state2";
+     FakeMail firstMail = FakeMail.builder()
+         .name(NAME_1)
+         .state(firstState)
+         .build();
+     mailRepository.store(firstMail);
+     FakeMail secondMail = FakeMail.builder()
+         .name(NAME_2)
+         .state(secondState)
+         .build();
+     mailRepository.store(secondMail);
+
+     String processor = "transport";
+     String reprocessAllPath = MY_REPO_MAILS + "?action=reprocess" + "&processor=" + processor;
+     String taskId = with()
+         .patch(reprocessAllPath)
+         .jsonPath()
+         .get("taskId");
+
+     with()
+         .basePath(TasksRoutes.BASE)
+         .get(taskId + "/await");
+
+     assertThat(spoolQueue.browse())
+         .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+         .extracting(Mail::getState)
+         .containsOnly(processor, processor);
+ }
+
+ // With queue=customQueue both stored mails are expected in the custom queue
+ @Test
+ public void reprocessingAllTaskShouldEnqueueMailsOnSpecifiedQueue() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     for (String mailName : new String[] {NAME_1, NAME_2}) {
+         mailRepository.store(FakeMail.builder()
+             .name(mailName)
+             .build());
+     }
+
+     String reprocessAllPath = MY_REPO_MAILS + "?action=reprocess" + "&queue=" + CUSTOM_QUEUE;
+     String taskId = with()
+         .patch(reprocessAllPath)
+         .jsonPath()
+         .get("taskId");
+
+     with()
+         .basePath(TasksRoutes.BASE)
+         .get(taskId + "/await");
+
+     assertThat(customQueue.browse())
+         .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+         .extracting(Mail::getName)
+         .containsOnly(NAME_1, NAME_2);
+ }
+
+ // PATCH with action=reprocess on a stored mail answers 201 with a task id
+ @Test
+ public void reprocessingOneTaskShouldCreateATask() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+
+     mailRepository.store(FakeMail.builder()
+         .name(NAME_1)
+         .build());
+
+     // The path is built from NAME_1 so that it cannot drift from the name of the stored mail
+     when()
+         .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess")
+     .then()
+         .statusCode(HttpStatus.CREATED_201)
+         .header("Location", is(notNullValue()))
+         .body("taskId", is(notNullValue()));
+ }
+
+ // An action other than `reprocess` on a single mail is answered with a 400 INVALID_ARGUMENT error
+ @Test
+ public void reprocessingOneTaskShouldRejectInvalidActions() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+
+     FakeMail storedMail = FakeMail.builder()
+         .name(NAME_1)
+         .build();
+     mailRepository.store(storedMail);
+     String expectedMessage = "action query parameter is mandatory. The only supported value is `reprocess`";
+
+     when()
+         .patch(MY_REPO_MAILS + "/" + NAME_1 + "?action=invalid")
+     .then()
+         .statusCode(HttpStatus.BAD_REQUEST_400)
+         .body("statusCode", is(400))
+         .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+         .body("message", is(expectedMessage));
+ }
+
+ // A PATCH on a single mail without action query parameter is answered with a 400 INVALID_ARGUMENT error
+ @Test
+ public void reprocessingOneTaskShouldRequireAction() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+
+     FakeMail storedMail = FakeMail.builder()
+         .name(NAME_1)
+         .build();
+     mailRepository.store(storedMail);
+     String expectedMessage = "action query parameter is mandatory. The only supported value is `reprocess`";
+
+     when()
+         .patch(MY_REPO_MAILS + "/" + NAME_1)
+     .then()
+         .statusCode(HttpStatus.BAD_REQUEST_400)
+         .body("statusCode", is(400))
+         .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+         .body("message", is(expectedMessage));
+ }
+
+ // Without queue/processor parameters the single-mail task details must expose the mail key,
+ // the spool queue and no processor.
+ @Test
+ public void reprocessingOneTaskShouldIncludeDetailsWhenDefaultValues() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     // Shared NAME_1/NAME_2 constants replace the locals that redeclared the same literals
+     mailRepository.store(FakeMail.builder()
+         .name(NAME_1)
+         .build());
+     mailRepository.store(FakeMail.builder()
+         .name(NAME_2)
+         .build());
+
+     String taskId = with()
+         .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess")
+         .jsonPath()
+         .get("taskId");
+
+     given()
+         .basePath(TasksRoutes.BASE)
+     .when()
+         .get(taskId + "/await")
+     .then()
+         .body("status", is("completed"))
+         .body("taskId", is(notNullValue()))
+         .body("type", is(ReprocessingOneMailTask.TYPE))
+         .body("additionalInformation.repositoryUrl", is(URL_MY_REPO))
+         .body("additionalInformation.mailKey", is(NAME_1))
+         .body("additionalInformation.targetProcessor", isEmptyOrNullString())
+         .body("additionalInformation.targetQueue", is(MailQueueFactory.SPOOL))
+         .body("startedDate", is(notNullValue()))
+         .body("submitDate", is(notNullValue()))
+         .body("completedDate", is(notNullValue()));
+ }
+
+ // With explicit queue and processor parameters, the single-mail task details must report
+ // both of them together with the mail key.
+ @Test
+ public void reprocessingOneTaskShouldIncludeDetails() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     // Shared NAME_1/NAME_2 constants replace the locals that redeclared the same literals
+     mailRepository.store(FakeMail.builder()
+         .name(NAME_1)
+         .build());
+     mailRepository.store(FakeMail.builder()
+         .name(NAME_2)
+         .build());
+
+     String transport = "transport";
+     String taskId = with()
+         .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess" +
+             "&queue=" + CUSTOM_QUEUE +
+             "&processor=" + transport)
+         .jsonPath()
+         .get("taskId");
+
+     given()
+         .basePath(TasksRoutes.BASE)
+     .when()
+         .get(taskId + "/await")
+     .then()
+         .body("status", is("completed"))
+         .body("taskId", is(notNullValue()))
+         .body("type", is(ReprocessingOneMailTask.TYPE))
+         .body("additionalInformation.repositoryUrl", is(URL_MY_REPO))
+         .body("additionalInformation.mailKey", is(NAME_1))
+         .body("additionalInformation.targetProcessor", is(transport))
+         .body("additionalInformation.targetQueue", is(CUSTOM_QUEUE))
+         .body("startedDate", is(notNullValue()))
+         .body("submitDate", is(notNullValue()))
+         .body("completedDate", is(notNullValue()));
+ }
+
+ // Once the single-mail task has been awaited, only the other mail must remain listed
+ @Test
+ public void reprocessingOneTaskShouldRemoveMailFromRepository() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     // Shared NAME_1/NAME_2 constants replace the locals that redeclared the same literals
+     mailRepository.store(FakeMail.builder()
+         .name(NAME_1)
+         .build());
+     mailRepository.store(FakeMail.builder()
+         .name(NAME_2)
+         .build());
+
+     String transport = "transport";
+     String taskId = with()
+         .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess" +
+             "&queue=" + CUSTOM_QUEUE +
+             "&processor=" + transport)
+         .jsonPath()
+         .get("taskId");
+
+     with()
+         .basePath(TasksRoutes.BASE)
+         .get(taskId + "/await");
+
+     assertThat(mailRepository.list())
+         .containsOnly(NAME_2);
+ }
+
+ // Without a queue parameter only the targeted mail is expected in the spool queue
+ @Test
+ public void reprocessingOneTaskShouldEnqueueMailsOnDefaultQueue() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     for (String mailName : new String[] {NAME_1, NAME_2}) {
+         mailRepository.store(FakeMail.builder()
+             .name(mailName)
+             .build());
+     }
+
+     String reprocessOnePath = MY_REPO_MAILS + "/" + NAME_1 + "?action=reprocess";
+     String taskId = with()
+         .patch(reprocessOnePath)
+         .jsonPath()
+         .get("taskId");
+
+     with()
+         .basePath(TasksRoutes.BASE)
+         .get(taskId + "/await");
+
+     assertThat(spoolQueue.browse())
+         .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+         .extracting(Mail::getName)
+         .containsOnly(NAME_1);
+ }
+
+ // Without a processor parameter the enqueued mail is expected to keep its stored state
+ @Test
+ public void reprocessingOneTaskShouldPreserveStateWhenProcessorIsNotSpecified() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     String firstState = "state1";
+     String secondState = "state2";
+     FakeMail firstMail = FakeMail.builder()
+         .name(NAME_1)
+         .state(firstState)
+         .build();
+     mailRepository.store(firstMail);
+     FakeMail secondMail = FakeMail.builder()
+         .name(NAME_2)
+         .state(secondState)
+         .build();
+     mailRepository.store(secondMail);
+
+     String taskId = with()
+         .patch(MY_REPO_MAILS + "/" + NAME_1 + "?action=reprocess")
+         .jsonPath()
+         .get("taskId");
+
+     with()
+         .basePath(TasksRoutes.BASE)
+         .get(taskId + "/await");
+
+     assertThat(spoolQueue.browse())
+         .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+         .extracting(Mail::getState)
+         .containsOnly(firstState);
+ }
+
+ // With a processor parameter the enqueued mail is expected to carry that processor as state
+ @Test
+ public void reprocessingOneTaskShouldOverWriteStateWhenProcessorSpecified() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     String firstState = "state1";
+     String secondState = "state2";
+     FakeMail firstMail = FakeMail.builder()
+         .name(NAME_1)
+         .state(firstState)
+         .build();
+     mailRepository.store(firstMail);
+     FakeMail secondMail = FakeMail.builder()
+         .name(NAME_2)
+         .state(secondState)
+         .build();
+     mailRepository.store(secondMail);
+
+     String processor = "transport";
+     String reprocessOnePath = MY_REPO_MAILS + "/" + NAME_1 + "?action=reprocess" + "&processor=" + processor;
+     String taskId = with()
+         .patch(reprocessOnePath)
+         .jsonPath()
+         .get("taskId");
+
+     with()
+         .basePath(TasksRoutes.BASE)
+         .get(taskId + "/await");
+
+     assertThat(spoolQueue.browse())
+         .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+         .extracting(Mail::getState)
+         .containsOnly(processor);
+ }
+
+ // With queue=customQueue only the targeted mail is expected in the custom queue
+ @Test
+ public void reprocessingOneTaskShouldEnqueueMailsOnSpecifiedQueue() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     for (String mailName : new String[] {NAME_1, NAME_2}) {
+         mailRepository.store(FakeMail.builder()
+             .name(mailName)
+             .build());
+     }
+
+     String reprocessOnePath = MY_REPO_MAILS + "/" + NAME_1 + "?action=reprocess" + "&queue=" + CUSTOM_QUEUE;
+     String taskId = with()
+         .patch(reprocessOnePath)
+         .jsonPath()
+         .get("taskId");
+
+     with()
+         .basePath(TasksRoutes.BASE)
+         .get(taskId + "/await");
+
+     assertThat(customQueue.browse())
+         .extracting(ManageableMailQueue.MailQueueItemView::getMail)
+         .extracting(Mail::getName)
+         .containsOnly(NAME_1);
+ }
+
+ // Targeting a key that was never stored is expected to leave the custom queue empty
+ @Test
+ public void reprocessingOneTaskShouldNotEnqueueUnknownMailKey() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     for (String mailName : new String[] {NAME_1, NAME_2}) {
+         mailRepository.store(FakeMail.builder()
+             .name(mailName)
+             .build());
+     }
+
+     String unknownKey = "unknown";
+     String reprocessOnePath = MY_REPO_MAILS + "/" + unknownKey + "?action=reprocess" + "&queue=" + CUSTOM_QUEUE;
+     String taskId = with()
+         .patch(reprocessOnePath)
+         .jsonPath()
+         .get("taskId");
+
+     with()
+         .basePath(TasksRoutes.BASE)
+         .get(taskId + "/await");
+
+     assertThat(customQueue.browse())
+         .isEmpty();
+ }
+
+ // Targeting a key that was never stored is expected to leave both stored mails in place
+ @Test
+ public void reprocessingOneTaskShouldNotRemoveMailFromRepositoryWhenUnknownMailKey() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     for (String mailName : new String[] {NAME_1, NAME_2}) {
+         mailRepository.store(FakeMail.builder()
+             .name(mailName)
+             .build());
+     }
+
+     String unknownKey = "unknown";
+     String reprocessOnePath = MY_REPO_MAILS + "/" + unknownKey + "?action=reprocess" + "&queue=" + CUSTOM_QUEUE;
+     String taskId = with()
+         .patch(reprocessOnePath)
+         .jsonPath()
+         .get("taskId");
+
+     with()
+         .basePath(TasksRoutes.BASE)
+         .get(taskId + "/await");
+
+     assertThat(mailRepository.size())
+         .isEqualTo(2);
+ }
+
+ // Targeting a key that was never stored is expected to end with a task in "failed" status
+ @Test
+ public void reprocessingOneTaskShouldFailWhenUnknownMailKey() throws Exception {
+     when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository));
+     for (String mailName : new String[] {NAME_1, NAME_2}) {
+         mailRepository.store(FakeMail.builder()
+             .name(mailName)
+             .build());
+     }
+
+     String unknownKey = "unknown";
+     String reprocessOnePath = MY_REPO_MAILS + "/" + unknownKey + "?action=reprocess" + "&queue=" + CUSTOM_QUEUE;
+     String taskId = with()
+         .patch(reprocessOnePath)
+         .jsonPath()
+         .get("taskId");
+
+     given()
+         .basePath(TasksRoutes.BASE)
+     .when()
+         .get(taskId + "/await")
+     .then()
+         .body("status", is("failed"));
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org