You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/03/11 05:11:00 UTC

[james-project] 01/03: JAMES-3723 Allow to not consume emails upon reprocessing

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c458cea86af5c48ccd844dff6d202834fc2be44d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat Mar 5 22:06:45 2022 +0700

    JAMES-3723 Allow to not consume emails upon reprocessing
---
 ...dminServerTaskSerializationIntegrationTest.java |  1 +
 .../webadmin/routes/MailRepositoriesRoutes.java    | 20 +++--
 .../webadmin/service/ReprocessingAllMailsTask.java | 37 +++------
 ...essingAllMailsTaskAdditionalInformationDTO.java | 18 ++++-
 .../service/ReprocessingAllMailsTaskDTO.java       | 20 +++--
 .../webadmin/service/ReprocessingOneMailTask.java  | 38 +++------
 ...cessingOneMailTaskAdditionalInformationDTO.java | 18 ++++-
 .../service/ReprocessingOneMailTaskDTO.java        | 24 ++++--
 .../webadmin/service/ReprocessingService.java      | 44 ++++++++---
 .../routes/MailRepositoriesRoutesTest.java         | 91 ++++++++++++++++++++++
 .../service/ReprocessingAllMailsTaskTest.java      | 49 +++++++++---
 .../service/ReprocessingOneMailTaskTest.java       | 42 ++++++++--
 .../webadmin/service/ReprocessingServiceTest.java  | 12 +--
 13 files changed, 304 insertions(+), 110 deletions(-)

diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
index 4e3ac97..4e5ba18 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java
@@ -344,6 +344,7 @@ class RabbitMQWebAdminServerTaskSerializationIntegrationTest {
             .body("type", is("reprocessing-one"))
             .body("additionalInformation.repositoryPath", is(mailRepositoryUrl.asString()))
             .body("additionalInformation.targetQueue", is(notNullValue()))
+            .body("additionalInformation.consume", is(notNullValue()))
             .body("additionalInformation.mailKey", is(mailKey))
             .body("additionalInformation.targetProcessor", is(nullValue()));
     }
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
index aea7771..97c5634 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java
@@ -322,11 +322,15 @@ public class MailRepositoriesRoutes implements Routes {
 
     private Task reprocessAll(Request request) throws UnsupportedEncodingException, MailRepositoryStore.MailRepositoryStoreException {
         MailRepositoryPath path = decodedRepositoryPath(request);
-        Optional<String> targetProcessor = parseTargetProcessor(request);
-        MailQueueName targetQueue = parseTargetQueue(request);
 
         Long repositorySize = repositoryStoreService.size(path).orElse(0L);
-        return new ReprocessingAllMailsTask(reprocessingService, repositorySize, path, targetQueue, targetProcessor);
+        return new ReprocessingAllMailsTask(reprocessingService, repositorySize, path, extractConfiguration(request));
+    }
+
+    private ReprocessingService.Configuration extractConfiguration(Request request) {
+        return new ReprocessingService.Configuration(parseTargetQueue(request),
+            parseTargetProcessor(request),
+            parseConsume(request).orElse(true));
     }
 
     public void defineReprocessOne() {
@@ -340,10 +344,7 @@ public class MailRepositoriesRoutes implements Routes {
         MailRepositoryPath path = decodedRepositoryPath(request);
         MailKey key = new MailKey(request.params("key"));
 
-        Optional<String> targetProcessor = parseTargetProcessor(request);
-        MailQueueName targetQueue = parseTargetQueue(request);
-
-        return new ReprocessingOneMailTask(reprocessingService, path, targetQueue, key, targetProcessor, Clock.systemUTC());
+        return new ReprocessingOneMailTask(reprocessingService, path, extractConfiguration(request), key, Clock.systemUTC());
     }
 
     private Set<AdditionalField> extractAdditionalFields(String additionalFieldsParam) throws IllegalArgumentException {
@@ -361,6 +362,11 @@ public class MailRepositoriesRoutes implements Routes {
         return Optional.ofNullable(request.queryParams("processor"));
     }
 
+    private Optional<Boolean> parseConsume(Request request) {
+        return Optional.ofNullable(request.queryParams("consume"))
+            .map(Boolean::parseBoolean);
+    }
+
     private MailQueueName parseTargetQueue(Request request) {
         return Optional.ofNullable(request.queryParams("queue"))
             .map(MailQueueName::of)
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java
index 5d8ea39..ef930f9 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTask.java
@@ -29,7 +29,6 @@ import javax.mail.MessagingException;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepositoryPath;
 import org.apache.james.mailrepository.api.MailRepositoryStore;
-import org.apache.james.queue.api.MailQueueName;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskType;
@@ -40,27 +39,21 @@ public class ReprocessingAllMailsTask implements Task {
 
     public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
         private final MailRepositoryPath repositoryPath;
-        private final String targetQueue;
-        private final Optional<String> targetProcessor;
+        private final ReprocessingService.Configuration configuration;
         private final long initialCount;
         private final long remainingCount;
         private final Instant timestamp;
 
-        public AdditionalInformation(MailRepositoryPath repositoryPath, MailQueueName targetQueue, Optional<String> targetProcessor, long initialCount, long remainingCount, Instant timestamp) {
+        public AdditionalInformation(MailRepositoryPath repositoryPath, ReprocessingService.Configuration configuration, long initialCount, long remainingCount, Instant timestamp) {
             this.repositoryPath = repositoryPath;
-            this.targetQueue = targetQueue.asString();
-            this.targetProcessor = targetProcessor;
+            this.configuration = configuration;
             this.initialCount = initialCount;
             this.remainingCount = remainingCount;
             this.timestamp = timestamp;
         }
 
-        public String getTargetQueue() {
-            return targetQueue;
-        }
-
-        public Optional<String> getTargetProcessor() {
-            return targetProcessor;
+        public ReprocessingService.Configuration getConfiguration() {
+            return configuration;
         }
 
         public String getRepositoryPath() {
@@ -97,17 +90,15 @@ public class ReprocessingAllMailsTask implements Task {
 
     private final ReprocessingService reprocessingService;
     private final MailRepositoryPath repositoryPath;
-    private final MailQueueName targetQueue;
-    private final Optional<String> targetProcessor;
+    private final ReprocessingService.Configuration configuration;
     private final long repositorySize;
     private final AtomicLong processedCount;
 
     public ReprocessingAllMailsTask(ReprocessingService reprocessingService, long repositorySize,
-                                    MailRepositoryPath repositoryPath, MailQueueName targetQueue, Optional<String> targetProcessor) {
+                                    MailRepositoryPath repositoryPath, ReprocessingService.Configuration configuration) {
         this.reprocessingService = reprocessingService;
         this.repositoryPath = repositoryPath;
-        this.targetQueue = targetQueue;
-        this.targetProcessor = targetProcessor;
+        this.configuration = configuration;
         this.repositorySize = repositorySize;
         this.processedCount = new AtomicLong(0);
     }
@@ -119,7 +110,7 @@ public class ReprocessingAllMailsTask implements Task {
     @Override
     public Result run() {
         try {
-            reprocessingService.reprocessAll(repositoryPath, targetProcessor, targetQueue, this::notifyProgress);
+            reprocessingService.reprocessAll(repositoryPath, configuration, this::notifyProgress);
             return Result.COMPLETED;
         } catch (MessagingException | MailRepositoryStore.MailRepositoryStoreException e) {
             LOGGER.error("Encountered error while reprocessing repository", e);
@@ -135,12 +126,8 @@ public class ReprocessingAllMailsTask implements Task {
         return repositorySize;
     }
 
-    Optional<String> getTargetProcessor() {
-        return targetProcessor;
-    }
-
-    MailQueueName getTargetQueue() {
-        return targetQueue;
+    ReprocessingService.Configuration getConfiguration() {
+        return configuration;
     }
 
     @Override
@@ -151,7 +138,7 @@ public class ReprocessingAllMailsTask implements Task {
     @Override
     public Optional<TaskExecutionDetails.AdditionalInformation> details() {
         return Optional.of(new AdditionalInformation(
-            repositoryPath, targetQueue, targetProcessor, repositorySize, repositorySize - processedCount.get(),
+            repositoryPath, configuration, repositorySize, repositorySize - processedCount.get(),
             Clock.systemUTC().instant()));
     }
 
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java
index adacac9..55dcdb7 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskAdditionalInformationDTO.java
@@ -35,16 +35,19 @@ public class ReprocessingAllMailsTaskAdditionalInformationDTO implements Additio
             .convertToDTO(ReprocessingAllMailsTaskAdditionalInformationDTO.class)
             .toDomainObjectConverter(dto -> new ReprocessingAllMailsTask.AdditionalInformation(
                 MailRepositoryPath.from(dto.repositoryPath),
-                MailQueueName.of(dto.targetQueue),
-                dto.targetProcessor,
+                new ReprocessingService.Configuration(
+                    MailQueueName.of(dto.getTargetQueue()),
+                    dto.getTargetProcessor(),
+                    dto.isConsume()),
                 dto.initialCount,
                 dto.remainingCount,
                 dto.timestamp))
             .toDTOConverter((details, type) -> new ReprocessingAllMailsTaskAdditionalInformationDTO(
                 type,
                 details.getRepositoryPath(),
-                details.getTargetQueue(),
-                details.getTargetProcessor(),
+                details.getConfiguration().getMailQueueName().asString(),
+                details.getConfiguration().getTargetProcessor(),
+                Optional.of(details.getConfiguration().isConsume()),
                 details.getInitialCount(),
                 details.getRemainingCount(),
                 details.timestamp()))
@@ -56,6 +59,7 @@ public class ReprocessingAllMailsTaskAdditionalInformationDTO implements Additio
     private final String repositoryPath;
     private final String targetQueue;
     private final Optional<String> targetProcessor;
+    private final boolean consume;
     private final long initialCount;
     private final long remainingCount;
     private final Instant timestamp;
@@ -65,6 +69,7 @@ public class ReprocessingAllMailsTaskAdditionalInformationDTO implements Additio
         @JsonProperty("repositoryPath") String repositoryPath,
         @JsonProperty("targetQueue") String targetQueue,
         @JsonProperty("targetProcessor") Optional<String> targetProcessor,
+        @JsonProperty("consume") Optional<Boolean> consume,
         @JsonProperty("initialCount") long initialCount,
         @JsonProperty("remainingCount") long remainingCount,
         @JsonProperty("timestamp") Instant timestamp) {
@@ -75,6 +80,11 @@ public class ReprocessingAllMailsTaskAdditionalInformationDTO implements Additio
         this.initialCount = initialCount;
         this.remainingCount = remainingCount;
         this.timestamp = timestamp;
+        this.consume = consume.orElse(true);
+    }
+
+    public boolean isConsume() {
+        return consume;
     }
 
     @Override
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java
index d81f2a8..7a7b722 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskDTO.java
@@ -46,9 +46,9 @@ public class ReprocessingAllMailsTaskDTO implements TaskDTO {
                 typeName,
                 domainObject.getRepositorySize(),
                 domainObject.getRepositoryPath().urlEncoded(),
-                domainObject.getTargetQueue().asString(),
-                domainObject.getTargetProcessor()
-            );
+                domainObject.getConfiguration().getMailQueueName().asString(),
+                Optional.of(domainObject.getConfiguration().isConsume()),
+                domainObject.getConfiguration().getTargetProcessor());
         } catch (Exception e) {
             throw new ReprocessingAllMailsTask.UrlEncodingFailureSerializationException(domainObject.getRepositoryPath());
         }
@@ -58,17 +58,20 @@ public class ReprocessingAllMailsTaskDTO implements TaskDTO {
     private final long repositorySize;
     private final String repositoryPath;
     private final String targetQueue;
+    private final boolean consume;
     private final Optional<String> targetProcessor;
 
     public ReprocessingAllMailsTaskDTO(@JsonProperty("type") String type,
                                        @JsonProperty("repositorySize") long repositorySize,
                                        @JsonProperty("repositoryPath") String repositoryPath,
                                        @JsonProperty("targetQueue") String targetQueue,
+                                       @JsonProperty("consume") Optional<Boolean> consume,
                                        @JsonProperty("targetProcessor") Optional<String> targetProcessor) {
         this.type = type;
         this.repositorySize = repositorySize;
         this.repositoryPath = repositoryPath;
         this.targetQueue = targetQueue;
+        this.consume = consume.orElse(true);
         this.targetProcessor = targetProcessor;
     }
 
@@ -78,9 +81,10 @@ public class ReprocessingAllMailsTaskDTO implements TaskDTO {
                 reprocessingService,
                 repositorySize,
                 MailRepositoryPath.fromEncoded(repositoryPath),
-                MailQueueName.of(targetQueue),
-                targetProcessor
-            );
+                new ReprocessingService.Configuration(
+                    MailQueueName.of(targetQueue),
+                    targetProcessor,
+                    consume));
         } catch (Exception e) {
             throw new ReprocessingAllMailsTask.InvalidMailRepositoryPathDeserializationException(repositoryPath);
         }
@@ -103,6 +107,10 @@ public class ReprocessingAllMailsTaskDTO implements TaskDTO {
         return targetQueue;
     }
 
+    public boolean isConsume() {
+        return consume;
+    }
+
     public Optional<String> getTargetProcessor() {
         return targetProcessor;
     }
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java
index 2777ce2..bb81055 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTask.java
@@ -28,7 +28,6 @@ import javax.mail.MessagingException;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepositoryPath;
 import org.apache.james.mailrepository.api.MailRepositoryStore;
-import org.apache.james.queue.api.MailQueueName;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskType;
@@ -39,16 +38,14 @@ public class ReprocessingOneMailTask implements Task {
 
     public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
         private final MailRepositoryPath repositoryPath;
-        private final String targetQueue;
+        private final ReprocessingService.Configuration configuration;
         private final MailKey mailKey;
-        private final Optional<String> targetProcessor;
         private final Instant timestamp;
 
-        public AdditionalInformation(MailRepositoryPath repositoryPath, MailQueueName targetQueue, MailKey mailKey, Optional<String> targetProcessor, Instant timestamp) {
+        public AdditionalInformation(MailRepositoryPath repositoryPath, ReprocessingService.Configuration configuration, MailKey mailKey, Instant timestamp) {
             this.repositoryPath = repositoryPath;
-            this.targetQueue = targetQueue.asString();
+            this.configuration = configuration;
             this.mailKey = mailKey;
-            this.targetProcessor = targetProcessor;
             this.timestamp = timestamp;
         }
 
@@ -56,12 +53,8 @@ public class ReprocessingOneMailTask implements Task {
             return mailKey.asString();
         }
 
-        public String getTargetQueue() {
-            return targetQueue;
-        }
-
-        public Optional<String> getTargetProcessor() {
-            return targetProcessor;
+        public ReprocessingService.Configuration getConfiguration() {
+            return configuration;
         }
 
         public String getRepositoryPath() {
@@ -90,29 +83,26 @@ public class ReprocessingOneMailTask implements Task {
 
     private final ReprocessingService reprocessingService;
     private final MailRepositoryPath repositoryPath;
-    private final MailQueueName targetQueue;
+    private final ReprocessingService.Configuration configuration;
     private final MailKey mailKey;
-    private final Optional<String> targetProcessor;
     private final AdditionalInformation additionalInformation;
 
     public ReprocessingOneMailTask(ReprocessingService reprocessingService,
                                    MailRepositoryPath repositoryPath,
-                                   MailQueueName targetQueue,
+                                   ReprocessingService.Configuration configuration,
                                    MailKey mailKey,
-                                   Optional<String> targetProcessor,
                                    Clock clock) {
         this.reprocessingService = reprocessingService;
         this.repositoryPath = repositoryPath;
-        this.targetQueue = targetQueue;
+        this.configuration = configuration;
         this.mailKey = mailKey;
-        this.targetProcessor = targetProcessor;
-        this.additionalInformation = new AdditionalInformation(repositoryPath, targetQueue, mailKey, targetProcessor, clock.instant());
+        this.additionalInformation = new AdditionalInformation(repositoryPath, configuration, mailKey, clock.instant());
     }
 
     @Override
     public Result run() {
         try {
-            reprocessingService.reprocess(repositoryPath, mailKey, targetProcessor, targetQueue);
+            reprocessingService.reprocess(repositoryPath, mailKey, configuration);
             return Result.COMPLETED;
         } catch (MessagingException | MailRepositoryStore.MailRepositoryStoreException e) {
             LOGGER.error("Encountered error while reprocessing repository", e);
@@ -129,16 +119,12 @@ public class ReprocessingOneMailTask implements Task {
         return repositoryPath;
     }
 
-    MailQueueName getTargetQueue() {
-        return targetQueue;
-    }
-
     MailKey getMailKey() {
         return mailKey;
     }
 
-    Optional<String> getTargetProcessor() {
-        return targetProcessor;
+    public ReprocessingService.Configuration getConfiguration() {
+        return configuration;
     }
 
     @Override
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java
index f84ae9d..40802d8 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskAdditionalInformationDTO.java
@@ -36,16 +36,19 @@ public class ReprocessingOneMailTaskAdditionalInformationDTO implements Addition
             .convertToDTO(ReprocessingOneMailTaskAdditionalInformationDTO.class)
             .toDomainObjectConverter(dto -> new ReprocessingOneMailTask.AdditionalInformation(
                 MailRepositoryPath.from(dto.repositoryPath),
-                MailQueueName.of(dto.targetQueue),
+                new ReprocessingService.Configuration(
+                    MailQueueName.of(dto.targetQueue),
+                    dto.targetProcessor,
+                    dto.isConsume()),
                 new MailKey(dto.mailKey),
-                dto.targetProcessor,
                 dto.timestamp))
             .toDTOConverter((details, type) -> new ReprocessingOneMailTaskAdditionalInformationDTO(
                 type,
                 details.getRepositoryPath(),
-                details.getTargetQueue(),
+                details.getConfiguration().getMailQueueName().asString(),
                 details.getMailKey(),
-                details.getTargetProcessor(),
+                Optional.of(details.getConfiguration().isConsume()),
+                details.getConfiguration().getTargetProcessor(),
                 details.timestamp()))
             .typeName(ReprocessingOneMailTask.TYPE.asString())
             .withFactory(AdditionalInformationDTOModule::new);
@@ -56,15 +59,18 @@ public class ReprocessingOneMailTaskAdditionalInformationDTO implements Addition
     private final String targetQueue;
     private final String mailKey;
     private final Optional<String> targetProcessor;
+    private final boolean consume;
     private final Instant timestamp;
 
     public ReprocessingOneMailTaskAdditionalInformationDTO(@JsonProperty("type") String type,
                                                            @JsonProperty("repositoryPath") String repositoryPath,
                                                            @JsonProperty("targetQueue") String targetQueue,
                                                            @JsonProperty("mailKey") String mailKey,
+                                                           @JsonProperty("consume") Optional<Boolean> consume,
                                                            @JsonProperty("targetProcessor") Optional<String> targetProcessor,
                                                            @JsonProperty("timestamp") Instant timestamp) {
         this.type = type;
+        this.consume = consume.orElse(true);
         this.repositoryPath = repositoryPath;
         this.targetQueue = targetQueue;
         this.mailKey = mailKey;
@@ -72,6 +78,10 @@ public class ReprocessingOneMailTaskAdditionalInformationDTO implements Addition
         this.timestamp = timestamp;
     }
 
+    public boolean isConsume() {
+        return consume;
+    }
+
     @Override
     public String getType() {
         return type;
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java
index ea09c18..9ced355 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskDTO.java
@@ -47,10 +47,10 @@ public class ReprocessingOneMailTaskDTO implements TaskDTO {
             return new ReprocessingOneMailTaskDTO(
                 typeName,
                 domainObject.getRepositoryPath().urlEncoded(),
-                domainObject.getTargetQueue().asString(),
+                domainObject.getConfiguration().getMailQueueName().asString(),
                 domainObject.getMailKey().asString(),
-                domainObject.getTargetProcessor()
-            );
+                domainObject.getConfiguration().getTargetProcessor(),
+                Optional.of(domainObject.getConfiguration().isConsume()));
         } catch (Exception e) {
             throw new ReprocessingOneMailTask.UrlEncodingFailureSerializationException(domainObject.getRepositoryPath());
         }
@@ -61,28 +61,32 @@ public class ReprocessingOneMailTaskDTO implements TaskDTO {
     private final String targetQueue;
     private final String mailKey;
     private final Optional<String> targetProcessor;
+    private final boolean consume;
 
     public ReprocessingOneMailTaskDTO(@JsonProperty("type") String type,
                                       @JsonProperty("repositoryPath") String repositoryPath,
                                       @JsonProperty("targetQueue") String targetQueue,
                                       @JsonProperty("mailKey") String mailKey,
-                                      @JsonProperty("targetProcessor") Optional<String> targetProcessor) {
+                                      @JsonProperty("targetProcessor") Optional<String> targetProcessor,
+                                      @JsonProperty("boolean") Optional<Boolean> consume) {
         this.type = type;
         this.repositoryPath = repositoryPath;
         this.mailKey = mailKey;
         this.targetQueue = targetQueue;
         this.targetProcessor = targetProcessor;
+        this.consume = consume.orElse(true);
     }
 
     public ReprocessingOneMailTask fromDTO(ReprocessingService reprocessingService, Clock clock) {
         return new ReprocessingOneMailTask(
             reprocessingService,
             getMailRepositoryPath(),
-            MailQueueName.of(targetQueue),
+            new ReprocessingService.Configuration(
+                MailQueueName.of(targetQueue),
+                targetProcessor,
+                consume),
             new MailKey(mailKey),
-            targetProcessor,
-            clock
-        );
+            clock);
     }
 
     private MailRepositoryPath getMailRepositoryPath() {
@@ -110,6 +114,10 @@ public class ReprocessingOneMailTaskDTO implements TaskDTO {
         return targetQueue;
     }
 
+    public boolean isConsume() {
+        return consume;
+    }
+
     public Optional<String> getTargetProcessor() {
         return targetProcessor;
     }
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
index 5b7ed6b..9c443d2 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
@@ -53,20 +53,46 @@ public class ReprocessingService {
         }
     }
 
+    public static class Configuration {
+        private final MailQueueName mailQueueName;
+        private final Optional<String> targetProcessor;
+        private final boolean consume;
+
+        public Configuration(MailQueueName mailQueueName, Optional<String> targetProcessor, boolean consume) {
+            this.mailQueueName = mailQueueName;
+            this.targetProcessor = targetProcessor;
+            this.consume = consume;
+        }
+
+        public MailQueueName getMailQueueName() {
+            return mailQueueName;
+        }
+
+        public Optional<String> getTargetProcessor() {
+            return targetProcessor;
+        }
+
+        public boolean isConsume() {
+            return consume;
+        }
+    }
+
     static class Reprocessor implements Closeable {
         private final MailQueue mailQueue;
-        private final Optional<String> targetProcessor;
+        private final Configuration configuration;
 
-        Reprocessor(MailQueue mailQueue, Optional<String> targetProcessor) {
+        Reprocessor(MailQueue mailQueue, Configuration configuration) {
             this.mailQueue = mailQueue;
-            this.targetProcessor = targetProcessor;
+            this.configuration = configuration;
         }
 
         private void reprocess(MailRepository repository, Mail mail) {
             try {
-                targetProcessor.ifPresent(mail::setState);
+                configuration.getTargetProcessor().ifPresent(mail::setState);
                 mailQueue.enQueue(mail);
-                repository.remove(mail);
+                if (configuration.isConsume()) {
+                    repository.remove(mail);
+                }
             } catch (Exception e) {
                 throw new RuntimeException("Error encountered while reprocessing mail " + mail.getName(), e);
             } finally {
@@ -94,8 +120,8 @@ public class ReprocessingService {
         this.mailRepositoryStoreService = mailRepositoryStoreService;
     }
 
-    public void reprocessAll(MailRepositoryPath path, Optional<String> targetProcessor, MailQueueName targetQueue, Consumer<MailKey> keyListener) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
-        try (Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor)) {
+    public void reprocessAll(MailRepositoryPath path, Configuration configuration, Consumer<MailKey> keyListener) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
+        try (Reprocessor reprocessor = new Reprocessor(getMailQueue(configuration.getMailQueueName()), configuration)) {
             mailRepositoryStoreService
                 .getRepositories(path)
                 .forEach(Throwing.consumer((MailRepository repository) ->
@@ -107,8 +133,8 @@ public class ReprocessingService {
         }
     }
 
-    public void reprocess(MailRepositoryPath path, MailKey key, Optional<String> targetProcessor, MailQueueName targetQueue) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
-        try (Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor)) {
+    public void reprocess(MailRepositoryPath path, MailKey key, Configuration configuration) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException {
+        try (Reprocessor reprocessor = new Reprocessor(getMailQueue(configuration.getMailQueueName()), configuration)) {
             Pair<MailRepository, Mail> mailPair = mailRepositoryStoreService
                 .getRepositories(path)
                 .map(Throwing.function(repository -> Pair.of(repository, Optional.ofNullable(repository.retrieve(key)))))
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
index 95e8860..16bd869 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
@@ -1133,6 +1133,7 @@ class MailRepositoriesRoutesTest {
             .body("additionalInformation.remainingCount", is(0))
             .body("additionalInformation.targetProcessor", is(emptyOrNullString()))
             .body("additionalInformation.targetQueue", is(MailQueueFactory.SPOOL.asString()))
+            .body("additionalInformation.consume", is(true))
             .body("startedDate", is(notNullValue()))
             .body("submitDate", is(notNullValue()))
             .body("completedDate", is(notNullValue()));
@@ -1157,6 +1158,7 @@ class MailRepositoriesRoutesTest {
             .param("action", "reprocess")
             .param("queue", CUSTOM_QUEUE.asString())
             .param("processor", transport)
+            .param("consume", false)
             .patch(PATH_ESCAPED_MY_REPO + "/mails")
             .jsonPath()
             .get("taskId");
@@ -1174,6 +1176,7 @@ class MailRepositoriesRoutesTest {
             .body("additionalInformation.remainingCount", is(0))
             .body("additionalInformation.targetProcessor", is(transport))
             .body("additionalInformation.targetQueue", is(CUSTOM_QUEUE.asString()))
+            .body("additionalInformation.consume", is(false))
             .body("startedDate", is(notNullValue()))
             .body("submitDate", is(notNullValue()))
             .body("completedDate", is(notNullValue()));
@@ -1244,6 +1247,39 @@ class MailRepositoriesRoutesTest {
     }
 
     @Test
+    void reprocessingAllTaskShouldNotClearMailRepositoryWhenNotConsume() throws Exception {
+        MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO);
+        String name1 = "name1";
+        String name2 = "name2";
+        mailRepository.store(FakeMail.builder()
+            .name(name1)
+            .mimeMessage(MimeMessageUtil.mimeMessageFromBytes(MESSAGE_BYTES))
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(name2)
+            .mimeMessage(MimeMessageUtil.mimeMessageFromBytes(MESSAGE_BYTES))
+            .build());
+
+        String transport = "transport";
+        String taskId = with()
+            .param("action", "reprocess")
+            .param("queue", CUSTOM_QUEUE.asString())
+            .param("processor", transport)
+            .param("consume", false)
+            .patch(PATH_ESCAPED_MY_REPO + "/mails")
+            .jsonPath()
+            .get("taskId");
+
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(taskId + "/await");
+
+        assertThat(mailRepository.list())
+            .toIterable()
+            .hasSize(2);
+    }
+
+    @Test
     void reprocessingAllTaskShouldClearBothMailRepositoriesWhenSamePath() throws Exception {
         MailRepository mailRepository1 = mailRepositoryStore.create(URL_MY_REPO);
         MailRepository mailRepository2 = mailRepositoryStore.create(URL_MY_REPO_OTHER);
@@ -1519,6 +1555,7 @@ class MailRepositoriesRoutesTest {
             .body("additionalInformation.mailKey", is(NAME_1))
             .body("additionalInformation.targetProcessor", is(emptyOrNullString()))
             .body("additionalInformation.targetQueue", is(MailQueueFactory.SPOOL.asString()))
+            .body("additionalInformation.consume", is(true))
             .body("startedDate", is(notNullValue()))
             .body("submitDate", is(notNullValue()))
             .body("completedDate", is(notNullValue()));
@@ -1543,6 +1580,7 @@ class MailRepositoriesRoutesTest {
             .param("action", "reprocess")
             .param("queue", CUSTOM_QUEUE.asString())
             .param("processor", transport)
+            .param("consume", false)
             .patch(PATH_ESCAPED_MY_REPO + "/mails/" + NAME_1)
             .jsonPath()
             .get("taskId");
@@ -1559,6 +1597,7 @@ class MailRepositoriesRoutesTest {
             .body("additionalInformation.mailKey", is(NAME_1))
             .body("additionalInformation.targetProcessor", is(transport))
             .body("additionalInformation.targetQueue", is(CUSTOM_QUEUE.asString()))
+            .body("additionalInformation.consume", is(false))
             .body("startedDate", is(notNullValue()))
             .body("submitDate", is(notNullValue()))
             .body("completedDate", is(notNullValue()));
@@ -1782,6 +1821,58 @@ class MailRepositoriesRoutesTest {
     }
 
     @Test
+    void reprocessingOneTaskShouldNotRemoveEmailWhenNotConsume() throws Exception {
+        MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO);
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .build());
+
+        String taskId = with()
+            .param("action", "reprocess")
+            .param("queue", CUSTOM_QUEUE.asString())
+            .param("consume", false)
+            .patch(PATH_ESCAPED_MY_REPO + "/mails/" + NAME_1)
+            .jsonPath()
+            .get("taskId");
+
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(taskId + "/await");
+
+        assertThat(mailRepository.size())
+            .isEqualTo(2);
+    }
+
+    @Test
+    void reprocessingOneTaskShouldRemoveEmailWhenConsume() throws Exception {
+        MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO);
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_1)
+            .build());
+        mailRepository.store(FakeMail.builder()
+            .name(NAME_2)
+            .build());
+
+        String taskId = with()
+            .param("action", "reprocess")
+            .param("queue", CUSTOM_QUEUE.asString())
+            .param("consume", true)
+            .patch(PATH_ESCAPED_MY_REPO + "/mails/" + NAME_1)
+            .jsonPath()
+            .get("taskId");
+
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(taskId + "/await");
+
+        assertThat(mailRepository.size())
+            .isEqualTo(1);
+    }
+
+    @Test
     void reprocessingOneTaskShouldNotRemoveMailFromRepositoryWhenUnknownMailKey() throws Exception {
         MailRepository mailRepository = mailRepositoryStore.create(URL_MY_REPO);
         mailRepository.store(FakeMail.builder()
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskTest.java
index d9eb9ca..64b5fe3 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingAllMailsTaskTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.webadmin.service;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 
@@ -28,6 +29,7 @@ import java.util.Optional;
 import org.apache.james.JsonSerializationVerifier;
 import org.apache.james.mailrepository.api.MailRepositoryPath;
 import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -41,15 +43,20 @@ class ReprocessingAllMailsTaskTest {
     private static final MailQueueName TARGET_QUEUE = MailQueueName.of("queue");
     private static final Optional<String> SOME_TARGET_PROCESSOR = Optional.of("targetProcessor");
     private static final long REMAINING_COUNT = 3L;
-    private static final String SERIALIZED_TASK_WITH_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\"}";
-    private static final String SERIALIZED_TASK_WITHOUT_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\"}";
-    private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITH_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\",\"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\"}";
-    private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITHOUT_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\", \"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\"}";
+    private static final boolean CONSUME = true;
+
+    private static final String SERIALIZED_TASK_WITH_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\",\"consume\":true}";
+    private static final String SERIALIZED_TASK_WITHOUT_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"consume\":false}";
+    private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITH_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\",\"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\",\"consume\":true}";
+    private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITHOUT_TARGET_PROCESSOR = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\", \"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\",\"consume\":false}";
+
+    private static final String OLD_SERIALIZED_TASK = "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\"}";
+    private static final String OLD_SERIALIZED_TASK_ADDITIONAL_INFORMATION = "{\"type\":\"reprocessing-all\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\",\"initialCount\":5,\"remainingCount\":3, \"timestamp\":\"2018-11-13T12:00:55Z\"}";
 
     @Test
     void taskShouldBeSerializable() throws Exception {
-        ReprocessingAllMailsTask taskWithTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, TARGET_QUEUE, SOME_TARGET_PROCESSOR);
-        ReprocessingAllMailsTask taskWithoutTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, TARGET_QUEUE, Optional.empty());
+        ReprocessingAllMailsTask taskWithTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, SOME_TARGET_PROCESSOR, CONSUME));
+        ReprocessingAllMailsTask taskWithoutTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, Optional.empty(), !CONSUME));
 
         JsonSerializationVerifier.dtoModule(ReprocessingAllMailsTaskDTO.module(REPROCESSING_SERVICE))
             .testCase(taskWithTargetProcessor, SERIALIZED_TASK_WITH_TARGET_PROCESSOR)
@@ -58,7 +65,9 @@ class ReprocessingAllMailsTaskTest {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\"}", "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"%\",\"targetQueue\":\"queue\"}"})
+    @ValueSource(strings = {
+        "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"targetProcessor\":\"targetProcessor\"}",
+        "{\"type\":\"reprocessing-all\",\"repositorySize\":5,\"repositoryPath\":\"%\",\"targetQueue\":\"queue\"}"})
     void taskShouldThrowOnDeserializationUrlDecodingError(String serialized) {
         JsonTaskSerializer testee = JsonTaskSerializer.of(ReprocessingAllMailsTaskDTO.module(REPROCESSING_SERVICE));
 
@@ -68,9 +77,11 @@ class ReprocessingAllMailsTaskTest {
 
     @Test
     void additionalInformationShouldBeSerializable() throws Exception {
-        ReprocessingAllMailsTask.AdditionalInformation details = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH, TARGET_QUEUE, SOME_TARGET_PROCESSOR,
+        ReprocessingAllMailsTask.AdditionalInformation details = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH,
+            new ReprocessingService.Configuration(TARGET_QUEUE, SOME_TARGET_PROCESSOR, CONSUME),
             REPOSITORY_SIZE, REMAINING_COUNT, TIMESTAMP);
-        ReprocessingAllMailsTask.AdditionalInformation detailsWithoutProcessor = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH, TARGET_QUEUE, Optional.empty(),
+        ReprocessingAllMailsTask.AdditionalInformation detailsWithoutProcessor = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH,
+            new ReprocessingService.Configuration(TARGET_QUEUE, Optional.empty(), !CONSUME),
             REPOSITORY_SIZE, REMAINING_COUNT, TIMESTAMP);
 
         JsonSerializationVerifier.dtoModule(ReprocessingAllMailsTaskAdditionalInformationDTO.module())
@@ -78,4 +89,24 @@ class ReprocessingAllMailsTaskTest {
             .testCase(detailsWithoutProcessor, SERIALIZED_TASK_ADDITIONAL_INFORMATION_WITHOUT_TARGET_PROCESSOR)
             .verify();
     }
+
+    @Test
+    void shouldDeserializePreviousTaskFormat() throws Exception {
+        ReprocessingAllMailsTask taskWithTargetProcessor = new ReprocessingAllMailsTask(REPROCESSING_SERVICE, REPOSITORY_SIZE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, SOME_TARGET_PROCESSOR, CONSUME));
+        JsonTaskSerializer testee = JsonTaskSerializer.of(ReprocessingAllMailsTaskDTO.module(REPROCESSING_SERVICE));
+
+        assertThat(testee.deserialize(OLD_SERIALIZED_TASK))
+            .isEqualToComparingFieldByFieldRecursively(taskWithTargetProcessor);
+    }
+
+    @Test
+    void shouldDeserializePreviousAdditionalInformationFormat() throws Exception {
+        ReprocessingAllMailsTask.AdditionalInformation details = new ReprocessingAllMailsTask.AdditionalInformation(REPOSITORY_PATH,
+            new ReprocessingService.Configuration(TARGET_QUEUE, SOME_TARGET_PROCESSOR, CONSUME),
+            REPOSITORY_SIZE, REMAINING_COUNT, TIMESTAMP);
+        JsonTaskAdditionalInformationSerializer testee = JsonTaskAdditionalInformationSerializer.of(ReprocessingAllMailsTaskAdditionalInformationDTO.module());
+
+        assertThat(testee.deserialize(OLD_SERIALIZED_TASK_ADDITIONAL_INFORMATION))
+            .isEqualToComparingFieldByFieldRecursively(details);
+    }
 }
\ No newline at end of file
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskTest.java
index 1d89553..cf0746b 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingOneMailTaskTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.webadmin.service;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 
@@ -32,6 +33,7 @@ import org.apache.james.JsonSerializationVerifier;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepositoryPath;
 import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -41,27 +43,32 @@ class ReprocessingOneMailTaskTest {
     private static final Instant TIMESTAMP = Instant.parse("2018-11-13T12:00:55Z");
     private static final Clock CLOCK = Clock.fixed(TIMESTAMP, ZoneId.of("UTC"));
     private static final ReprocessingService REPROCESSING_SERVICE = mock(ReprocessingService.class);
-    private static final String SERIALIZED_TASK_1 = "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\"}";
-    private static final String SERIALIZED_TASK_1_ADDITIONAL_INFORMATION = "{\"type\":\"reprocessing-one\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\", \"timestamp\":\"2018-11-13T12:00:55Z\"}";
+    private static final String SERIALIZED_TASK_1 = "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\", \"consume\":true}";
+    private static final String SERIALIZED_TASK_1_ADDITIONAL_INFORMATION = "{\"type\":\"reprocessing-one\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\", \"timestamp\":\"2018-11-13T12:00:55Z\", \"consume\":true}";
+    private static final String SERIALIZED_TASK_OLD = "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\"}";
+    private static final String SERIALIZED_TASK_OLD_ADDITIONAL_INFORMATION = "{\"type\":\"reprocessing-one\", \"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\", \"timestamp\":\"2018-11-13T12:00:55Z\"}";
     private static final MailRepositoryPath REPOSITORY_PATH = MailRepositoryPath.from("a");
     private static final MailQueueName TARGET_QUEUE = MailQueueName.of("queue");
     private static final MailKey MAIL_KEY = new MailKey("myMail");
     private static final Optional<String> TARGET_PROCESSOR = Optional.of("targetProcessor");
+    public static final boolean CONSUME = true;
 
     @Test
     void taskShouldBeSerializable() throws Exception {
-        ReprocessingOneMailTask taskWithTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, TARGET_QUEUE, MAIL_KEY, TARGET_PROCESSOR, CLOCK);
-        ReprocessingOneMailTask taskWithoutTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, TARGET_QUEUE, MAIL_KEY, Optional.empty(), CLOCK);
+        ReprocessingOneMailTask taskWithTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, TARGET_PROCESSOR, CONSUME), MAIL_KEY, CLOCK);
+        ReprocessingOneMailTask taskWithoutTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, Optional.empty(), !CONSUME), MAIL_KEY, CLOCK);
 
         JsonSerializationVerifier.dtoModule(ReprocessingOneMailTaskDTO.module(CLOCK, REPROCESSING_SERVICE))
             .testCase(taskWithTargetProcessor, SERIALIZED_TASK_1)
             .testCase(taskWithoutTargetProcessor,
-                "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\"}")
+                "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"a\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\", \"consume\":false}")
             .verify();
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"{\"type\":\"reprocessing-one\",\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\"}", "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\"}"})
+    @ValueSource(strings = {
+        "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\",\"targetProcessor\":\"targetProcessor\"}",
+        "{\"type\":\"reprocessing-one\",\"repositoryPath\":\"%\",\"targetQueue\":\"queue\",\"mailKey\": \"myMail\"}"})
     void taskShouldThrowOnDeserializationUrlDecodingError(String serialized) {
         JsonTaskSerializer testee = JsonTaskSerializer.of(ReprocessingOneMailTaskDTO.module(CLOCK, REPROCESSING_SERVICE));
 
@@ -71,10 +78,31 @@ class ReprocessingOneMailTaskTest {
 
     @Test
     void additionalInformationShouldBeSerializable() throws IOException {
-        ReprocessingOneMailTask.AdditionalInformation details = new ReprocessingOneMailTask.AdditionalInformation(REPOSITORY_PATH, TARGET_QUEUE, MAIL_KEY, TARGET_PROCESSOR, TIMESTAMP);
+        ReprocessingOneMailTask.AdditionalInformation details = new ReprocessingOneMailTask.AdditionalInformation(REPOSITORY_PATH,
+            new ReprocessingService.Configuration(TARGET_QUEUE, TARGET_PROCESSOR, true), MAIL_KEY, TIMESTAMP);
         JsonSerializationVerifier.dtoModule(ReprocessingOneMailTaskAdditionalInformationDTO.module())
             .bean(details)
             .json(SERIALIZED_TASK_1_ADDITIONAL_INFORMATION)
             .verify();
     }
+
+
+    @Test
+    void shouldDeserializePreviousTaskFormat() throws Exception {
+        ReprocessingOneMailTask taskWithTargetProcessor = new ReprocessingOneMailTask(REPROCESSING_SERVICE, REPOSITORY_PATH, new ReprocessingService.Configuration(TARGET_QUEUE, TARGET_PROCESSOR, CONSUME), MAIL_KEY, CLOCK);
+        JsonTaskSerializer testee = JsonTaskSerializer.of(ReprocessingOneMailTaskDTO.module(CLOCK, REPROCESSING_SERVICE));
+
+        assertThat(testee.deserialize(SERIALIZED_TASK_OLD))
+            .isEqualToComparingFieldByFieldRecursively(taskWithTargetProcessor);
+    }
+
+    @Test
+    void shouldDeserializePreviousAdditionalInformationFormat() throws Exception {
+        ReprocessingOneMailTask.AdditionalInformation details = new ReprocessingOneMailTask.AdditionalInformation(REPOSITORY_PATH,
+            new ReprocessingService.Configuration(TARGET_QUEUE, TARGET_PROCESSOR, true), MAIL_KEY, TIMESTAMP);
+        JsonTaskAdditionalInformationSerializer testee = JsonTaskAdditionalInformationSerializer.of(ReprocessingOneMailTaskAdditionalInformationDTO.module());
+
+        assertThat(testee.deserialize(SERIALIZED_TASK_OLD_ADDITIONAL_INFORMATION))
+            .isEqualToComparingFieldByFieldRecursively(details);
+    }
 }
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
index d2ad1c5..1ad1e9f 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
@@ -43,6 +43,7 @@ import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
 import org.apache.james.queue.memory.MemoryMailQueueFactory;
 import org.apache.james.util.MimeMessageUtil;
+import org.apache.james.webadmin.service.ReprocessingService.Configuration;
 import org.apache.mailet.base.test.FakeMail;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -64,6 +65,7 @@ class ReprocessingServiceTest {
     private static final Consumer<MailKey> NOOP_CONSUMER = key -> { };
     private static final Optional<String> NO_TARGET_PROCESSOR = Optional.empty();
     private static final byte[] MESSAGE_BYTES = "header: value \r\n".getBytes(UTF_8);
+    public static final boolean CONSUME = true;
 
     private ReprocessingService reprocessingService;
     private MemoryMailRepositoryStore mailRepositoryStore;
@@ -106,7 +108,7 @@ class ReprocessingServiceTest {
         repository.store(mail2);
         repository.store(mail3);
 
-        reprocessingService.reprocess(PATH, KEY_2, NO_TARGET_PROCESSOR, SPOOL);
+        reprocessingService.reprocess(PATH, KEY_2, new ReprocessingService.Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME));
 
         assertThat(queueFactory.getQueue(SPOOL).get().browse())
             .toIterable()
@@ -121,7 +123,7 @@ class ReprocessingServiceTest {
         repository.store(mail2);
         repository.store(mail3);
 
-        reprocessingService.reprocess(PATH, KEY_2, NO_TARGET_PROCESSOR, SPOOL);
+        reprocessingService.reprocess(PATH, KEY_2, new ReprocessingService.Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME));
 
         assertThat(repository.list()).toIterable()
             .containsOnly(KEY_1, KEY_3);
@@ -134,7 +136,7 @@ class ReprocessingServiceTest {
         repository.store(mail2);
         repository.store(mail3);
 
-        reprocessingService.reprocessAll(PATH, NO_TARGET_PROCESSOR, SPOOL, NOOP_CONSUMER);
+        reprocessingService.reprocessAll(PATH, new Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME), NOOP_CONSUMER);
 
         assertThat(repository.list()).toIterable()
             .isEmpty();
@@ -147,7 +149,7 @@ class ReprocessingServiceTest {
         repository.store(mail2);
         repository.store(mail3);
 
-        reprocessingService.reprocessAll(PATH, NO_TARGET_PROCESSOR, SPOOL, NOOP_CONSUMER);
+        reprocessingService.reprocessAll(PATH, new Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME), NOOP_CONSUMER);
 
         assertThat(queueFactory.getQueue(SPOOL).get().browse())
             .toIterable()
@@ -178,7 +180,7 @@ class ReprocessingServiceTest {
             }
         });
 
-        reprocessingService.reprocessAll(PATH, NO_TARGET_PROCESSOR, SPOOL, concurrentRemoveConsumer);
+        reprocessingService.reprocessAll(PATH, new ReprocessingService.Configuration(SPOOL, NO_TARGET_PROCESSOR, CONSUME), concurrentRemoveConsumer);
 
         assertThat(queueFactory.getQueue(SPOOL).get().browse())
             .toIterable()

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org