You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/24 09:17:17 UTC

[james-project] 01/04: JAMES-3588 JamesMailSpooler should defend itself against mutability in the face of failed processing

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 97540d77a05b0e1629726243e43912b8dc6ac867
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu May 20 11:52:15 2021 +0700

    JAMES-3588 JamesMailSpooler should defend itself against mutability in the face of failed processing
    
    The mail processing can alter the recipients, and the errored mail was never
    redelivered to some addresses, duplicated for others.
    
    Restoring original recipients allows for predictability here and prevents message
    loss.
---
 .../org/apache/james/mailets/MailetErrorsTest.java | 53 +++++++++++++++++++++-
 .../mailetcontainer/impl/JamesMailSpooler.java     | 10 +++-
 .../org/apache/james/utils/SMTPMessageSender.java  |  9 ++++
 3 files changed, 69 insertions(+), 3 deletions(-)

diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
index 15fd7ec..700d212 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
@@ -24,7 +24,10 @@ import static org.apache.james.mailets.configuration.CommonProcessors.ERROR_REPO
 import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN;
 import static org.apache.james.mailets.configuration.Constants.FROM;
 import static org.apache.james.mailets.configuration.Constants.LOCALHOST_IP;
+import static org.apache.james.mailets.configuration.Constants.PASSWORD;
+import static org.apache.james.mailets.configuration.Constants.RECIPIENT;
 import static org.apache.james.mailets.configuration.Constants.awaitAtMostOneMinute;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 
@@ -33,6 +36,7 @@ import org.apache.james.mailets.configuration.MailetConfiguration;
 import org.apache.james.mailets.configuration.MailetContainer;
 import org.apache.james.mailets.configuration.ProcessorConfiguration;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
+import org.apache.james.modules.protocols.ImapGuiceProbe;
 import org.apache.james.modules.protocols.SmtpGuiceProbe;
 import org.apache.james.transport.mailets.ErrorMailet;
 import org.apache.james.transport.mailets.ErrorMatcher;
@@ -41,6 +45,7 @@ import org.apache.james.transport.mailets.NoClassDefFoundErrorMatcher;
 import org.apache.james.transport.mailets.NoopMailet;
 import org.apache.james.transport.mailets.Null;
 import org.apache.james.transport.mailets.OneRuntimeErrorMailet;
+import org.apache.james.transport.mailets.OneRuntimeExceptionMailet;
 import org.apache.james.transport.mailets.OneThreadSuicideMailet;
 import org.apache.james.transport.mailets.RuntimeErrorMailet;
 import org.apache.james.transport.mailets.RuntimeExceptionMailet;
@@ -48,19 +53,26 @@ import org.apache.james.transport.mailets.RuntimeExceptionMatcher;
 import org.apache.james.transport.mailets.ToRepository;
 import org.apache.james.transport.matchers.All;
 import org.apache.james.transport.matchers.HasException;
+import org.apache.james.transport.matchers.RecipientIs;
+import org.apache.james.utils.DataProbeImpl;
 import org.apache.james.utils.MailRepositoryProbeImpl;
 import org.apache.james.utils.SMTPMessageSender;
-import org.junit.jupiter.api.Test;
+import org.apache.james.utils.TestIMAPClient;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
 
