You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/03/11 05:11:00 UTC
[james-project] 01/03: JAMES-3723 Allow to not consume emails upon reprocessing
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c458cea86af5c48ccd844dff6d202834fc2be44d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat Mar 5 22:06:45 2022 +0700
JAMES-3723 Allow to not consume emails upon reprocessing
---
...dminServerTaskSerializationIntegrationTest.java | 1 +
.../webadmin/routes/MailRepositoriesRoutes.java | 20 +++--
.../webadmin/service/ReprocessingAllMailsTask.java | 37 +++------
...essingAllMailsTaskAdditionalInformationDTO.java | 18 ++++-
.../service/ReprocessingAllMailsTaskDTO.java | 20 +++--
.../webadmin/service/ReprocessingOneMailTask.java | 38 +++------
...cessingOneMailTaskAdditionalInformationDTO.java | 18 ++++-
.../service/ReprocessingOneMailTaskDTO.java | 24 ++++--
.../webadmin/service/ReprocessingService.java | 44 ++++++++---
.../routes/MailRepositoriesRoutesTest.java | 91 ++++++++++++++++++++++
.../service/ReprocessingAllMailsTaskTest.java | 49 +++++++++---
.../service/ReprocessingOneMailTaskTest.java | 42 ++++++++--
.../webadmin/service/ReprocessingServiceTest.java | 12 +--
13 files changed, 304 insertions(+), 110 deletions(-)
diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
index 4e3ac97..4e5ba18 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
@@ -344,6 +344,7 @@ class RabbitMQWebAdminServerTaskSerializationIntegrationTest {
.body("type", is("reprocessing-one"))
.body("additionalInformation.repositoryPath", is(mailRepositoryUrl.asString()))
.body("additionalInformation.targetQueue", is(notNullValue()))
+ .body("additionalInformation.consume", is(notNullValue()))
.body("additionalInformation.mailKey", is(mailKey))
.body("additionalInformation.targetProcessor", is(nullValue()));
}
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
index aea7771..97c5634 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
@@ -322,11 +322,15 @@ public class MailRepositoriesRoutes implements Routes {
private Task reprocessAll(Request request) throws UnsupportedEncodingException, MailRepositoryStore.MailRepositoryStoreException {
MailRepositoryPath path = decodedRepositoryPath(request);
- Optional<String> targetProcessor = parseTargetProcessor(request);
- MailQueueName targetQueue = parseTargetQueue(request);
Long repositorySize = repositoryStoreService.size(path).orElse(0L);
- return new ReprocessingAllMailsTask(reprocessingService, repositorySize, path, targetQueue, targetProcessor);
+ return new ReprocessingAllMailsTask(reprocessingService, repositorySize, path, extractConfiguration(request));
+ }
+
+ private ReprocessingService.Configuration extractConfiguration(Request request) {
+ return new ReprocessingService.Configuration(parseTargetQueue(request),
+ parseTargetProcessor(request),
+ parseConsume(request).orElse(true));
}
public void defineReprocessOne() {
@@ -340,10 +344,7 @@ public class MailRepositoriesRoutes implements Routes {
MailRepositoryPath path = decodedRepositoryPath(request);
MailKey key = new MailKey(request.params("key"));
- Optional<String> targetProcessor = parseTargetProcessor(request);
- MailQueueName targetQueue = parseTargetQueue(request);
-
- return new ReprocessingOneMailTask(reprocessingService, path, targetQueue, key, targetProcessor, Clock.systemUTC());
+ return new ReprocessingOneMailTask(reprocessingService, path, extractConfiguration(request), key, Clock.systemUTC());
}
private Set<AdditionalField> extractAdditionalFields(String additionalFieldsParam) throws IllegalArgumentException {
@@ -361,6 +362,11 @@ public class MailRepositoriesRoutes implements Routes {
return Optional.ofNullable(request.queryParams("processor"));
}
+ private Optional<Boolean> parseConsume(Request request) {
+ return Optional.ofNullable(request.queryParams("consume"))
+ .map(Boolean::parseBoolean);
+ }
+
private MailQueueName parseTargetQueue(Request request) {
return Optional.ofNullable(request.queryParams("queue"))
.map(MailQueueName::of)
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java
index 5d8ea39..ef930f9 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java
@@ -29,7 +29,6 @@ import javax.mail.MessagingException;
import org.apache.james.mailrepository.api.MailKey;
import org.apache.james.mailrepository.api.MailRepositoryPath;
import org.apache.james.mailrepository.api.MailRepositoryStore;
-import org.apache.james.queue.api.MailQueueName;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskType;
@@ -40,27 +39,21 @@ public class ReprocessingAllMailsTask implements Task {
public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
private final MailRepositoryPath repositoryPath;
- private final String targetQueue;
- private final Optional<String> targetProcessor;
+ private final ReprocessingService.Configuration configuration;
private final long initialCount;
private final long remainingCount;
private final Instant timestamp;
- public AdditionalInformation(MailRepositoryPath repositoryPath, MailQueueName targetQueue, Optional<String> targetProcessor, long initialCount, long remainingCount, Instant timestamp) {
+ public AdditionalInformation(MailRepositoryPath repositoryPath, ReprocessingService.Configuration configuration, long initialCount, long remainingCount, Instant timestamp) {
this.repositoryPath = repositoryPath;
- this.targetQueue = targetQueue.asString();
- this.targetProcessor = targetProcessor;
+ this.configuration = configuration;
this.initialCount = initialCount;
this.remainingCount = remainingCount;
this.timestamp = timestamp;
}
- public String getTargetQueue() {
- return targetQueue;
- }
-
- public Optional<String> getTargetProcessor() {
- return targetProcessor;
+ public ReprocessingService.Configuration getConfiguration() {
+ return configuration;
}
public String getRepositoryPath() {
@@ -97,17 +90,15 @@ public class ReprocessingAllMailsTask implements Task {
private final ReprocessingService reprocessingService;
private final MailRepositoryPath repositoryPath;
- private final MailQueueName targetQueue;
- private final Optional<String> targetProcessor;
+ private final ReprocessingService.Configuration configuration;
private final long repositorySize;
private final AtomicLong processedCount;
public ReprocessingAllMailsTask(ReprocessingService reprocessingService, long repositorySize,
- MailRepositoryPath repositoryPath, MailQueueName targetQueue, Optional<String> targetProcessor) {
+ MailRepositoryPath repositoryPath, ReprocessingService.Configuration configuration) {
this.reprocessingService = reprocessingService;
this.repositoryPath = repositoryPath;
- this.targetQueue = targetQueue;
- this.targetProcessor = targetProcessor;
+ this.configuration = configuration;
this.repositorySize = repositorySize;
this.processedCount = new AtomicLong(0);
}
@@ -119,7 +110,7 @@ public class ReprocessingAllMailsTask implements Task {
@Override
public Result run() {
try {
- reprocessingService.reprocessAll(repositoryPath, targetProcessor, targetQueue, this::notifyProgress);
+ reprocessingService.reprocessAll(repositoryPath, configuration, this::notifyProgress);
return Result.COMPLETED;
} catch (MessagingException | MailRepositoryStore.MailRepositoryStoreException e) {
LOGGER.error("Encountered error while reprocessing repository", e);
@@ -135,12 +126,8 @@ public class ReprocessingAllMailsTask implements Task {
return repositorySize;
}
- Optional<String> getTargetProcessor() {
- return targetProcessor;
- }
-
- MailQueueName getTargetQueue() {
- return targetQueue;
+ ReprocessingService.Configuration getConfiguration() {
+ return configuration;
}
@Override
@@ -151,7 +138,7 @@ public class ReprocessingAllMailsTask implements Task {
@Override
public Optional<TaskExecutionDetails.AdditionalInformation> details() {
return Optional.of(new AdditionalInformation(
- repositoryPath, targetQueue, targetProcessor, repositorySize, repositorySize - processedCount.get(),
+ repositoryPath, configuration, repositorySize, repositorySize - processedCount.get(),
Clock.systemUTC().instant()));
}
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java
index adacac9..55dcdb7 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java
@@ -35,16 +35,19 @@ public class ReprocessingAllMailsTaskAdditionalInformationDTO implements Additio
.convertToDTO(ReprocessingAllMailsTaskAdditionalInformationDTO.class)
.toDomainObjectConverter(dto -> new ReprocessingAllMailsTask.AdditionalInformation(
MailRepositoryPath.from(dto.repositoryPath),
- MailQueueName.of(dto.targetQueue),
- dto.targetProcessor,
+ new ReprocessingService.Configuration(
+ MailQueueName.of(dto.getTargetQueue()),
+ dto.getTargetProcessor(),
+ dto.isConsume()),
dto.initialCount,
dto.remainingCount,
dto.timestamp))
.toDTOConverter((details, type) -> new ReprocessingAllMailsTaskAdditionalInformationDTO(
type,
details.getRepositoryPath(),
- details.getTargetQueue(),
- details.getTargetProcessor(),
+ details.getConfiguration().getMailQueueName().asString(),
+ details.getConfiguration().getTargetProcessor(),
+ Optional.of(details.getConfiguration().isConsume()),
details.getInitialCount(),
details.getRemainingCount(),
details.timestamp()))
@@ -56,6 +59,7 @@ public class ReprocessingAllMailsTaskAdditionalInformationDTO implements Additio
private final String repositoryPath;
private final String targetQueue;
private final Optional<String> targetProcessor;
+ private final boolean consume;
private final long initialCount;
private final long remainingCount;
private final Instant timestamp;
@@ -65,6 +69,7 @@ public class ReprocessingAllMailsTaskAdditionalInformationDTO implements Additio
@JsonProperty("repositoryPath") String repositoryPath,
@JsonProperty("targetQueue") String targetQueue,
@JsonProperty("targetProcessor") Optional<String> targetProcessor,
+ @JsonProperty("consume") Optional<Boolean> consume,
@JsonProperty("initialCount") long initialCount,
@JsonProperty("remainingCount") long remainingCount,
@JsonProperty("timestamp") Instant timestamp) {
@@ -75,6 +80,11 @@ public class ReprocessingAllMailsTaskAdditionalInformationDTO implements Additio
this.initialCount = initialCount;
this.remainingCount = remainingCount;
this.timestamp = timestamp;
+ this.consume = consume.orElse(true);
+ }
+
+ public boolean isConsume() {
+ return consume;
}
@Override
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java
index d81f2a8..7a7b722 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java
@@ -46,9 +46,9 @@ public class ReprocessingAllMailsTaskDTO implements TaskDTO {
typeName,
domainObject.getRepositorySize(),
domainObject.getRepositoryPath().urlEncoded(),
- domainObject.getTargetQueue().asString(),
- domainObject.getTargetProcessor()
- );
+ domainObject.getConfiguration().getMailQueueName().asString(),
+ Optional.of(domainObject.getConfiguration().isConsume()),
+ domainObject.getConfiguration().getTargetProcessor());
} catch (Exception e) {
throw new ReprocessingAllMailsTask.UrlEncodingFailureSerializationException(domainObject.getRepositoryPath());
}
@@ -58,17 +58,20 @@ public class ReprocessingAllMailsTaskDTO implements TaskDTO {
private final long repositorySize;
private final String repositoryPath;
private final String targetQueue;
+ private final boolean consume;
private final Optional<String> targetProcessor;
public ReprocessingAllMailsTaskDTO(@JsonProperty("type") String type,
@JsonProperty("repositorySize") long repositorySize,
@JsonProperty("repositoryPath") String repositoryPath,
@JsonProperty("targetQueue") String targetQueue,
+ @JsonProperty("consume") Optional<Boolean> consume,
@JsonProperty("targetProcessor") Optional<String> targetProcessor) {
this.type = type;
this.repositorySize = repositorySize;
this.repositoryPath = repositoryPath;
this.targetQueue = targetQueue;
+ this.consume = consume.orElse(true);
this.targetProcessor = targetProcessor;
}
@@ -78,9 +81,10 @@ public class ReprocessingAllMailsTaskDTO implements TaskDTO {
reprocessingService,
repositorySize,
MailRepositoryPath.fromEncoded(repositoryPath),
- MailQueueName.of(targetQueue),
- targetProcessor
- );
+ new ReprocessingService.Configuration(
+ MailQueueName.of(targetQueue),
+ targetProcessor,
+ consume));
} catch (Exception e) {
throw new ReprocessingAllMailsTask.InvalidMailRepositoryPathDeserializationException(repositoryPath);
}
@@ -103,6 +107,10 @@ public class ReprocessingAllMailsTaskDTO implements TaskDTO {
return targetQueue;
}
+ public boolean isConsume() {
+ return consume;
+ }
+
public Optional<String> getTargetProcessor() {
return targetProcessor;
}
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java
index 2777ce2..bb81055 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java
@@ -28,7 +28,6 @@ import javax.mail.MessagingException;
import org.apache.james.mailrepository.api.MailKey;
import org.apache.james.mailrepository.api.MailRepositoryPath;
import org.apache.james.mailrepository.api.MailRepositoryStore;
-import org.apache.james.queue.api.MailQueueName;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskType;
@@ -39,16 +38,14 @@ public class ReprocessingOneMailTask implements Task {
public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
private final MailRepositoryPath repositoryPath;
- private final String targetQueue;
+ private final ReprocessingService.Configuration configuration;
private final MailKey mailKey;
- private final Optional<String> targetProcessor;
private final Instant timestamp;
- public AdditionalInformation(MailRepositoryPath repositoryPath, MailQueueName targetQueue, MailKey mailKey, Optional<String> targetProcessor, Instant timestamp) {
+ public AdditionalInformation(MailRepositoryPath repositoryPath, ReprocessingService.Configuration configuration, MailKey mailKey, Instant timestamp) {
this.repositoryPath = repositoryPath;
- this.targetQueue = targetQueue.asString();
+ this.configuration = configuration;
this.mailKey = mailKey;
- this.targetProcessor = targetProcessor;
this.timestamp = timestamp;
}
@@ -56,12 +53,8 @@ public class ReprocessingOneMailTask implements Task {
return mailKey.asString();
}
- public String getTargetQueue() {
- return targetQueue;
- }
-
- public Optional<String> getTargetProcessor() {
- return targetProcessor;
+ public ReprocessingService.Configuration getConfiguration() {
+ return configuration;
}
public String getRepositoryPath() {
@@ -90,29 +83,26 @@ public class ReprocessingOneMailTask implements Task {
private final ReprocessingService reprocessingService;
private final MailRepositoryPath repositoryPath;
- private final MailQueueName targetQueue;
+ private final ReprocessingService.Configuration configuration;
private final MailKey mailKey;
- private final Optional<String> targetProcessor;
private final AdditionalInformation additionalInformation;
public ReprocessingOneMailTask(ReprocessingService reprocessingService,
MailRepositoryPath repositoryPath,
- MailQueueName targetQueue,
+ ReprocessingService.Configuration configuration,
MailKey mailKey,
- Optional<String> targetProcessor,
Clock clock) {
this.reprocessingService = reprocessingService;
this.repositoryPath = repositoryPath;
- this.targetQueue = targetQueue;
+ this.configuration = configuration;
this.mailKey = mailKey;
- this.targetProcessor = targetProcessor;
- this.additionalInformation = new AdditionalInformation(repositoryPath, targetQueue, mailKey, targetProcessor, clock.instant());
+ this.additionalInformation = new AdditionalInformation(repositoryPath, configuration, mailKey, clock.instant());
}
@Override
public Result run() {
try {
- reprocessingService.reprocess(repositoryPath, mailKey, targetProcessor, targetQueue);
+ reprocessingService.reprocess(repositoryPath, mailKey, configuration);
return Result.COMPLETED;
} catch (MessagingException | MailRepositoryStore.MailRepositoryStoreException e) {
LOGGER.error("Encountered error while reprocessing repository", e);
@@ -129,16 +119,12 @@ public class ReprocessingOneMailTask implements Task {
return repositoryPath;
}
- MailQueueName getTargetQueue() {
- return targetQueue;
- }
-
MailKey getMailKey() {
return mailKey;
}
- Optional<String> getTargetProcessor() {
- return targetProcessor;
+ public ReprocessingService.Configuration getConfiguration() {
+ return configuration;
}
@Override
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java
index f84ae9d..40802d8 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java
@@ -36,16 +36,19 @@ public class ReprocessingOneMailTaskAdditionalInformationDTO implements Addition
.convertToDTO(ReprocessingOneMailTaskAdditionalInformationDTO.class)
.toDomainObjectConverter(dto -> new ReprocessingOneMailTask.AdditionalInformation(
MailRepositoryPath.from(dto.repositoryPath),
- MailQueueName.of(dto.targetQueue),
+ new ReprocessingService.Configuration(
+ MailQueueName.of(dto.targetQueue),
+ dto.targetProcessor,
+ dto.isConsume()),
new MailKey(dto.mailKey),
- dto.targetProcessor,
dto.timestamp))
.toDTOConverter((details, type) -> new ReprocessingOneMailTaskAdditionalInformationDTO(
type,
details.getRepositoryPath(),
- details.getTargetQueue(),
+ details.getConfiguration().getMailQueueName().asString(),
details.getMailKey(),
- details.getTargetProcessor(),
+ Optional.of(details.getConfiguration().isConsume()),
+ details.getConfiguration().getTargetProcessor(),
details.timestamp()))
.typeName(ReprocessingOneMailTask.TYPE.asString())
.withFactory(AdditionalInformationDTOModule::new);
@@ -56,15 +59,18 @@ public class ReprocessingOneMailTaskAdditionalInformationDTO implements Addition
private final String targetQueue;
private final String mailKey;
private final Optional<String> targetProcessor;
+ private final boolean consume;
private final Instant timestamp;
public ReprocessingOneMailTaskAdditionalInformationDTO(@JsonProperty("type") String type,
@JsonProperty("repositoryPath") String repositoryPath,
@JsonProperty("targetQueue") String targetQueue,
@JsonProperty("mailKey") String mailKey,
+ @JsonProperty("consume") Optional<Boolean> consume,
@JsonProperty("targetProcessor") Optional<String> targetProcessor,
@JsonProperty("timestamp") Instant timestamp) {
this.type = type;
+ this.consume = consume.orElse(true);
this.repositoryPath = repositoryPath;
this.targetQueue = targetQueue;
this.mailKey = mailKey;
@@ -72,6 +78,10 @@ public class ReprocessingOneMailTaskAdditionalInformationDTO implements Addition
this.timestamp = timestamp;
}
+ public boolean isConsume() {
+ return consume;
+ }
+
@Override
public String getType() {
return type;
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java
index ea09c18..9ced355 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java
@@ -47,10 +47,10 @@ public class ReprocessingOneMailTaskDTO implements TaskDTO {
return new ReprocessingOneMailTaskDTO(
typeName,
domainObject.getRepositoryPath().urlEncoded(),
- domainObject.getTargetQueue().asString(),
+ domainObject.getConfiguration().getMailQueueName().asString(),
domainObject.getMailKey().asString(),
- domainObject.getTargetProcessor()
- );
+ domainObject.getConfiguration().getTargetProcessor(),
+ Optional.of(domainObject.getConfiguration().isConsume()));
} catch (Exception e) {
throw new ReprocessingOneMailTask.UrlEncodingFailureSerializationException(domainObject.getRepositoryPath());
}
@@ -61,28 +61,32 @@ public class ReprocessingOneMailTaskDTO implements TaskDTO {
private final String targetQueue;
private final String mailKey;
private final Optional<String> targetProcessor;
+ private final boolean consume;
public ReprocessingOneMailTaskDTO(@JsonProperty("type") String type,
@JsonProperty("repositoryPath") String repositoryPath,
@JsonProperty("targetQueue") String targetQueue,
@JsonProperty("mailKey") String mailKey,
- @JsonProperty("targetProcessor") Optional<String> targetProcessor) {
+ @JsonProperty("targetProcessor") Optional<String> targetProcessor,
+ @JsonProperty("consume") Optional<Boolean> consume) {
this.type = type;
this.repositoryPath = repositoryPath;
this.mailKey = mailKey;
this.targetQueue = targetQueue;
this.targetProcessor = targetProcessor;
+ this.consume = consume.orElse(true); // field absent in older serialized tasks: default to the historical behaviour of removing the mail
}
public ReprocessingOneMailTask fromDTO(ReprocessingService reprocessingService, Clock clock) {
return new ReprocessingOneMailTask(
reprocessingService,
getMailRepositoryPath(),
- MailQueueName.of(targetQueue),
+ new ReprocessingService.Configuration(
+ MailQueueName.of(targetQueue),
+ targetProcessor,
+ consume),
new MailKey(mailKey),
- targetProcessor,
- clock
- );
+ clock);
}
private MailRepositoryPath getMailRepositoryPath() {
@@ -110,6 +114,10 @@ public class ReprocessingOneMailTaskDTO implements TaskDTO {
return targetQueue;
}
+ public boolean isConsume() {
+ return consume;
+ }
+
public Optional<String> getTargetProcessor() {
return targetProcessor;
}
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
index 5b7ed6b..9c443d2 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
@@ -53,20 +53,46 @@ public class ReprocessingService {
}
}
+ public static class Configuration {
+ private final MailQueueName mailQueueName;
+ private final Optional<String> targetProcessor;
+ private final boolean consume;
+
+ public Configuration(MailQueueName mailQueueName, Optional<String> targetProcessor, boolean consume) {
+ this.mailQueueName = mailQueueName;
+ this.targetProcessor = targetProcessor;
+ this.consume = consume;
+ }
+
+ public MailQueueName getMailQueueName() {
+ return mailQueueName;
+ }
+
+ public Optional<String> getTargetProcessor() {
+ return targetProcessor;
+ }
+
+ public boolean isConsume() {
+ return consume;
+ }
+ }
+
static class Reprocessor implements Closeable {
private final MailQueue mailQueue;
- private final Optional<String> targetProcessor;
+ private final Configuration configuration;
- Reprocessor(MailQueue mailQueue, Optional<String> targetProcessor) {
+ Reprocessor(MailQueue mailQueue, Configuration configuration) {
this.mailQueue = mailQueue;
- this.targetProcessor = targetProcessor;
+ this.configuration = configuration;
}
private void reprocess(MailRepository repository, Mail mail) {
try {
- targetProcessor.ifPresent(mail::setState);
+ configuration.getTargetProcessor().ifPresent(mail::setState);
mailQueue.enQueue(mail);
- repository.remove(mail);
+ if (configuration.isConsume()) {
+ repository.remove(mail);
+ }
} catch (Exception e) {
throw new RuntimeException("Error encountered while reprocessing mail " + mail.getName(), e);
} finally {
@@ -94,8 +120,8 @@ public class ReprocessingService {
this.mailRepositoryStoreService = mailRepositoryStoreService;
}
- public void reprocessAll(MailRepositoryPath path, Optional<String> targetProcessor, MailQueueName targetQueue, Consumer<MailKey> keyListener) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
- try (Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor)) {
+ public void reprocessAll(MailRepositoryPath path, Configuration configuration, Consumer<MailKey> keyListener) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
+ try (Reprocessor reprocessor = new Reprocessor(getMailQueue(configuration.getMailQueueName()), configuration)) {
mailRepositoryStoreService
.getRepositories(path)
.forEach(Throwing.consumer((MailRepository repository) ->
@@ -107,8 +133,8 @@ public class ReprocessingService {
}
}
- public void reprocess(MailRepositoryPath path, MailKey key, Optional<String> targetProcessor, MailQueueName targetQueue) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
- try (Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor)) {
+ public void reprocess(MailRepositoryPath path, MailKey key, Configuration configuration) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
+ try (Reprocessor reprocessor = new Reprocessor(getMailQueue(configuration.getMailQueueName()), configuration)) {
Pair<MailRepository, Mail> mailPair = mailRepositoryStoreService
.getRepositories(path)
.map(Throwing.function(repository -> Pair.of(repository, Optional.ofNullable(repository.retrieve(key)))))
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
index 95e8860..16bd869 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
@@ -1133,6 +1133,7 @@ class MailRepositoriesRoutesTest {
.body("additionalInformation.remainingCount", is(0))
.body("additionalInformation.targetProcessor", is(emptyOrNullString()))
.body("additionalInformation.targetQueue", is(MailQueueFactory.SPOOL.asString()))
+ .body("additionalInformation.consume", is(true))
.body("startedDate", is(notNullValue()))
.body("submitDate", is(notNullValue()))
.body("completedDate", is(notNullValue()));
@@ -1157,6 +1158,7 @@ class MailRepositoriesRoutesTest {
.param("action", "reprocess")
.param("queue", CUSTOM_QUEUE.asString())
.param("processor", transport)
+ .param("consume", false)
.patch(PATH_ESCAPED_MY_REPO + "/mails")
.jsonPath()
.get("taskId");
@@ -1174,6 +1176,7 @@ class MailRepositoriesRoutesTest {
.body("additionalInformation.remainingCount", is(0))
.body("additionalInformation.targetProcessor", is(transport))
.body("additionalInformation.targetQueue", is(CUSTOM_QUEUE.asString()))
+ .body("additionalInformation.consume", is(false))
.body("startedDate", is(notNullValue()))
.body("submitDate", is(notNullValue()))
.body("completedDate", is(notNullValue()));
@@ -1244,6 +1247,39 @@ class MailRepositoriesRoutesTest {
}
@Test
+ void reprocessingAllTaskShouldNotClearMailRepositoryWhenNotConsume() throws Exception {
+ MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO);
+ String name1 = "name1";
+ String name2 = "name2";
+ mailRepository.store(FakeMail.builder()
+ .name(name1)
+ .mimeMessage(MimeMessageUtil.mimeMessageFromBytes(MESSAGE_BYTES))
+ .build());
+ mailRepository.store(FakeMail.builder()
+ .name(name2)
+ .mimeMessage(MimeMessageUtil.mimeMessageFromBytes(MESSAGE_BYTES))
+ .build());
+
+ String transport = "transport";
+ String taskId = with()
+ .param("action", "reprocess")
+ .param("queue", CUSTOM_QUEUE.asString())
+ .param("processor", transport)
+ .param("consume", false)
+ .patch(PATH_ESCAPED_MY_REPO + "/mails")
+ .jsonPath()
+ .get("taskId");
+
+ with()
+ .basePath(TasksRoutes.BASE)
+ .get(taskId + "/await");
+
+ assertThat(mailRepository.list())
+ .toIterable()
+ .hasSize(2);
+ }
+
+ @Test
void reprocessingAllTaskShouldClearBothMailRepositoriesWhenSamePath() throws Exception {
MailRepository mailRepository1 = mailRepositoryStore.create(URL_MY_REPO);
MailRepository mailRepository2 = mailRepositoryStore.create(URL_MY_REPO_OTHER);
@@ -1519,6 +1555,7 @@ class MailRepositoriesRoutesTest {
.body("additionalInformation.mailKey", is(NAME_1))
.body("additionalInformation.targetProcessor", is(emptyOrNullString()))
.body("additionalInformation.targetQueue", is(MailQueueFactory.SPOOL.asString()))
+ .body("additionalInformation.consume", is(true))
.body("startedDate", is(notNullValue()))
.body("submitDate", is(notNullValue()))
.body("completedDate", is(notNullValue()));
@@ -1543,6 +1580,7 @@ class MailRepositoriesRoutesTest {
.param("action", "reprocess")
.param("queue", CUSTOM_QUEUE.asString())
.param("processor", transport)
+ .param("consume", false)
.patch(PATH_ESCAPED_MY_REPO + "/mails/" + NAME_1)
.jsonPath()
.get("taskId");
@@ -1559,6 +1597,7 @@ class MailRepositoriesRoutesTest {
.body("additionalInformation.mailKey", is(NAME_1))
.body("additionalInformation.targetProcessor", is(transport))
.body("additionalInformation.targetQueue", is(CUSTOM_QUEUE.asString()))
+ .body("additionalInformation.consume", is(false))
.body("startedDate", is(notNullValue()))
.body("submitDate", is(notNullValue()))
.body("completedDate", is(notNullValue()));
@@ -1782,6 +1821,58 @@ class MailRepositoriesRoutesTest {
}
@Test
+ void reprocessingOneTaskShouldNotRemoveEmailWhenNotConsume() throws Exception {
+ MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO);
+ mailRepository.store(FakeMail.builder()
+ .name(NAME_1)
+ .build());
+ mailRepository.store(FakeMail.builder()
+ .name(NAME_2)
+ .build());
+
+ String taskId = with()
+ .param("action", "reprocess")
+ .param("queue", CUSTOM_QUEUE.asString())
+ .param("consume", false)
+ .patch(PATH_ESCAPED_MY_REPO + "/mails/" + NAME_1)
+ .jsonPath()
+ .get("taskId");
+
+ with()
+ .basePath(TasksRoutes.BASE)
+ .get(taskId + "/await");
+
+ assertThat(mailRepository.size())
+ .isEqualTo(2);
+ }
+
+ @Test
+ void reprocessingOneTaskShouldRemoveEmailWhenConsume() throws Exception {
+ MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO);
+ mailRepository.store(FakeMail.builder()
+ .name(NAME_1)
+ .build());
+ mailRepository.store(FakeMail.builder()
+ .name(NAME_2)
+ .build());
+
+ String taskId = with()
+ .param("action", "reprocess")
+ .param("queue", CUSTOM_QUEUE.asString())
+ .param("consume", true)
+ .patch(PATH_ESCAPED_MY_REPO + "/mails/" + NAME_1)
+ .jsonPath()
+ .get("taskId");
+
+ with()
+ .basePath(TasksRoutes.BASE)
+ .get(taskId + "/await");
+
+ assertThat(mailRepository.size())
+ .isEqualTo(1);
+ }
+
+ @Test
void reprocessingOneTaskShouldNotRemoveMailFromRepositoryWhenUnknownMailKey() throws Exception {
MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO);
mailRepository.store(FakeMail.builder()
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskTest.java
index d9eb9ca..64b5fe3 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskTest.java
@@ -19,6 +19,7 @@
package org.apache.james.webadmin.service;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
@@ -28,6 +29,7 @@ import java.util.Optional;
import org.apache.james.JsonSerializationVerifier;
import org.apache.james.mailrepository.api.MailRepositoryPath;
import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -41,15 +43,20 @@ class ReprocessingAllMailsTaskTest {
private static final MailQueueName TARGET_QUEUE = MailQueueName.of("queue");
private static final Optional<String> SOME_TARGET_PROCESSOR = Optional.of("targetProcessor");
private static final long REMAINING_COUNT = 3L;
- private static final String SERIALIZED_TASK_WITH_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\"}";
- private static final String SERIALIZED_TASK_WITHOUT_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\"}";
- private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITH_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\",\"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\"}";
- private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITHOUT_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\", \"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\"}";
+ private static final boolean CONSUME = true;
+
+ private static final String SERIALIZED_TASK_WITH_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\",\"consume\":true}";
+ private static final String SERIALIZED_TASK_WITHOUT_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"consume\":false}";
+ private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITH_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\",\"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\",\"consume\":true}";
+ private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITHOUT_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\", \"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\",\"consume\":false}";
+
+ private static final String OLD_SERIALIZED_TASK = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\"}";
+ private static final String OLD_SERIALIZED_TASK_ADDITIONAL_INFORMATION = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\",\"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\"}";
@Test
void taskShouldBeSerializable() throws Exception {
- ReprocessingAllMailsTask taskWithTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, TARGET_QUEUE, SOME_TARGET_PROCESSOR);
- ReprocessingAllMailsTask taskWithoutTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, TARGET_QUEUE, Optional.empty());
+ ReprocessingAllMailsTask taskWithTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, SOME_TARGET_PROCESSOR, CONSUME));
+ ReprocessingAllMailsTask taskWithoutTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, Optional.empty(), !CONSUME));
JsonSerializationVerifier.dtoModule(ReprocessingAllMailsTaskDTO.module(REPROCESSING_SERVICE))
.testCase(taskWithTargetProcessor, SERIALIZED_TASK_WITH_TARGET_PROCESSOR)
@@ -58,7 +65,9 @@ class ReprocessingAllMailsTaskTest {
}
@ParameterizedTest
- @ValueSource(strings = {"{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\"}", "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"%\",\"targetQueue\":\"queue\"}"})
+ @ValueSource(strings = {
+ "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\"}",
+ "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"%\",\"targetQueue\":\"queue\"}"})
void taskShouldThrowOnDeserializationUrlDecodingError(String serialized) {
JsonTaskSerializer testee = JsonTaskSerializer.of(ReprocessingAllMailsTaskDTO.module(REPROCESSING_SERVICE));
@@ -68,9 +77,11 @@ class ReprocessingAllMailsTaskTest {
@Test
void additionalInformationShouldBeSerializable() throws Exception {
- ReprocessingAllMailsTask.AdditionalInformation details = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH, TARGET_QUEUE, SOME_TARGET_PROCESSOR,
+ ReprocessingAllMailsTask.AdditionalInformation details = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH,
+ new ReprocessingService.Configuration(TARGET_QUEUE, SOME_TARGET_PROCESSOR, CONSUME),
REPOSITORY_SIZE, REMAINING_COUNT, TIMESTAMP);
- ReprocessingAllMailsTask.AdditionalInformation detailsWithoutProcessor = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH, TARGET_QUEUE, Optional.empty(),
+ ReprocessingAllMailsTask.AdditionalInformation detailsWithoutProcessor = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH,
+ new ReprocessingService.Configuration(TARGET_QUEUE, Optional.empty(), !CONSUME),
REPOSITORY_SIZE, REMAINING_COUNT, TIMESTAMP);
JsonSerializationVerifier.dtoModule(ReprocessingAllMailsTaskAdditionalInformationDTO.module())
@@ -78,4 +89,24 @@ class ReprocessingAllMailsTaskTest {
.testCase(detailsWithoutProcessor, SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITHOUT_TARGET_PROCESSOR)
.verify();
}
+
+ @Test
+ void shouldDeserializePreviousTaskFormat() throws Exception {
+ ReprocessingAllMailsTask taskWithTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, SOME_TARGET_PROCESSOR, CONSUME));
+ JsonTaskSerializer testee = JsonTaskSerializer.of(ReprocessingAllMailsTaskDTO.module(REPROCESSING_SERVICE));
+
+ assertThat(testee.deserialize(OLD_SERIALIZED_TASK))
+ .isEqualToComparingFieldByFieldRecursively(taskWithTargetProcessor);
+ }
+
+ @Test
+ void shouldDeserializePreviousAdditionalInformationFormat() throws Exception {
+ ReprocessingAllMailsTask.AdditionalInformation details = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH,
+ new ReprocessingService.Configuration(TARGET_QUEUE, SOME_TARGET_PROCESSOR, CONSUME),
+ REPOSITORY_SIZE, REMAINING_COUNT, TIMESTAMP);
+ JsonTaskAdditionalInformationSerializer testee = JsonTaskAdditionalInformationSerializer.of(ReprocessingAllMailsTaskAdditionalInformationDTO.module());
+
+ assertThat(testee.deserialize(OLD_SERIALIZED_TASK_ADDITIONAL_INFORMATION))
+ .isEqualToComparingFieldByFieldRecursively(details);
+ }
}
\ No newline at end of file
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskTest.java
index 1d89553..cf0746b 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskTest.java
@@ -19,6 +19,7 @@
package org.apache.james.webadmin.service;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
@@ -32,6 +33,7 @@ import org.apache.james.JsonSerializationVerifier;
import org.apache.james.mailrepository.api.MailKey;
import org.apache.james.mailrepository.api.MailRepositoryPath;
import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -41,27 +43,32 @@ class ReprocessingOneMailTaskTest {
private static final Instant TIMESTAMP = Instant.parse("2018-11-13T12:00:55Z");
private static final Clock CLOCK = Clock.fixed(TIMESTAMP, ZoneId.of("UTC"));
private static final ReprocessingService REPROCESSING_SERVICE = mock(ReprocessingService.class);
- private static final String SERIALIZED_TASK_1 = "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\"}";
- private static final String SERIALIZED_TASK_1_ADDITIONAL_INFORMATION = "{\"type\":\"reprocessing-one\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\", \"timestamp\":\"2018-11-13T12:00:55Z\"}";
+ private static final String SERIALIZED_TASK_1 = "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\", \"consume\":true}";
+ private static final String SERIALIZED_TASK_1_ADDITIONAL_INFORMATION = "{\"type\":\"reprocessing-one\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\", \"timestamp\":\"2018-11-13T12:00:55Z\", \"consume\":true}";
+ private static final String SERIALIZED_TASK_OLD = "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\"}";
+ private static final String SERIALIZED_TASK_OLD_ADDITIONAL_INFORMATION = "{\"type\":\"reprocessing-one\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\", \"timestamp\":\"2018-11-13T12:00:55Z\"}";
private static final MailRepositoryPath REPOSITORY_PATH = MailRepositoryPath.from("a");
private static final MailQueueName TARGET_QUEUE = MailQueueName.of("queue");
private static final MailKey MAIL_KEY = new MailKey("myMail");
private static final Optional<String> TARGET_PROCESSOR = Optional.of("targetProcessor");
+ public static final boolean CONSUME = true;
@Test
void taskShouldBeSerializable() throws Exception {
- ReprocessingOneMailTask taskWithTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, TARGET_QUEUE, MAIL_KEY, TARGET_PROCESSOR, CLOCK);
- ReprocessingOneMailTask taskWithoutTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, TARGET_QUEUE, MAIL_KEY, Optional.empty(), CLOCK);
+ ReprocessingOneMailTask taskWithTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, TARGET_PROCESSOR, CONSUME), MAIL_KEY, CLOCK);
+ ReprocessingOneMailTask taskWithoutTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, Optional.empty(), !CONSUME), MAIL_KEY, CLOCK);
JsonSerializationVerifier.dtoModule(ReprocessingOneMailTaskDTO.module(CLOCK, REPROCESSING_SERVICE))
.testCase(taskWithTargetProcessor, SERIALIZED_TASK_1)
.testCase(taskWithoutTargetProcessor,
- "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\"}")
+ "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\", \"consume\":false}")
.verify();
}
@ParameterizedTest
- @ValueSource(strings = {"{\"type\":\"reprocessing-one\",\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\"}", "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\"}"})
+ @ValueSource(strings = {
+ "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\"}",
+ "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\"}"})
void taskShouldThrowOnDeserializationUrlDecodingError(String serialized) {
JsonTaskSerializer testee = JsonTaskSerializer.of(ReprocessingOneMailTaskDTO.module(CLOCK, REPROCESSING_SERVICE));
@@ -71,10 +78,31 @@ class ReprocessingOneMailTaskTest {
@Test
void additionalInformationShouldBeSerializable() throws IOException {
- ReprocessingOneMailTask.AdditionalInformation details = new ReprocessingOneMailTask.AdditionalInformation(REPOSITORY_PATH, TARGET_QUEUE, MAIL_KEY, TARGET_PROCESSOR, TIMESTAMP);
+ ReprocessingOneMailTask.AdditionalInformation details = new ReprocessingOneMailTask.AdditionalInformation(REPOSITORY_PATH,
+ new ReprocessingService.Configuration(TARGET_QUEUE, TARGET_PROCESSOR, true), MAIL_KEY, TIMESTAMP);
JsonSerializationVerifier.dtoModule(ReprocessingOneMailTaskAdditionalInformationDTO.module())
.bean(details)
.json(SERIALIZED_TASK_1_ADDITIONAL_INFORMATION)
.verify();
}
+
+
+ @Test
+ void shouldDeserializePreviousTaskFormat() throws Exception {
+ ReprocessingOneMailTask taskWithTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, TARGET_PROCESSOR, CONSUME), MAIL_KEY, CLOCK);
+ JsonTaskSerializer testee = JsonTaskSerializer.of(ReprocessingOneMailTaskDTO.module(CLOCK, REPROCESSING_SERVICE));
+
+ assertThat(testee.deserialize(SERIALIZED_TASK_OLD))
+ .isEqualToComparingFieldByFieldRecursively(taskWithTargetProcessor);
+ }
+
+ @Test
+ void shouldDeserializePreviousAdditionalInformationFormat() throws Exception {
+ ReprocessingOneMailTask.AdditionalInformation details = new ReprocessingOneMailTask.AdditionalInformation(REPOSITORY_PATH,
+ new ReprocessingService.Configuration(TARGET_QUEUE, TARGET_PROCESSOR, true), MAIL_KEY, TIMESTAMP);
+ JsonTaskAdditionalInformationSerializer testee = JsonTaskAdditionalInformationSerializer.of(ReprocessingOneMailTaskAdditionalInformationDTO.module());
+
+ assertThat(testee.deserialize(SERIALIZED_TASK_OLD_ADDITIONAL_INFORMATION))
+ .isEqualToComparingFieldByFieldRecursively(details);
+ }
}
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
index d2ad1c5..1ad1e9f 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
@@ -43,6 +43,7 @@ import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.apache.james.queue.memory.MemoryMailQueueFactory;
import org.apache.james.util.MimeMessageUtil;
+import org.apache.james.webadmin.service.ReprocessingService.Configuration;
import org.apache.mailet.base.test.FakeMail;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -64,6 +65,7 @@ class ReprocessingServiceTest {
private static final Consumer<MailKey> NOOP_CONSUMER = key -> { };
private static final Optional<String> NO_TARGET_PROCESSOR = Optional.empty();
private static final byte[] MESSAGE_BYTES = "header: value \r\n".getBytes(UTF_8);
+ public static final boolean CONSUME = true;
private ReprocessingService reprocessingService;
private MemoryMailRepositoryStore mailRepositoryStore;
@@ -106,7 +108,7 @@ class ReprocessingServiceTest {
repository.store(mail2);
repository.store(mail3);
- reprocessingService.reprocess(PATH, KEY_2, NO_TARGET_PROCESSOR, SPOOL);
+ reprocessingService.reprocess(PATH, KEY_2, new ReprocessingService.Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME));
assertThat(queueFactory.getQueue(SPOOL).get().browse())
.toIterable()
@@ -121,7 +123,7 @@ class ReprocessingServiceTest {
repository.store(mail2);
repository.store(mail3);
- reprocessingService.reprocess(PATH, KEY_2, NO_TARGET_PROCESSOR, SPOOL);
+ reprocessingService.reprocess(PATH, KEY_2, new ReprocessingService.Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME));
assertThat(repository.list()).toIterable()
.containsOnly(KEY_1, KEY_3);
@@ -134,7 +136,7 @@ class ReprocessingServiceTest {
repository.store(mail2);
repository.store(mail3);
- reprocessingService.reprocessAll(PATH, NO_TARGET_PROCESSOR, SPOOL, NOOP_CONSUMER);
+ reprocessingService.reprocessAll(PATH, new Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME), NOOP_CONSUMER);
assertThat(repository.list()).toIterable()
.isEmpty();
@@ -147,7 +149,7 @@ class ReprocessingServiceTest {
repository.store(mail2);
repository.store(mail3);
- reprocessingService.reprocessAll(PATH, NO_TARGET_PROCESSOR, SPOOL, NOOP_CONSUMER);
+ reprocessingService.reprocessAll(PATH, new Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME), NOOP_CONSUMER);
assertThat(queueFactory.getQueue(SPOOL).get().browse())
.toIterable()
@@ -178,7 +180,7 @@ class ReprocessingServiceTest {
}
});
- reprocessingService.reprocessAll(PATH, NO_TARGET_PROCESSOR, SPOOL, concurrentRemoveConsumer);
+ reprocessingService.reprocessAll(PATH, new ReprocessingService.Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME), concurrentRemoveConsumer);
assertThat(queueFactory.getQueue(SPOOL).get().browse())
.toIterable()
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org