You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/02 08:41:48 UTC

[james-project] 03/07: JAMES-3197 Prevent infinite loop upon Error of Mail Processing

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5d6fc84b6ac290e2c4025ec0d8709e7c83f7f118
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri May 29 14:01:17 2020 +0700

    JAMES-3197 Prevent infinite loop upon Error of Mail Processing
    
    Leverage a failure count on top of the mailQueue, and use error mail
    repository.
---
 .../org/apache/james/mailets/MailetErrorsTest.java |  86 +++++++++--
 ...ErrorMailet.java => OneRuntimeErrorMailet.java} |  10 +-
 ...rrorMailet.java => OneThreadSuicideMailet.java} |  12 +-
 .../transport/mailets/RuntimeErrorMailet.java      |   2 +-
 ...rrorMailet.java => RuntimeExceptionMailet.java} |   2 +-
 ...orMatcher.java => RuntimeExceptionMatcher.java} |   2 +-
 .../mailetcontainer/impl/JamesMailSpooler.java     |  80 +++++++++-
 .../mailetcontainer/impl/JamesMailSpoolerTest.java | 163 ---------------------
 8 files changed, 170 insertions(+), 187 deletions(-)

diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
index b1cc490..646fe0b 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
@@ -38,8 +38,11 @@ import org.apache.james.transport.mailets.ErrorMatcher;
 import org.apache.james.transport.mailets.NoClassDefFoundErrorMatcher;
 import org.apache.james.transport.mailets.NoopMailet;
 import org.apache.james.transport.mailets.Null;
+import org.apache.james.transport.mailets.OneRuntimeErrorMailet;
+import org.apache.james.transport.mailets.OneThreadSuicideMailet;
 import org.apache.james.transport.mailets.RuntimeErrorMailet;
-import org.apache.james.transport.mailets.RuntimeErrorMatcher;
+import org.apache.james.transport.mailets.RuntimeExceptionMailet;
+import org.apache.james.transport.mailets.RuntimeExceptionMatcher;
 import org.apache.james.transport.mailets.ToRepository;
 import org.apache.james.transport.matchers.All;
 import org.apache.james.transport.matchers.HasException;
