You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/02 08:41:48 UTC
[james-project] 03/07: JAMES-3197 Prevent infinite loop upon Error of Mail Processing
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5d6fc84b6ac290e2c4025ec0d8709e7c83f7f118
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri May 29 14:01:17 2020 +0700
JAMES-3197 Prevent infinite loop upon Error of Mail Processing
Leverage a failure count on top of the mailQueue, and store the mail in the
error mail repository once that count exceeds the maximum.
---
.../org/apache/james/mailets/MailetErrorsTest.java | 86 +++++++++--
...ErrorMailet.java => OneRuntimeErrorMailet.java} | 10 +-
...rrorMailet.java => OneThreadSuicideMailet.java} | 12 +-
.../transport/mailets/RuntimeErrorMailet.java | 2 +-
...rrorMailet.java => RuntimeExceptionMailet.java} | 2 +-
...orMatcher.java => RuntimeExceptionMatcher.java} | 2 +-
.../mailetcontainer/impl/JamesMailSpooler.java | 80 +++++++++-
.../mailetcontainer/impl/JamesMailSpoolerTest.java | 163 ---------------------
8 files changed, 170 insertions(+), 187 deletions(-)
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
index b1cc490..646fe0b 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
@@ -38,8 +38,11 @@ import org.apache.james.transport.mailets.ErrorMatcher;
import org.apache.james.transport.mailets.NoClassDefFoundErrorMatcher;
import org.apache.james.transport.mailets.NoopMailet;
import org.apache.james.transport.mailets.Null;
+import org.apache.james.transport.mailets.OneRuntimeErrorMailet;
+import org.apache.james.transport.mailets.OneThreadSuicideMailet;
import org.apache.james.transport.mailets.RuntimeErrorMailet;
-import org.apache.james.transport.mailets.RuntimeErrorMatcher;
+import org.apache.james.transport.mailets.RuntimeExceptionMailet;
+import org.apache.james.transport.mailets.RuntimeExceptionMatcher;
import org.apache.james.transport.mailets.ToRepository;
import org.apache.james.transport.matchers.All;
import org.apache.james.transport.matchers.HasException;
@@ -135,6 +138,71 @@ public class MailetErrorsTest {
.putProcessor(ProcessorConfiguration.root()
.addMailet(MailetConfiguration.builder()
.matcher(All.class)
+ .mailet(RuntimeExceptionMailet.class))))
+ .build(temporaryFolder.newFolder());
+ MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
+
+ smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()).sendMessage(FROM, FROM);
+
+ awaitAtMostOneMinute.until(() -> probe.getRepositoryMailCount(ERROR_REPOSITORY) == 1);
+ }
+
+ @Test
+ public void spoolerShouldEventuallyProcessUponTemporaryError() throws Exception {
+ jamesServer = TemporaryJamesServer.builder()
+ .withBase(SMTP_ONLY_MODULE)
+ .withMailetContainer(MailetContainer.builder()
+ .putProcessor(CommonProcessors.deliverOnlyTransport())
+ .putProcessor(errorProcessor())
+ .putProcessor(ProcessorConfiguration.root()
+ .addMailet(MailetConfiguration.builder()
+ .matcher(All.class)
+ .mailet(OneRuntimeErrorMailet.class))
+ .addMailet(MailetConfiguration.builder()
+ .matcher(All.class)
+ .mailet(ToRepository.class)
+ .addProperty("repositoryPath", CUSTOM_REPOSITORY.asString()))))
+ .build(temporaryFolder.newFolder());
+ MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
+
+ smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()).sendMessage(FROM, FROM);
+
+ awaitAtMostOneMinute.until(() -> probe.getRepositoryMailCount(CUSTOM_REPOSITORY) == 1);
+ }
+
+ @Test
+ public void spoolerShouldEventuallyProcessMailsAfterThreadSuicide() throws Exception {
+ jamesServer = TemporaryJamesServer.builder()
+ .withBase(SMTP_ONLY_MODULE)
+ .withMailetContainer(MailetContainer.builder()
+ .putProcessor(CommonProcessors.deliverOnlyTransport())
+ .putProcessor(errorProcessor())
+ .putProcessor(ProcessorConfiguration.root()
+ .addMailet(MailetConfiguration.builder()
+ .matcher(All.class)
+ .mailet(OneThreadSuicideMailet.class))
+ .addMailet(MailetConfiguration.builder()
+ .matcher(All.class)
+ .mailet(ToRepository.class)
+ .addProperty("repositoryPath", CUSTOM_REPOSITORY.asString()))))
+ .build(temporaryFolder.newFolder());
+ MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
+
+ smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()).sendMessage(FROM, FROM);
+
+ awaitAtMostOneMinute.until(() -> probe.getRepositoryMailCount(CUSTOM_REPOSITORY) == 1);
+ }
+
+ @Test
+ public void spoolerShouldNotInfinitLoopUponPermanentError() throws Exception {
+ jamesServer = TemporaryJamesServer.builder()
+ .withBase(SMTP_ONLY_MODULE)
+ .withMailetContainer(MailetContainer.builder()
+ .putProcessor(CommonProcessors.deliverOnlyTransport())
+ .putProcessor(errorProcessor())
+ .putProcessor(ProcessorConfiguration.root()
+ .addMailet(MailetConfiguration.builder()
+ .matcher(All.class)
.mailet(RuntimeErrorMailet.class))))
.build(temporaryFolder.newFolder());
MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
@@ -177,7 +245,7 @@ public class MailetErrorsTest {
.putProcessor(ProcessorConfiguration.root()
.addMailet(MailetConfiguration.builder()
.matcher(All.class)
- .mailet(RuntimeErrorMailet.class)
+ .mailet(RuntimeExceptionMailet.class)
.addProperty("onMailetException", CUSTOM_PROCESSOR))))
.build(temporaryFolder.newFolder());
MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
@@ -198,7 +266,7 @@ public class MailetErrorsTest {
.putProcessor(ProcessorConfiguration.root()
.addMailet(MailetConfiguration.builder()
.matcher(All.class)
- .mailet(RuntimeErrorMailet.class)
+ .mailet(RuntimeExceptionMailet.class)
.addProperty("onMailetException", "ignore"))
.addMailet(MailetConfiguration.builder()
.matcher(All.class)
@@ -265,7 +333,7 @@ public class MailetErrorsTest {
.putProcessor(errorProcessor())
.putProcessor(ProcessorConfiguration.root()
.addMailet(MailetConfiguration.builder()
- .matcher(RuntimeErrorMatcher.class)
+ .matcher(RuntimeExceptionMatcher.class)
.mailet(NoopMailet.class))))
.build(temporaryFolder.newFolder());
MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
@@ -306,7 +374,7 @@ public class MailetErrorsTest {
.putProcessor(customProcessor())
.putProcessor(ProcessorConfiguration.root()
.addMailet(MailetConfiguration.builder()
- .matcher(RuntimeErrorMatcher.class)
+ .matcher(RuntimeExceptionMatcher.class)
.mailet(NoopMailet.class)
.addProperty("onMatchException", CUSTOM_PROCESSOR))))
.build(temporaryFolder.newFolder());
@@ -327,7 +395,7 @@ public class MailetErrorsTest {
.putProcessor(customProcessor())
.putProcessor(ProcessorConfiguration.root()
.addMailet(MailetConfiguration.builder()
- .matcher(RuntimeErrorMatcher.class)
+ .matcher(RuntimeExceptionMatcher.class)
.mailet(Null.class)
.addProperty("onMatchException", "nomatch"))
.addMailet(MailetConfiguration.builder()
@@ -377,7 +445,7 @@ public class MailetErrorsTest {
.putProcessor(customProcessor())
.putProcessor(ProcessorConfiguration.root()
.addMailet(MailetConfiguration.builder()
- .matcher(RuntimeErrorMatcher.class)
+ .matcher(RuntimeExceptionMatcher.class)
.mailet(ToRepository.class)
.addProperty("repositoryPath", CUSTOM_REPOSITORY.asString())
.addProperty("onMatchException", "matchall"))
@@ -465,7 +533,7 @@ public class MailetErrorsTest {
.putProcessor(customProcessor())
.putProcessor(ProcessorConfiguration.root()
.addMailet(MailetConfiguration.builder()
- .matcher(RuntimeErrorMatcher.class)
+ .matcher(RuntimeExceptionMatcher.class)
.mailet(Null.class))))
.build(temporaryFolder.newFolder());
MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
@@ -524,7 +592,7 @@ public class MailetErrorsTest {
.putProcessor(ProcessorConfiguration.root()
.addMailet(MailetConfiguration.builder()
.matcher(All.class)
- .mailet(RuntimeErrorMailet.class))))
+ .mailet(RuntimeExceptionMailet.class))))
.build(temporaryFolder.newFolder());
MailRepositoryProbeImpl probe = jamesServer.getProbe(MailRepositoryProbeImpl.class);
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneRuntimeErrorMailet.java
similarity index 84%
copy from server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
copy to server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneRuntimeErrorMailet.java
index 154c5ec..1b62207 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneRuntimeErrorMailet.java
@@ -19,14 +19,20 @@
package org.apache.james.transport.mailets;
+import java.util.concurrent.atomic.AtomicInteger;
+
import javax.mail.MessagingException;
import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMailet;
-public class RuntimeErrorMailet extends GenericMailet {
+public class OneRuntimeErrorMailet extends GenericMailet {
+ private final AtomicInteger callCount = new AtomicInteger(0);
+
@Override
public void service(Mail mail) throws MessagingException {
- throw new RuntimeException();
+ if (callCount.getAndIncrement() == 0) {
+ throw new Error();
+ }
}
}
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneThreadSuicideMailet.java
similarity index 80%
copy from server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
copy to server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneThreadSuicideMailet.java
index 154c5ec..1183635 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/OneThreadSuicideMailet.java
@@ -19,14 +19,18 @@
package org.apache.james.transport.mailets;
-import javax.mail.MessagingException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMailet;
-public class RuntimeErrorMailet extends GenericMailet {
+public class OneThreadSuicideMailet extends GenericMailet {
+ private final AtomicInteger callCount = new AtomicInteger(0);
+
@Override
- public void service(Mail mail) throws MessagingException {
- throw new RuntimeException();
+ public void service(Mail mail) {
+ if (callCount.getAndIncrement() == 0) {
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
index 154c5ec..6c0a125 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
@@ -27,6 +27,6 @@ import org.apache.mailet.base.GenericMailet;
public class RuntimeErrorMailet extends GenericMailet {
@Override
public void service(Mail mail) throws MessagingException {
- throw new RuntimeException();
+ throw new Error();
}
}
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMailet.java
similarity index 96%
copy from server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
copy to server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMailet.java
index 154c5ec..ee65cc7 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMailet.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMailet.java
@@ -24,7 +24,7 @@ import javax.mail.MessagingException;
import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMailet;
-public class RuntimeErrorMailet extends GenericMailet {
+public class RuntimeExceptionMailet extends GenericMailet {
@Override
public void service(Mail mail) throws MessagingException {
throw new RuntimeException();
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMatcher.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMatcher.java
similarity index 96%
rename from server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMatcher.java
rename to server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMatcher.java
index abf63d2..7bd2dd1 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeErrorMatcher.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/RuntimeExceptionMatcher.java
@@ -27,7 +27,7 @@ import org.apache.james.core.MailAddress;
import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMatcher;
-public class RuntimeErrorMatcher extends GenericMatcher {
+public class RuntimeExceptionMatcher extends GenericMatcher {
@Override
public Collection<MailAddress> match(Mail mail) throws MessagingException {
throw new RuntimeException();
diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 9737865..b6ceddb 100644
--- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -22,11 +22,13 @@ package org.apache.james.mailetcontainer.impl;
import static org.apache.james.metrics.api.TimeMetric.ExecutionResult.DEFAULT_100_MS_THRESHOLD;
import java.io.IOException;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
+import javax.mail.MessagingException;
import org.apache.commons.configuration2.HierarchicalConfiguration;
import org.apache.commons.configuration2.tree.ImmutableNode;
@@ -35,11 +37,18 @@ import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.mailetcontainer.api.MailProcessor;
import org.apache.james.mailetcontainer.api.jmx.MailSpoolerMBean;
+import org.apache.james.mailrepository.api.MailRepository;
+import org.apache.james.mailrepository.api.MailRepositoryPath;
+import org.apache.james.mailrepository.api.MailRepositoryStore;
+import org.apache.james.mailrepository.api.MailRepositoryUrl;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.TimeMetric;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueue.MailQueueItem;
import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.mailet.Attribute;
+import org.apache.mailet.AttributeName;
+import org.apache.mailet.AttributeValue;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +66,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class);
public static final String SPOOL_PROCESSING = "spoolProcessing";
+ public static final AttributeName MAIL_PROCESSING_ERROR_COUNT = AttributeName.of("mail-processing-error-count");
+ public static final MailRepositoryPath ERROR_REPOSITORY_PATH = MailRepositoryPath.from("var/mail/error");
+ public static final int MAXIMUM_FAILURE_COUNT = 5;
/**
* concurrency level to use for dequeuing mails from spool, allows to throttle resources dedicated to that async
@@ -72,21 +84,31 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
* The mail processor
*/
private final MailProcessor mailProcessor;
+ private final MailRepositoryStore mailRepositoryStore;
private final MailQueueFactory<?> queueFactory;
+ private MailRepositoryUrl errorRepositoryURL;
+ private MailRepository errorRepository;
private reactor.core.Disposable disposable;
private MailQueue queue;
@Inject
- public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> queueFactory) {
+ public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailRepositoryStore mailRepositoryStore, MailQueueFactory<?> queueFactory) {
this.metricFactory = metricFactory;
this.mailProcessor = mailProcessor;
+ this.mailRepositoryStore = mailRepositoryStore;
this.queueFactory = queueFactory;
}
@Override
public void configure(HierarchicalConfiguration<ImmutableNode> config) {
concurrencyLevel = config.getInt("threads", 100);
+ errorRepositoryURL = Optional.ofNullable(config.getString("errorRepository", null))
+ .map(MailRepositoryUrl::from)
+ .orElseGet(() -> MailRepositoryUrl.fromPathAndProtocol(
+ mailRepositoryStore.defaultProtocol()
+ .orElseThrow(() -> new IllegalStateException("Cannot retrieve mailRepository URL, you need to configure an `errorRepository` property for the spooler.0")),
+ ERROR_REPOSITORY_PATH));
}
/**
@@ -99,6 +121,12 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
queue = queueFactory.createQueue(MailQueueFactory.SPOOL, MailQueueFactory.prefetchCount(concurrencyLevel));
disposable = run(queue);
LOGGER.info("Spooler started");
+ try {
+ this.errorRepository = mailRepositoryStore.select(errorRepositoryURL);
+ } catch (MailRepositoryStore.MailRepositoryStoreException e) {
+ throw new RuntimeException(e);
+ }
+
}
private reactor.core.Disposable run(MailQueue queue) {
@@ -135,16 +163,56 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
}
queueItem.done(true);
} catch (Exception e) {
- try {
- queueItem.done(false);
- } catch (MailQueue.MailQueueException ex) {
- throw new RuntimeException(e);
- }
+ handleError(queueItem, mail, e);
} finally {
LOGGER.debug("==== End processing mail {} ====", mail.getName());
}
}
+ private void handleError(MailQueueItem queueItem, Mail mail, Exception processingException) {
+ int failureCount = computeFailureCount(mail);
+
+ try {
+ if (failureCount > MAXIMUM_FAILURE_COUNT) {
+ LOGGER.error("Failed {} processing {} consecutive times. Abort. Mail is saved in {}", mail.getName(), failureCount, errorRepositoryURL.asString());
+ storeInErrorRepository(queueItem);
+ } else {
+ LOGGER.error("Failed {} processing {} consecutive times. Mail is requeued with increased failure count.", mail.getName(), failureCount, processingException);
+ reEnqueue(queueItem, failureCount);
+ }
+ } catch (Exception nestedE) {
+ LOGGER.error("Could not apply standard error handling for {}, defaulting to nack", mail.getName(), nestedE);
+ nack(queueItem, processingException);
+ }
+ }
+
+ private int computeFailureCount(Mail mail) {
+ Integer previousFailureCount = mail.getAttribute(MAIL_PROCESSING_ERROR_COUNT)
+ .flatMap(attribute -> attribute.getValue().valueAs(Integer.class))
+ .orElse(0);
+ return previousFailureCount + 1;
+ }
+
+ private void reEnqueue(MailQueueItem queueItem, int failureCount) throws MailQueue.MailQueueException {
+ Mail mail = queueItem.getMail();
+ mail.setAttribute(new Attribute(MAIL_PROCESSING_ERROR_COUNT, AttributeValue.of(failureCount)));
+ queue.enQueue(mail);
+ queueItem.done(true);
+ }
+
+ private void storeInErrorRepository(MailQueueItem queueItem) throws MessagingException {
+ errorRepository.store(queueItem.getMail());
+ queueItem.done(true);
+ }
+
+ private void nack(MailQueueItem queueItem, Exception processingException) {
+ try {
+ queueItem.done(false);
+ } catch (MailQueue.MailQueueException ex) {
+ throw new RuntimeException(processingException);
+ }
+ }
+
/**
* The dispose operation is called at the end of a components lifecycle.
* Instances of this class use this method to release and destroy any
diff --git a/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java b/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java
deleted file mode 100644
index 9485686..0000000
--- a/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailetcontainer.impl;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
-import static org.awaitility.Duration.TEN_SECONDS;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.Consumer;
-
-import javax.mail.MessagingException;
-
-import org.apache.commons.configuration2.plist.PropertyListConfiguration;
-import org.apache.james.mailetcontainer.api.MailProcessor;
-import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
-import org.apache.james.queue.api.MailQueue;
-import org.apache.james.queue.api.MailQueueFactory;
-import org.apache.mailet.Mail;
-import org.apache.mailet.base.test.FakeMail;
-import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionFactory;
-import org.junit.jupiter.api.Test;
-
-import reactor.core.publisher.UnicastProcessor;
-
-class JamesMailSpoolerTest {
- private static final ConditionFactory CALMLY_AWAIT = Awaitility
- .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
- .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
- .await()
- .atMost(TEN_SECONDS);
-
- @Test
- void thrownExceptionShouldAckTheItem() throws MessagingException {
- MetricFactory metricFactory = mock(MetricFactory.class);
- when(metricFactory.timer(JamesMailSpooler.SPOOL_PROCESSING)).thenAnswer(ignored -> mock(TimeMetric.class));
- MailQueueFactory<?> queueFactory = mock(MailQueueFactory.class);
- MailProcessor mailProcessor = mock(MailProcessor.class);
- JamesMailSpooler spooler = new JamesMailSpooler(metricFactory, mailProcessor, queueFactory);
-
- UnicastProcessor<MockedMailQueueItem> workQueue = UnicastProcessor.create();
- MockedMailQueueItem item = new MockedMailQueueItem();
- item.addCallback(isDone -> {
- if (!isDone) {
- workQueue.onNext(item);
- }
- });
- MailQueue queue = mock(MailQueue.class);
- workQueue.onNext(item);
- when(queue.deQueue()).thenAnswer(any -> workQueue.limitRate(1).filter(MockedMailQueueItem::isNotDone));
- when(queueFactory.createQueue(eq(MailQueueFactory.SPOOL), any())).thenAnswer(any -> queue);
-
- doThrow(new RuntimeException("Arbitrary failure"))
- .doNothing()
- .when(mailProcessor).service(any());
-
- PropertyListConfiguration configuration = new PropertyListConfiguration();
- configuration.addProperty("threads", 2);
- spooler.configure(configuration);
- spooler.init();
-
- CALMLY_AWAIT.until(() -> item.getDones().size() == 2);
-
- assertThat(item.getDones()).containsExactly(false, true);
- }
-
- @Test
- void threadSuicideShouldAckTheItem() throws MessagingException {
- MetricFactory metricFactory = mock(MetricFactory.class);
- when(metricFactory.timer(JamesMailSpooler.SPOOL_PROCESSING)).thenAnswer(ignored -> mock(TimeMetric.class));
- MailQueueFactory<?> queueFactory = mock(MailQueueFactory.class);
- MailProcessor mailProcessor = mock(MailProcessor.class);
- JamesMailSpooler spooler = new JamesMailSpooler(metricFactory, mailProcessor, queueFactory);
-
- UnicastProcessor<MockedMailQueueItem> workQueue = UnicastProcessor.create();
- MockedMailQueueItem item = new MockedMailQueueItem();
- item.addCallback(isDone -> {
- if (!isDone) {
- workQueue.onNext(item);
- }
- });
- MailQueue queue = mock(MailQueue.class);
- workQueue.onNext(item);
- when(queue.deQueue()).thenAnswer(any -> workQueue.limitRate(1).filter(MockedMailQueueItem::isNotDone));
- when(queueFactory.createQueue(eq(MailQueueFactory.SPOOL), any())).thenAnswer(any -> queue);
-
- doAnswer(ignored -> {
- Thread.currentThread().interrupt();
- return null;
- }).doNothing().when(mailProcessor).service(any());
-
- PropertyListConfiguration configuration = new PropertyListConfiguration();
- configuration.addProperty("threads", 2);
- spooler.configure(configuration);
- spooler.init();
-
- CALMLY_AWAIT.until(() -> item.getDones().size() == 2);
-
- assertThat(item.getDones()).containsExactly(false, true);
- }
-
- private class MockedMailQueueItem implements MailQueue.MailQueueItem {
- private final Collection<Boolean> dones;
- private Consumer<Boolean> doneCallback;
-
- private MockedMailQueueItem() {
- dones = new ArrayList<>();
- }
-
- @Override
- public Mail getMail() {
- try {
- return FakeMail.defaultFakeMail();
- } catch (MessagingException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void done(boolean success) throws MailQueue.MailQueueException {
- dones.add(success);
- doneCallback.accept(success);
- }
-
- public Collection<Boolean> getDones() {
- return dones;
- }
-
- public boolean isNotDone() {
- return !dones.contains(true);
- }
-
- public void addCallback(Consumer<Boolean> callback) {
- doneCallback = callback;
- }
- }
-}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org