+import com.google.common.collect.ImmutableList;
+
 class MailetErrorsTest {
     public static final String CUSTOM_PROCESSOR = "custom";
     public static final MailRepositoryUrl CUSTOM_REPOSITORY = MailRepositoryUrl.from("memory://var/mail/custom/");
 
     @RegisterExtension
     public SMTPMessageSender smtpMessageSender = new SMTPMessageSender(DEFAULT_DOMAIN);
+    @RegisterExtension
+    public TestIMAPClient testIMAPClient = new TestIMAPClient();
 
     private TemporaryJamesServer jamesServer;
 
@@ -132,6 +144,45 @@ class MailetErrorsTest {
     }
 
     @Test
+    void retryShouldSucceedUponSplittedMail(@TempDir File temporaryFolder) throws Exception {
+        jamesServer = TemporaryJamesServer.builder()
+            .withMailetContainer(MailetContainer.builder()
+                .putProcessor(ProcessorConfiguration.transport()
+                    .addMailet(MailetConfiguration.builder()
+                        .matcher(RecipientIs.class)
+                        .matcherCondition(RECIPIENT)
+                        .mailet(OneRuntimeErrorMailet.class)
+                        .addProperty("onMailetException", "propagate"))
+                    .addMailetsFrom(CommonProcessors.transport()))
+                .putProcessor(errorProcessor())
+                .putProcessor(CommonProcessors.root()))
+            .build(temporaryFolder);
+        jamesServer.start();
+        jamesServer.getProbe(DataProbeImpl.class).fluent()
+            .addDomain(DEFAULT_DOMAIN)
+            .addUser(FROM, PASSWORD)
+            .addUser(RECIPIENT, PASSWORD);
+
+        smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort())
+            .authenticate(FROM, PASSWORD)
+            .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT));
+
+        Thread.sleep(5000);
+
+        assertThat(testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort())
+            .login(RECIPIENT, PASSWORD)
+            .select(TestIMAPClient.INBOX)
+            .awaitMessage(awaitAtMostOneMinute)
+            .getMessageCount(TestIMAPClient.INBOX)).isEqualTo(1);
+
+        assertThat(testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort())
+            .login(FROM, PASSWORD)
+            .select(TestIMAPClient.INBOX)
+            .awaitMessage(awaitAtMostOneMinute)
+            .getMessageCount(TestIMAPClient.INBOX)).isGreaterThanOrEqualTo(1);
+    }
+
+    @Test
     void mailetProcessorsShouldHandleRuntimeException(@TempDir File temporaryFolder) throws Exception {
         jamesServer = TemporaryJamesServer.builder()
             .withBase(SMTP_ONLY_MODULE)
diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 4b7935f..87a4ac3 100644
--- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -30,6 +30,7 @@ import javax.mail.MessagingException;
 
 import org.apache.commons.configuration2.HierarchicalConfiguration;
 import org.apache.commons.configuration2.tree.ImmutableNode;
+import org.apache.james.core.MailAddress;
 import org.apache.james.lifecycle.api.Configurable;
 import org.apache.james.lifecycle.api.Disposable;
 import org.apache.james.lifecycle.api.LifecycleUtil;
@@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -110,6 +112,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
 
         private void performProcessMail(MailQueueItem queueItem, Mail mail) {
             LOGGER.debug("==== Begin processing mail {} ====", mail.getName());
+            ImmutableList<MailAddress> originalRecipients = ImmutableList.copyOf(mail.getRecipients());
             try {
                 mailProcessor.service(mail);
 
@@ -118,15 +121,18 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
                 }
                 queueItem.done(true);
             } catch (Exception e) {
-                handleError(queueItem, mail, e);
+                handleError(queueItem, mail, originalRecipients, e);
             } finally {
                 LOGGER.debug("==== End processing mail {} ====", mail.getName());
             }
         }
 
-        private void handleError(MailQueueItem queueItem, Mail mail, Exception processingException) {
+        private void handleError(MailQueueItem queueItem, Mail mail, ImmutableList<MailAddress> originalRecipients, Exception processingException) {
             int failureCount = computeFailureCount(mail);
 
+            // Restore original recipients
+            queueItem.getMail().setRecipients(originalRecipients);
+
             try {
                 if (failureCount > MAXIMUM_FAILURE_COUNT) {
                     LOGGER.error("Failed {} processing {} consecutive times. Abort. Mail is saved in {}", mail.getName(), failureCount, configuration.getErrorRepositoryURL().asString());
diff --git a/server/testing/src/main/java/org/apache/james/utils/SMTPMessageSender.java b/server/testing/src/main/java/org/apache/james/utils/SMTPMessageSender.java
index d17c2c1..05e80e1 100644
--- a/server/testing/src/main/java/org/apache/james/utils/SMTPMessageSender.java
+++ b/server/testing/src/main/java/org/apache/james/utils/SMTPMessageSender.java
@@ -102,6 +102,15 @@ public class SMTPMessageSender extends ExternalResource implements Closeable, Af
         return sendMessageWithHeaders(from, ImmutableList.of(recipient), message);
     }
 
+    public SMTPMessageSender sendMessage(String from, List<String> recipients) throws IOException {
+        String message = "FROM: " + from + "\r\n" +
+            "subject: test\r\n" +
+            "\r\n" +
+            "content\r\n" +
+            ".\r\n";
+        return sendMessageWithHeaders(from, recipients, message);
+    }
+
     public SMTPMessageSender sendMessageNoBracket(String from, String recipient) throws IOException {
         doHelo();
         doSetSender(from);

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org