@@ -135,6 +138,71 @@ public class MailetErrorsTest {
                 .putProcessor(ProcessorConfiguration.root()
                     .addMailet(MailetConfiguration.builder()
                         .matcher(All.class)
+                        .mailet(RuntimeExceptionMailet.class))))
+            .build(temporaryFolder.newFolder());
+        MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
+
+        smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()).sendMessage(FROM, FROM);
+
+        awaitAtMostOneMinute.until(() -> probe.getRepositoryMailCount(ERROR_REPOSITORY) == 1);
+    }
+
+    @Test
+    public void spoolerShouldEventuallyProcessUponTemporaryError() throws Exception {
+        jamesServer = TemporaryJamesServer.builder()
+            .withBase(SMTP_ONLY_MODULE)
+            .withMailetContainer(MailetContainer.builder()
+                .putProcessor(CommonProcessors.deliverOnlyTransport())
+                .putProcessor(errorProcessor())
+                .putProcessor(ProcessorConfiguration.root()
+                    .addMailet(MailetConfiguration.builder()
+                        .matcher(All.class)
+                        .mailet(OneRuntimeErrorMailet.class))
+                    .addMailet(MailetConfiguration.builder()
+                        .matcher(All.class)
+                        .mailet(ToRepository.class)
+                        .addProperty("repositoryPath", CUSTOM_REPOSITORY.asString()))))
+            .build(temporaryFolder.newFolder());
+        MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
+
+        smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()).sendMessage(FROM, FROM);
+
+        awaitAtMostOneMinute.until(() -> probe.getRepositoryMailCount(CUSTOM_REPOSITORY) == 1);
+    }
+
+    @Test
+    public void spoolerShouldEventuallyProcessMailsAfterThreadSuicide() throws Exception {
+        jamesServer = TemporaryJamesServer.builder()
+            .withBase(SMTP_ONLY_MODULE)
+            .withMailetContainer(MailetContainer.builder()
+                .putProcessor(CommonProcessors.deliverOnlyTransport())
+                .putProcessor(errorProcessor())
+                .putProcessor(ProcessorConfiguration.root()
+                    .addMailet(MailetConfiguration.builder()
+                        .matcher(All.class)
+                        .mailet(OneThreadSuicideMailet.class))
+                    .addMailet(MailetConfiguration.builder()
+                        .matcher(All.class)
+                        .mailet(ToRepository.class)
+                        .addProperty("repositoryPath", CUSTOM_REPOSITORY.asString()))))
+            .build(temporaryFolder.newFolder());
+        MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
+
+        smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()).sendMessage(FROM, FROM);
+
+        awaitAtMostOneMinute.until(() -> probe.getRepositoryMailCount(CUSTOM_REPOSITORY) == 1);
+    }
+
+    @Test
+    public void spoolerShouldNotInfinitLoopUponPermanentError() throws Exception {
+        jamesServer = TemporaryJamesServer.builder()
+            .withBase(SMTP_ONLY_MODULE)
+            .withMailetContainer(MailetContainer.builder()
+                .putProcessor(CommonProcessors.deliverOnlyTransport())
+                .putProcessor(errorProcessor())
+                .putProcessor(ProcessorConfiguration.root()
+                    .addMailet(MailetConfiguration.builder()
+                        .matcher(All.class)
                         .mailet(RuntimeErrorMailet.class))))
             .build(temporaryFolder.newFolder());
         MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
@@ -177,7 +245,7 @@ public class MailetErrorsTest {
                 .putProcessor(ProcessorConfiguration.root()
                     .addMailet(MailetConfiguration.builder()
                         .matcher(All.class)
-                        .mailet(RuntimeErrorMailet.class)
+                        .mailet(RuntimeExceptionMailet.class)
                         .addProperty("onMailetException", CUSTOM_PROCESSOR))))
             .build(temporaryFolder.newFolder());
         MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
@@ -198,7 +266,7 @@ public class MailetErrorsTest {
                 .putProcessor(ProcessorConfiguration.root()
                     .addMailet(MailetConfiguration.builder()
                         .matcher(All.class)
-                        .mailet(RuntimeErrorMailet.class)
+                        .mailet(RuntimeExceptionMailet.class)
                         .addProperty("onMailetException", "ignore"))
                     .addMailet(MailetConfiguration.builder()
                         .matcher(All.class)
@@ -265,7 +333,7 @@ public class MailetErrorsTest {
                 .putProcessor(errorProcessor())
                 .putProcessor(ProcessorConfiguration.root()
                     .addMailet(MailetConfiguration.builder()
-                        .matcher(RuntimeErrorMatcher.class)
+                        .matcher(RuntimeExceptionMatcher.class)
                         .mailet(NoopMailet.class))))
             .build(temporaryFolder.newFolder());
         MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
@@ -306,7 +374,7 @@ public class MailetErrorsTest {
                 .putProcessor(customProcessor())
                 .putProcessor(ProcessorConfiguration.root()
                     .addMailet(MailetConfiguration.builder()
-                        .matcher(RuntimeErrorMatcher.class)
+                        .matcher(RuntimeExceptionMatcher.class)
                         .mailet(NoopMailet.class)
                         .addProperty("onMatchException", CUSTOM_PROCESSOR))))
             .build(temporaryFolder.newFolder());
@@ -327,7 +395,7 @@ public class MailetErrorsTest {
                 .putProcessor(customProcessor())
                 .putProcessor(ProcessorConfiguration.root()
                     .addMailet(MailetConfiguration.builder()
-                        .matcher(RuntimeErrorMatcher.class)
+                        .matcher(RuntimeExceptionMatcher.class)
                         .mailet(Null.class)
                         .addProperty("onMatchException", "nomatch"))
                     .addMailet(MailetConfiguration.builder()
@@ -377,7 +445,7 @@ public class MailetErrorsTest {
                 .putProcessor(customProcessor())
                 .putProcessor(ProcessorConfiguration.root()
                     .addMailet(MailetConfiguration.builder()
-                        .matcher(RuntimeErrorMatcher.class)
+                        .matcher(RuntimeExceptionMatcher.class)
                         .mailet(ToRepository.class)
                         .addProperty("repositoryPath", CUSTOM_REPOSITORY.asString())
                         .addProperty("onMatchException", "matchall"))
@@ -465,7 +533,7 @@ public class MailetErrorsTest {
                 .putProcessor(customProcessor())
                 .putProcessor(ProcessorConfiguration.root()
                     .addMailet(MailetConfiguration.builder()
-                        .matcher(RuntimeErrorMatcher.class)
+                        .matcher(RuntimeExceptionMatcher.class)
                         .mailet(Null.class))))
             .build(temporaryFolder.newFolder());
         MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
@@ -524,7 +592,7 @@ public class MailetErrorsTest {
                 .putProcessor(ProcessorConfiguration.root()
                     .addMailet(MailetConfiguration.builder()
                         .matcher(All.class)
-                        .mailet(RuntimeErrorMailet.class))))
+                        .mailet(RuntimeExceptionMailet.class))))
             .build(temporaryFolder.newFolder());
         MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
 
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneRuntimeErrorMailet.java
similarity index 84%
copy from server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
copy to server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneRuntimeErrorMailet.java
index 154c5ec..1b62207 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneRuntimeErrorMailet.java
@@ -19,14 +19,20 @@
 
 package org.apache.james.transport.mailets;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.mail.MessagingException;
 
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.GenericMailet;
 
-public class RuntimeErrorMailet extends GenericMailet {
+public class OneRuntimeErrorMailet extends GenericMailet {
+    private final AtomicInteger callCount = new AtomicInteger(0);
+
     @Override
     public void service(Mail mail) throws MessagingException {
-        throw new RuntimeException();
+        if (callCount.getAndIncrement() == 0) {
+            throw new Error();
+        }
     }
 }
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneThreadSuicideMailet.java
similarity index 80%
copy from server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
copy to server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneThreadSuicideMailet.java
index 154c5ec..1183635 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneThreadSuicideMailet.java
@@ -19,14 +19,18 @@
 
 package org.apache.james.transport.mailets;
 
-import javax.mail.MessagingException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.GenericMailet;
 
-public class RuntimeErrorMailet extends GenericMailet {
+public class OneThreadSuicideMailet extends GenericMailet {
+    private final AtomicInteger callCount = new AtomicInteger(0);
+
     @Override
-    public void service(Mail mail) throws MessagingException {
-        throw new RuntimeException();
+    public void service(Mail mail) {
+        if (callCount.getAndIncrement() == 0) {
+            Thread.currentThread().interrupt();
+        }
     }
 }
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
index 154c5ec..6c0a125 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
@@ -27,6 +27,6 @@ import org.apache.mailet.base.GenericMailet;
 public class RuntimeErrorMailet extends GenericMailet {
     @Override
     public void service(Mail mail) throws MessagingException {
-        throw new RuntimeException();
+        throw new Error();
     }
 }
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMailet.java
similarity index 96%
copy from server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
copy to server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMailet.java
index 154c5ec..ee65cc7 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMailet.java
@@ -24,7 +24,7 @@ import javax.mail.MessagingException;
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.GenericMailet;
 
-public class RuntimeErrorMailet extends GenericMailet {
+public class RuntimeExceptionMailet extends GenericMailet {
     @Override
     public void service(Mail mail) throws MessagingException {
         throw new RuntimeException();
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMatcher.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMatcher.java
similarity index 96%
rename from server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMatcher.java
rename to server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMatcher.java
index abf63d2..7bd2dd1 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMatcher.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMatcher.java
@@ -27,7 +27,7 @@ import org.apache.james.core.MailAddress;
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.GenericMatcher;
 
-public class RuntimeErrorMatcher extends GenericMatcher {
+public class RuntimeExceptionMatcher extends GenericMatcher {
     @Override
     public Collection<MailAddress> match(Mail mail) throws MessagingException {
         throw new RuntimeException();
diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 9737865..b6ceddb 100644
--- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -22,11 +22,13 @@ package org.apache.james.mailetcontainer.impl;
 import static org.apache.james.metrics.api.TimeMetric.ExecutionResult.DEFAULT_100_MS_THRESHOLD;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
+import javax.mail.MessagingException;
 
 import org.apache.commons.configuration2.HierarchicalConfiguration;
 import org.apache.commons.configuration2.tree.ImmutableNode;
@@ -35,11 +37,18 @@ import org.apache.james.lifecycle.api.Disposable;
 import org.apache.james.lifecycle.api.LifecycleUtil;
 import org.apache.james.mailetcontainer.api.MailProcessor;
 import org.apache.james.mailetcontainer.api.jmx.MailSpoolerMBean;
+import org.apache.james.mailrepository.api.MailRepository;
+import org.apache.james.mailrepository.api.MailRepositoryPath;
+import org.apache.james.mailrepository.api.MailRepositoryStore;
+import org.apache.james.mailrepository.api.MailRepositoryUrl;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.metrics.api.TimeMetric;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueue.MailQueueItem;
 import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.mailet.Attribute;
+import org.apache.mailet.AttributeName;
+import org.apache.mailet.AttributeValue;
 import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +66,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
     private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class);
 
     public static final String SPOOL_PROCESSING = "spoolProcessing";
+    public static final AttributeName MAIL_PROCESSING_ERROR_COUNT = AttributeName.of("mail-processing-error-count");
+    public static final MailRepositoryPath ERROR_REPOSITORY_PATH = MailRepositoryPath.from("var/mail/error");
+    public static final int MAXIMUM_FAILURE_COUNT = 5;
 
     /**
      * concurrency level to use for dequeuing mails from spool, allows to throttle resources dedicated to that async
@@ -72,21 +84,31 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
      * The mail processor
      */
     private final MailProcessor mailProcessor;
+    private final MailRepositoryStore mailRepositoryStore;
 
     private final MailQueueFactory<?> queueFactory;
+    private MailRepositoryUrl errorRepositoryURL;
+    private MailRepository errorRepository;
     private reactor.core.Disposable disposable;
     private MailQueue queue;
 
     @Inject
-    public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> queueFactory) {
+    public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailRepositoryStore mailRepositoryStore, MailQueueFactory<?> queueFactory) {
         this.metricFactory = metricFactory;
         this.mailProcessor = mailProcessor;
+        this.mailRepositoryStore = mailRepositoryStore;
         this.queueFactory = queueFactory;
     }
 
     @Override
     public void configure(HierarchicalConfiguration<ImmutableNode> config) {
         concurrencyLevel = config.getInt("threads", 100);
+        errorRepositoryURL = Optional.ofNullable(config.getString("errorRepository", null))
+            .map(MailRepositoryUrl::from)
+            .orElseGet(() -> MailRepositoryUrl.fromPathAndProtocol(
+                mailRepositoryStore.defaultProtocol()
+                    .orElseThrow(() -> new IllegalStateException("Cannot retrieve mailRepository URL, you need to configure an `errorRepository` property for the spooler.0")),
+                ERROR_REPOSITORY_PATH));
     }
 
     /**
@@ -99,6 +121,12 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
         queue = queueFactory.createQueue(MailQueueFactory.SPOOL, MailQueueFactory.prefetchCount(concurrencyLevel));
         disposable = run(queue);
         LOGGER.info("Spooler started");
+        try {
+            this.errorRepository = mailRepositoryStore.select(errorRepositoryURL);
+        } catch (MailRepositoryStore.MailRepositoryStoreException e) {
+            throw new RuntimeException(e);
+        }
+
     }
 
     private reactor.core.Disposable run(MailQueue queue) {
@@ -135,16 +163,56 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
             }
             queueItem.done(true);
         } catch (Exception e) {
-            try {
-                queueItem.done(false);
-            } catch (MailQueue.MailQueueException ex) {
-                throw new RuntimeException(e);
-            }
+            handleError(queueItem, mail, e);
         } finally {
             LOGGER.debug("==== End processing mail {} ====", mail.getName());
         }
     }
 
+    private void handleError(MailQueueItem queueItem, Mail mail, Exception processingException) {
+        int failureCount = computeFailureCount(mail);
+
+        try {
+            if (failureCount > MAXIMUM_FAILURE_COUNT) {
+                LOGGER.error("Failed {} processing {} consecutive times. Abort. Mail is saved in {}", mail.getName(), failureCount, errorRepositoryURL.asString());
+                storeInErrorRepository(queueItem);
+            } else {
+                LOGGER.error("Failed {} processing {} consecutive times. Mail is requeued with increased failure count.", mail.getName(), failureCount, processingException);
+                reEnqueue(queueItem, failureCount);
+            }
+        } catch (Exception nestedE) {
+            LOGGER.error("Could not apply standard error handling for {}, defaulting to nack", mail.getName(), nestedE);
+            nack(queueItem, processingException);
+        }
+    }
+
+    private int computeFailureCount(Mail mail) {
+        Integer previousFailureCount = mail.getAttribute(MAIL_PROCESSING_ERROR_COUNT)
+            .flatMap(attribute -> attribute.getValue().valueAs(Integer.class))
+            .orElse(0);
+        return previousFailureCount + 1;
+    }
+
+    private void reEnqueue(MailQueueItem queueItem, int failureCount) throws MailQueue.MailQueueException {
+        Mail mail = queueItem.getMail();
+        mail.setAttribute(new Attribute(MAIL_PROCESSING_ERROR_COUNT, AttributeValue.of(failureCount)));
+        queue.enQueue(mail);
+        queueItem.done(true);
+    }
+
+    private void storeInErrorRepository(MailQueueItem queueItem) throws MessagingException {
+        errorRepository.store(queueItem.getMail());
+        queueItem.done(true);
+    }
+
+    private void nack(MailQueueItem queueItem, Exception processingException) {
+        try {
+            queueItem.done(false);
+        } catch (MailQueue.MailQueueException ex) {
+            throw new RuntimeException(processingException);
+        }
+    }
+
     /**
      * The dispose operation is called at the end of a components lifecycle.
      * Instances of this class use this method to release and destroy any
diff --git a/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java b/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java
deleted file mode 100644
index 9485686..0000000
--- a/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailetcontainer.impl;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
-import static org.awaitility.Duration.TEN_SECONDS;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.Consumer;
-
-import javax.mail.MessagingException;
-
-import org.apache.commons.configuration2.plist.PropertyListConfiguration;
-import org.apache.james.mailetcontainer.api.MailProcessor;
-import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
-import org.apache.james.queue.api.MailQueue;
-import org.apache.james.queue.api.MailQueueFactory;
-import org.apache.mailet.Mail;
-import org.apache.mailet.base.test.FakeMail;
-import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionFactory;
-import org.junit.jupiter.api.Test;
-
-import reactor.core.publisher.UnicastProcessor;
-
-class JamesMailSpoolerTest {
-    private static final ConditionFactory CALMLY_AWAIT = Awaitility
-            .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
-            .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
-            .await()
-            .atMost(TEN_SECONDS);
-
-    @Test
-    void thrownExceptionShouldAckTheItem() throws MessagingException {
-        MetricFactory metricFactory = mock(MetricFactory.class);
-        when(metricFactory.timer(JamesMailSpooler.SPOOL_PROCESSING)).thenAnswer(ignored -> mock(TimeMetric.class));
-        MailQueueFactory<?> queueFactory = mock(MailQueueFactory.class);
-        MailProcessor mailProcessor = mock(MailProcessor.class);
-        JamesMailSpooler spooler = new JamesMailSpooler(metricFactory, mailProcessor, queueFactory);
-
-        UnicastProcessor<MockedMailQueueItem> workQueue = UnicastProcessor.create();
-        MockedMailQueueItem item = new MockedMailQueueItem();
-        item.addCallback(isDone -> {
-            if (!isDone) {
-                workQueue.onNext(item);
-            }
-        });
-        MailQueue queue = mock(MailQueue.class);
-        workQueue.onNext(item);
-        when(queue.deQueue()).thenAnswer(any -> workQueue.limitRate(1).filter(MockedMailQueueItem::isNotDone));
-        when(queueFactory.createQueue(eq(MailQueueFactory.SPOOL), any())).thenAnswer(any -> queue);
-
-        doThrow(new RuntimeException("Arbitrary failure"))
-            .doNothing()
-            .when(mailProcessor).service(any());
-
-        PropertyListConfiguration configuration = new PropertyListConfiguration();
-        configuration.addProperty("threads", 2);
-        spooler.configure(configuration);
-        spooler.init();
-
-        CALMLY_AWAIT.until(() -> item.getDones().size() == 2);
-
-        assertThat(item.getDones()).containsExactly(false, true);
-    }
-
-    @Test
-    void threadSuicideShouldAckTheItem() throws MessagingException {
-        MetricFactory metricFactory = mock(MetricFactory.class);
-        when(metricFactory.timer(JamesMailSpooler.SPOOL_PROCESSING)).thenAnswer(ignored -> mock(TimeMetric.class));
-        MailQueueFactory<?> queueFactory = mock(MailQueueFactory.class);
-        MailProcessor mailProcessor = mock(MailProcessor.class);
-        JamesMailSpooler spooler = new JamesMailSpooler(metricFactory, mailProcessor, queueFactory);
-
-        UnicastProcessor<MockedMailQueueItem> workQueue = UnicastProcessor.create();
-        MockedMailQueueItem item = new MockedMailQueueItem();
-        item.addCallback(isDone -> {
-            if (!isDone) {
-                workQueue.onNext(item);
-            }
-        });
-        MailQueue queue = mock(MailQueue.class);
-        workQueue.onNext(item);
-        when(queue.deQueue()).thenAnswer(any -> workQueue.limitRate(1).filter(MockedMailQueueItem::isNotDone));
-        when(queueFactory.createQueue(eq(MailQueueFactory.SPOOL), any())).thenAnswer(any -> queue);
-
-        doAnswer(ignored -> {
-            Thread.currentThread().interrupt();
-            return null;
-        }).doNothing().when(mailProcessor).service(any());
-
-        PropertyListConfiguration configuration = new PropertyListConfiguration();
-        configuration.addProperty("threads", 2);
-        spooler.configure(configuration);
-        spooler.init();
-
-        CALMLY_AWAIT.until(() -> item.getDones().size() == 2);
-
-        assertThat(item.getDones()).containsExactly(false, true);
-    }
-
-    private class MockedMailQueueItem implements MailQueue.MailQueueItem {
-        private final Collection<Boolean> dones;
-        private Consumer<Boolean> doneCallback;
-
-        private MockedMailQueueItem() {
-            dones = new ArrayList<>();
-        }
-
-        @Override
-        public Mail getMail() {
-            try {
-                return FakeMail.defaultFakeMail();
-            } catch (MessagingException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public void done(boolean success) throws MailQueue.MailQueueException {
-            dones.add(success);
-            doneCallback.accept(success);
-        }
-
-        public Collection<Boolean> getDones() {
-            return dones;
-        }
-
-        public boolean isNotDone() {
-            return !dones.contains(true);
-        }
-
-        public void addCallback(Consumer<Boolean> callback) {
-            doneCallback = callback;
-        }
-    }
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org