You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/23 01:12:40 UTC

[james-project] branch master updated: JAMES-3804 Improve error handling when mailetContainer misses a proce… (#1128)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new a7298852f3 JAMES-3804 Improve error handling when mailetContainer misses a proce… (#1128)
a7298852f3 is described below

commit a7298852f3bba053a8b15360f823d9fa95619c4a
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Aug 23 08:12:36 2022 +0700

    JAMES-3804 Improve error handling when mailetContainer misses a proce… (#1128)
    
    Today, if I specify a mailet pointing to a processor that does not exist, James will start, and the failure will be managed at runtime: upon processing, each mail that needs to be processed by the undefined processor will instead go to the error processor.
    
    Recovering data from such a failure is quite a journey in itself.
    
    Example of such configuration:
    
    ```
    <mailet matcher="All" class="ToProcessor">
        <processor>notFound</processor>
    </mailet>
    ```
    
    Instead of handling such failures at runtime, we could abort James startup.
    
    Given we add a new method on the mailet interface, for a mailet to give the list of processors it needs, we can trivially implement such a check in the mailet container.
---
 .../src/main/java/org/apache/mailet/Mailet.java    | 13 ++++
 .../java/org/apache/mailet/ProcessingState.java    | 56 ++++++++++++++++
 .../james/transport/mailets/ToProcessor.java       | 19 ++++--
 .../apache/james/mailets/TemporaryJamesServer.java |  6 +-
 .../org/apache/james/mailets/MailetErrorsTest.java |  4 ++
 .../apache/james/mailets/MissingProcessorTest.java | 74 ++++++++++++++++++++++
 .../james/mailets/flow/ExecutionFlowTest.java      | 40 +-----------
 .../lib/AbstractStateCompositeProcessor.java       | 29 +++++++++
 .../transport/mailets/RecipientRewriteTable.java   | 17 +++--
 .../mailets/RecipientRewriteTableProcessor.java    |  9 +--
 .../james/transport/mailets/RemoteDelivery.java    | 10 +++
 .../apache/james/transport/mailets/Requeue.java    | 15 ++++-
 .../transport/mailets/remote/delivery/Bouncer.java | 22 +++----
 .../mailets/remote/delivery/DeliveryRunnable.java  |  2 +-
 .../mailets/remote/delivery/MailDelivrer.java      |  2 +-
 .../delivery/RemoteDeliveryConfiguration.java      | 15 +++--
 .../delivery/RemoteDeliveryConfigurationTest.java  |  5 +-
 .../james/transport/mailets/GlobalRateLimit.scala  |  7 +-
 .../transport/mailets/PerRecipientRateLimit.scala  |  6 +-
 .../transport/mailets/PerSenderRateLimit.scala     |  6 +-
 20 files changed, 272 insertions(+), 85 deletions(-)

diff --git a/mailet/api/src/main/java/org/apache/mailet/Mailet.java b/mailet/api/src/main/java/org/apache/mailet/Mailet.java
index 4fbae2c5c0..8bc1ee802e 100644
--- a/mailet/api/src/main/java/org/apache/mailet/Mailet.java
+++ b/mailet/api/src/main/java/org/apache/mailet/Mailet.java
@@ -20,8 +20,12 @@
 
 package org.apache.mailet;
 
+import java.util.Collection;
+
 import javax.mail.MessagingException;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * A Mailet processes mail messages.
  * <p>
@@ -128,4 +132,13 @@ public interface Mailet {
      */
     String getMailetInfo();
 
+    /**
+     * @return the list of processors that needs to be present according to this mailet configuration.
+     *
+     * Needs to be called after {@link Mailet::init()}
+     */
+    default Collection<ProcessingState> requiredProcessingState() {
+        return ImmutableList.of();
+    }
+
 }
diff --git a/mailet/api/src/main/java/org/apache/mailet/ProcessingState.java b/mailet/api/src/main/java/org/apache/mailet/ProcessingState.java
new file mode 100644
index 0000000000..14ed8baa52
--- /dev/null
+++ b/mailet/api/src/main/java/org/apache/mailet/ProcessingState.java
@@ -0,0 +1,56 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.mailet;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+public class ProcessingState {
+    private final String value;
+
+    public ProcessingState(String value) {
+        Preconditions.checkNotNull(value);
+        this.value = value;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public final boolean equals(Object o) {
+        if (o instanceof ProcessingState) {
+            ProcessingState that = (ProcessingState) o;
+
+            return Objects.equal(this.value, that.value);
+        }
+        return false;
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hashCode(value);
+    }
+
+    @Override
+    public String toString() {
+        return value;
+    }
+}
diff --git a/mailet/standard/src/main/java/org/apache/james/transport/mailets/ToProcessor.java b/mailet/standard/src/main/java/org/apache/james/transport/mailets/ToProcessor.java
index 8e2fad5ac8..700d83b56c 100644
--- a/mailet/standard/src/main/java/org/apache/james/transport/mailets/ToProcessor.java
+++ b/mailet/standard/src/main/java/org/apache/james/transport/mailets/ToProcessor.java
@@ -21,16 +21,20 @@
 
 package org.apache.james.transport.mailets;
 
+import java.util.Collection;
 import java.util.Optional;
 
 import javax.mail.MessagingException;
 
 import org.apache.mailet.Mail;
 import org.apache.mailet.MailetException;
+import org.apache.mailet.ProcessingState;
 import org.apache.mailet.base.GenericMailet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * <p>This mailet redirects the mail to the named processor</p>
  *
@@ -48,16 +52,14 @@ public class ToProcessor extends GenericMailet {
     private static final Logger LOGGER = LoggerFactory.getLogger(ToProcessor.class);
 
     private boolean debug;
-    private String processor;
+    private ProcessingState processor;
     private Optional<String> noticeText;
 
     @Override
     public void init() throws MailetException {
         debug = isDebug();
-        processor = getInitParameter("processor");
-        if (processor == null) {
-            throw new MailetException("processor parameter is required");
-        }
+        processor = new ProcessingState(Optional.ofNullable(getInitParameter("processor"))
+            .orElseThrow(() -> new MailetException("processor parameter is required")));
         noticeText = Optional.ofNullable(getInitParameter("notice"));
     }
 
@@ -75,7 +77,7 @@ public class ToProcessor extends GenericMailet {
         if (debug) {
             LOGGER.debug("Sending mail {} to {}", mail, processor);
         }
-        mail.setState(processor);
+        mail.setState(processor.getValue());
         if (noticeText.isPresent()) {
             setNoticeInErrorMessage(mail);
         }
@@ -88,4 +90,9 @@ public class ToProcessor extends GenericMailet {
             mail.setErrorMessage(String.format("%s\r\n%s", mail.getErrorMessage(), noticeText.get()));
         }
     }
+
+    @Override
+    public Collection<ProcessingState> requiredProcessingState() {
+        return ImmutableList.of(processor);
+    }
 }
diff --git a/server/mailet/integration-testing/src/main/java/org/apache/james/mailets/TemporaryJamesServer.java b/server/mailet/integration-testing/src/main/java/org/apache/james/mailets/TemporaryJamesServer.java
index e3bbb4b0ec..aee4e6914d 100644
--- a/server/mailet/integration-testing/src/main/java/org/apache/james/mailets/TemporaryJamesServer.java
+++ b/server/mailet/integration-testing/src/main/java/org/apache/james/mailets/TemporaryJamesServer.java
@@ -55,14 +55,16 @@ public class TemporaryJamesServer {
         return MailetContainer.builder()
             .putProcessor(CommonProcessors.root())
             .putProcessor(CommonProcessors.error())
-            .putProcessor(CommonProcessors.transport());
+            .putProcessor(CommonProcessors.transport())
+            .putProcessor(CommonProcessors.bounces());
     }
 
     public static MailetContainer.Builder simpleMailetContainerConfiguration() {
         return MailetContainer.builder()
             .putProcessor(CommonProcessors.simpleRoot())
             .putProcessor(CommonProcessors.error())
-            .putProcessor(CommonProcessors.transport());
+            .putProcessor(CommonProcessors.transport())
+            .putProcessor(CommonProcessors.bounces());
     }
 
 
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
index 5342e20ecd..d9a8be365e 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
@@ -136,6 +136,7 @@ class MailetErrorsTest {
             .withMailetContainer(MailetContainer.builder()
                 .putProcessor(CommonProcessors.transport())
                 .putProcessor(errorProcessor())
+                .putProcessor(CommonProcessors.bounces())
                 .putProcessor(ProcessorConfiguration.root()
                     .addMailet(MailetConfiguration.builder()
                         .matcher(All.class)
@@ -173,6 +174,7 @@ class MailetErrorsTest {
                         .addProperty("onMailetException", "propagate"))
                     .addMailetsFrom(CommonProcessors.transport()))
                 .putProcessor(errorProcessor())
+                .putProcessor(CommonProcessors.bounces())
                 .putProcessor(CommonProcessors.root()))
             .withSmtpConfiguration(SmtpConfiguration.builder()
                 .withAutorizedAddresses("0.0.0.0/0.0.0.0"))
@@ -211,6 +213,7 @@ class MailetErrorsTest {
                         .addProperty("onMatchException", "propagate"))
                     .addMailetsFrom(CommonProcessors.transport()))
                 .putProcessor(errorProcessor())
+                .putProcessor(CommonProcessors.bounces())
                 .putProcessor(CommonProcessors.root()))
             .withSmtpConfiguration(SmtpConfiguration.builder()
                 .withAutorizedAddresses("0.0.0.0/0.0.0.0"))
@@ -267,6 +270,7 @@ class MailetErrorsTest {
                         .addProperty("onMailetException", "propagate"))
                     .addMailetsFrom(CommonProcessors.transport()))
                 .putProcessor(errorProcessor())
+                .putProcessor(CommonProcessors.bounces())
                 .putProcessor(CommonProcessors.root()))
             .withSmtpConfiguration(SmtpConfiguration.builder()
                 .withAutorizedAddresses("0.0.0.0/0.0.0.0"))
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MissingProcessorTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MissingProcessorTest.java
new file mode 100644
index 0000000000..92894a8e99
--- /dev/null
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MissingProcessorTest.java
@@ -0,0 +1,74 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailets;
+
+import static org.apache.james.MemoryJamesServerMain.SMTP_ONLY_MODULE;
+import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.james.mailets.configuration.CommonProcessors;
+import org.apache.james.mailets.configuration.MailetConfiguration;
+import org.apache.james.mailets.configuration.MailetContainer;
+import org.apache.james.mailets.configuration.ProcessorConfiguration;
+import org.apache.james.transport.mailets.ToProcessor;
+import org.apache.james.transport.matchers.All;
+import org.apache.james.utils.SMTPMessageSender;
+import org.apache.james.utils.TestIMAPClient;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+class MissingProcessorTest {
+    @RegisterExtension
+    public SMTPMessageSender smtpMessageSender = new SMTPMessageSender(DEFAULT_DOMAIN);
+    @RegisterExtension
+    public TestIMAPClient testIMAPClient = new TestIMAPClient();
+
+    private TemporaryJamesServer jamesServer;
+
+    @AfterEach
+    void tearDown() {
+        if (jamesServer != null) {
+            jamesServer.shutdown();
+        }
+    }
+
+    @Test
+    void shouldFailOnMissingProcessor(@TempDir File temporaryFolder) throws Exception {
+        jamesServer = TemporaryJamesServer.builder()
+            .withBase(SMTP_ONLY_MODULE)
+            .withMailetContainer(MailetContainer.builder()
+                .putProcessor(CommonProcessors.deliverOnlyTransport())
+                .putProcessor(CommonProcessors.error())
+                .putProcessor(ProcessorConfiguration.root()
+                    .addMailet(MailetConfiguration.builder()
+                        .matcher(All.class)
+                        .mailet(ToProcessor.class)
+                        .addProperty("processor", "missing"))))
+            .build(temporaryFolder);
+
+        assertThatThrownBy(() -> jamesServer.start())
+            .isInstanceOf(ConfigurationException.class);
+    }
+}
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ExecutionFlowTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ExecutionFlowTest.java
index 55b2fbe050..b694e14787 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ExecutionFlowTest.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ExecutionFlowTest.java
@@ -100,6 +100,7 @@ public class ExecutionFlowTest {
                             .build())
                         .addMailet(MailetConfiguration.LOCAL_DELIVERY))
                 .putProcessor(CommonProcessors.error())
+                .putProcessor(CommonProcessors.bounces())
                 .putProcessor(CommonProcessors.root()))
             .build(temporaryFolder);
         jamesServer.start();
@@ -870,45 +871,6 @@ public class ExecutionFlowTest {
             .containsOnly(new MailAddress(RECIPIENT));
     }
 
-    @Test
-    public void toProcessorShouldSendToErrorWhenNotFound(@TempDir File temporaryFolder) throws Exception {
-        jamesServer = TemporaryJamesServer.builder()
-            .withBase(SMTP_AND_IMAP_MODULE)
-            .withMailetContainer(MailetContainer.builder()
-                .putProcessor(ProcessorConfiguration.transport()
-                    .addMailet(MailetConfiguration.BCC_STRIPPER)
-                    .addMailet(MailetConfiguration.builder()
-                        .matcher(RecipientIs.class)
-                        .matcherCondition(RECIPIENT)
-                        .mailet(ToProcessor.class)
-                        .addProperty("processor", "custom")
-                        .build())
-                    .addMailet(MailetConfiguration.LOCAL_DELIVERY))
-                .putProcessor(ProcessorConfiguration.error()
-                    .addMailet(MailetConfiguration.builder()
-                        .matcher(All.class)
-                        .mailet(CountingExecutionMailet.class)))
-                .putProcessor(CommonProcessors.root()))
-            .build(temporaryFolder);
-        jamesServer.start();
-        jamesServer.getProbe(DataProbeImpl.class)
-            .fluent()
-            .addDomain(DEFAULT_DOMAIN)
-            .addUser(FROM, PASSWORD)
-            .addUser(RECIPIENT, PASSWORD);
-
-        smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort())
-            .authenticate(FROM, PASSWORD)
-            .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT));
-
-        Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session
-        awaitAtMostOneMinute.untilAsserted(() -> assertThat(
-            jamesServer.getProbe(SpoolerProbe.class).processingFinished())
-            .isTrue());
-        assertThat(CountingExecutionMailet.executionCount())
-            .isEqualTo(1);
-    }
-
     @Test
     public void splitShouldNotDisposeContent(@TempDir File temporaryFolder) throws Exception {
         jamesServer = TemporaryJamesServer.builder()
diff --git a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/lib/AbstractStateCompositeProcessor.java b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/lib/AbstractStateCompositeProcessor.java
index 17015466a2..a925da87bf 100644
--- a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/lib/AbstractStateCompositeProcessor.java
+++ b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/lib/AbstractStateCompositeProcessor.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Stream;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -36,12 +37,18 @@ import org.apache.james.lifecycle.api.Configurable;
 import org.apache.james.lifecycle.api.LifecycleUtil;
 import org.apache.james.mailetcontainer.api.MailProcessor;
 import org.apache.james.mailetcontainer.impl.CompositeProcessorImpl;
+import org.apache.james.mailetcontainer.impl.MailetProcessorImpl;
+import org.apache.james.mailetcontainer.impl.MatcherMailetPair;
 import org.apache.james.mailetcontainer.impl.ProcessorImpl;
 import org.apache.james.mailetcontainer.impl.jmx.JMXStateCompositeProcessorListener;
 import org.apache.mailet.Mail;
+import org.apache.mailet.Mailet;
+import org.apache.mailet.ProcessingState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * Abstract base class for {@link CompositeProcessorImpl} which service the
  * {@link Mail} with a {@link ProcessorImpl} instances
@@ -126,6 +133,28 @@ public abstract class AbstractStateCompositeProcessor implements MailProcessor,
         if (!processors.containsKey(Mail.DEFAULT)) {
             throw new ConfigurationException("You need to configure a Processor with name " + Mail.DEFAULT);
         }
+        ImmutableList<ProcessingState> missingProcessors = processors.values()
+            .stream()
+            .filter(MailetProcessorImpl.class::isInstance)
+            .map(MailetProcessorImpl.class::cast)
+            .flatMap(processor -> processor.getPairs().stream().map(MatcherMailetPair::getMailet))
+            .flatMap(this::requiredProcessorStates)
+            .filter(state -> !state.equals(new ProcessingState("propagate")) && !state.equals(new ProcessingState("ignore")))
+            .filter(state -> !processors.containsKey(state.getValue()))
+            .collect(ImmutableList.toImmutableList());
+
+        if (!missingProcessors.isEmpty()) {
+            throw new ConfigurationException("Your configurations specifies the following undefined processors: " + missingProcessors);
+        }
+    }
+
+    private Stream<ProcessingState> requiredProcessorStates(Mailet mailet) {
+        return Stream.concat(mailet.requiredProcessingState().stream(),
+            Stream.of(
+                    Optional.ofNullable(mailet.getMailetConfig().getInitParameter("onMailetException")),
+                    Optional.ofNullable(mailet.getMailetConfig().getInitParameter("onMatcherException")))
+                .flatMap(Optional::stream)
+                .map(ProcessingState::new));
     }
 
     @PostConstruct
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTable.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTable.java
index c43e963cdc..155f2f395e 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTable.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTable.java
@@ -19,15 +19,19 @@
 
 package org.apache.james.transport.mailets;
 
+import java.util.Collection;
+
 import javax.inject.Inject;
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
 
 import org.apache.james.domainlist.api.DomainList;
 import org.apache.mailet.Mail;
+import org.apache.mailet.ProcessingState;
 import org.apache.mailet.base.GenericMailet;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 /**
  * Mailet which should get used when using RecipientRewriteTable-Store to
@@ -51,13 +55,8 @@ public class RecipientRewriteTable extends GenericMailet {
     private final org.apache.james.rrt.api.RecipientRewriteTable virtualTableStore;
     private final DomainList domainList;
     private RecipientRewriteTableProcessor processor;
+    private ProcessingState errorProcessor;
 
-    /**
-     * Sets the virtual table store.
-     * 
-     * @param vut
-     *            the vutStore to set, possibly null
-     */
     @Inject
     public RecipientRewriteTable(org.apache.james.rrt.api.RecipientRewriteTable virtualTableStore, DomainList domainList) {
         this.virtualTableStore = virtualTableStore;
@@ -66,7 +65,7 @@ public class RecipientRewriteTable extends GenericMailet {
 
     @Override
     public void init() throws MessagingException {
-        String errorProcessor = getInitParameter(ERROR_PROCESSOR, Mail.ERROR);
+        errorProcessor = new ProcessingState(getInitParameter(ERROR_PROCESSOR, Mail.ERROR));
         processor = new RecipientRewriteTableProcessor(virtualTableStore, domainList, getMailetContext(), errorProcessor);
     }
 
@@ -95,4 +94,8 @@ public class RecipientRewriteTable extends GenericMailet {
         return "RecipientRewriteTable Mailet";
     }
 
+    @Override
+    public Collection<ProcessingState> requiredProcessingState() {
+        return ImmutableList.of(errorProcessor);
+    }
 }
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTableProcessor.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTableProcessor.java
index df1cdc6e84..3b9637cb9b 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTableProcessor.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTableProcessor.java
@@ -49,6 +49,7 @@ import org.apache.mailet.DsnParameters;
 import org.apache.mailet.DsnParameters.RecipientDsnParameters;
 import org.apache.mailet.Mail;
 import org.apache.mailet.MailetContext;
+import org.apache.mailet.ProcessingState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,9 +152,9 @@ public class RecipientRewriteTableProcessor {
     private final RecipientRewriteTable virtualTableStore;
     private final MailetContext mailetContext;
     private final Supplier<Domain> defaultDomainSupplier;
-    private final String errorProcessor;
+    private final ProcessingState errorProcessor;
 
-    public RecipientRewriteTableProcessor(RecipientRewriteTable virtualTableStore, DomainList domainList, MailetContext mailetContext, String errorProcessor) {
+    public RecipientRewriteTableProcessor(RecipientRewriteTable virtualTableStore, DomainList domainList, MailetContext mailetContext, ProcessingState errorProcessor) {
         this.virtualTableStore = virtualTableStore;
         this.mailetContext = mailetContext;
         this.defaultDomainSupplier = MemoizedSupplier.of(
@@ -162,7 +163,7 @@ public class RecipientRewriteTableProcessor {
     }
 
     public RecipientRewriteTableProcessor(RecipientRewriteTable virtualTableStore, DomainList domainList, MailetContext mailetContext) {
-        this(virtualTableStore, domainList, mailetContext, Mail.ERROR);
+        this(virtualTableStore, domainList, mailetContext, new ProcessingState(Mail.ERROR));
     }
 
     private Domain getDefaultDomain(DomainList domainList) throws MessagingException {
@@ -200,7 +201,7 @@ public class RecipientRewriteTableProcessor {
                 .sender(mail.getMaybeSender())
                 .addRecipients(executionResults.recipientWithError)
                 .mimeMessage(mail.getMessage())
-                .state(errorProcessor)
+                .state(errorProcessor.getValue())
                 .build();
             mailetContext.sendMail(newMail);
             LifecycleUtil.dispose(newMail);
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
index b89eaf30ad..83984c892f 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
@@ -22,6 +22,7 @@ package org.apache.james.transport.mailets;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import javax.inject.Inject;
 import javax.mail.MessagingException;
@@ -39,11 +40,13 @@ import org.apache.james.transport.mailets.remote.delivery.Bouncer;
 import org.apache.james.transport.mailets.remote.delivery.DeliveryRunnable;
 import org.apache.james.transport.mailets.remote.delivery.RemoteDeliveryConfiguration;
 import org.apache.mailet.Mail;
+import org.apache.mailet.ProcessingState;
 import org.apache.mailet.base.GenericMailet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
 
 /**
  * <p>The RemoteDelivery mailet delivers messages to a remote SMTP server able to deliver or forward messages to their final
@@ -257,4 +260,11 @@ public class RemoteDelivery extends GenericMailet {
         }
     }
 
+    @Override
+    public Collection<ProcessingState> requiredProcessingState() {
+        return Stream.of(configuration.getBounceProcessor().stream(),
+                configuration.getOnSuccess().stream())
+            .flatMap(x -> x)
+            .collect(ImmutableList.toImmutableList());
+    }
 }
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java
index 9693d17c83..088a34c0b4 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java
@@ -24,6 +24,7 @@ import static org.apache.mailet.Mail.GHOST;
 import java.io.IOException;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
+import java.util.Collection;
 import java.util.Optional;
 
 import javax.inject.Inject;
@@ -35,10 +36,12 @@ import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.MailQueueName;
 import org.apache.james.util.DurationParser;
 import org.apache.mailet.Mail;
+import org.apache.mailet.ProcessingState;
 import org.apache.mailet.base.GenericMailet;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 /**
  * <p><b>Requeue</b> puts back the email in a queue.
@@ -73,7 +76,7 @@ public class Requeue extends GenericMailet {
 
     private MailQueue mailQueue;
     private Optional<Duration> delayDuration;
-    private String processor;
+    private ProcessingState processor;
     private boolean consume;
 
     @Inject
@@ -90,7 +93,8 @@ public class Requeue extends GenericMailet {
         delayDuration = Optional.ofNullable(getInitParameter("delay"))
             .map(delayValue -> DurationParser.parse(delayValue, ChronoUnit.SECONDS));
         processor = Optional.ofNullable(getInitParameter("processor"))
-            .orElse(Mail.DEFAULT);
+            .map(ProcessingState::new)
+            .orElse(new ProcessingState(Mail.DEFAULT));
 
         consume = getInitParameter("consume", true);
 
@@ -122,8 +126,13 @@ public class Requeue extends GenericMailet {
         }
     }
 
+    @Override
+    public Collection<ProcessingState> requiredProcessingState() {
+        return ImmutableList.of(processor);
+    }
+
     private void enqueue(Mail mail) {
-        mail.setState(processor);
+        mail.setState(processor.getValue());
         delayDuration.ifPresentOrElse(
             Throwing.consumer(delay -> mailQueue.enQueue(mail, delay)),
             Throwing.runnable(() -> mailQueue.enQueue(mail)).sneakyThrow());
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/Bouncer.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/Bouncer.java
index f5e27894de..c073d0a043 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/Bouncer.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/Bouncer.java
@@ -57,17 +57,17 @@ public class Bouncer {
         if (!mail.hasSender()) {
             LOGGER.debug("Null Sender: no bounce will be generated for {}", mail.getName());
         } else {
-            if (configuration.getBounceProcessor() != null) {
-                computeErrorCode(ex).ifPresent(mail::setAttribute);
-                mail.setAttribute(new Attribute(DELIVERY_ERROR, AttributeValue.of(getErrorMsg(ex))));
-                try {
-                    mailetContext.sendMail(mail, configuration.getBounceProcessor());
-                } catch (MessagingException e) {
-                    LOGGER.warn("Exception re-inserting failed mail: ", e);
-                }
-            } else {
-                bounceWithMailetContext(mail, ex);
-            }
+            configuration.getBounceProcessor().ifPresentOrElse(
+                bounceProcessor -> {
+                    computeErrorCode(ex).ifPresent(mail::setAttribute);
+                    mail.setAttribute(new Attribute(DELIVERY_ERROR, AttributeValue.of(getErrorMsg(ex))));
+                    try {
+                        mailetContext.sendMail(mail, bounceProcessor.getValue());
+                    } catch (MessagingException e) {
+                        LOGGER.warn("Exception re-inserting failed mail: ", e);
+                    }
+                },
+                () -> bounceWithMailetContext(mail, ex));
         }
     }
 
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
index 25d3a2d549..510a12a267 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
@@ -147,7 +147,7 @@ public class DeliveryRunnable implements Disposable {
             case SUCCESS:
                 outgoingMailsMetric.increment();
                 configuration.getOnSuccess()
-                    .ifPresent(Throwing.consumer(onSuccess -> mailetContext.sendMail(mail, onSuccess)));
+                    .ifPresent(Throwing.consumer(onSuccess -> mailetContext.sendMail(mail, onSuccess.getValue())));
                 break;
             case TEMPORARY_FAILURE:
                 handleTemporaryFailure(mail, executionResult);
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/MailDelivrer.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/MailDelivrer.java
index 34dd22ab93..d57c645406 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/MailDelivrer.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/MailDelivrer.java
@@ -148,7 +148,7 @@ public class MailDelivrer {
                             copy.setRecipients(deliveredAddresses.stream()
                                 .map(Throwing.function(MailAddress::new))
                                 .collect(ImmutableList.toImmutableList()));
-                            mailetContext.sendMail(copy, onSuccess);
+                            mailetContext.sendMail(copy, onSuccess.getValue());
                         } finally {
                             LifecycleUtil.dispose(copy);
                         }
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfiguration.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfiguration.java
index 493a15a061..a0cffb83cf 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfiguration.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.domainlist.api.DomainList;
 import org.apache.james.queue.api.MailQueueName;
 import org.apache.mailet.MailetConfig;
+import org.apache.mailet.ProcessingState;
 import org.apache.mailet.base.MailetUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,12 +85,12 @@ public class RemoteDeliveryConfiguration {
     private final HeloNameProvider heloNameProvider;
     private final MailQueueName outGoingQueueName;
     private final String bindAddress;
-    private final String bounceProcessor;
+    private final Optional<ProcessingState> bounceProcessor;
     private final Collection<String> gatewayServer;
     private final String authUser;
     private final String authPass;
     private final Properties javaxAdditionalProperties;
-    private final Optional<String> onSuccess;
+    private final Optional<ProcessingState> onSuccess;
 
     public RemoteDeliveryConfiguration(MailetConfig mailetConfig, DomainList domainList) {
         isDebug = MailetUtil.getInitParameter(mailetConfig, DEBUG).orElse(false);
@@ -100,7 +101,8 @@ public class RemoteDeliveryConfiguration {
         outGoingQueueName = Optional.ofNullable(mailetConfig.getInitParameter(OUTGOING))
             .map(MailQueueName::of)
             .orElse(DEFAULT_OUTGOING_QUEUE_NAME);
-        bounceProcessor = mailetConfig.getInitParameter(BOUNCE_PROCESSOR);
+        bounceProcessor = Optional.ofNullable(mailetConfig.getInitParameter(BOUNCE_PROCESSOR))
+            .map(ProcessingState::new);
         bindAddress = mailetConfig.getInitParameter(BIND);
 
         DelaysAndMaxRetry delaysAndMaxRetry = computeDelaysAndMaxRetry(mailetConfig);
@@ -123,7 +125,8 @@ public class RemoteDeliveryConfiguration {
         }
         isBindUsed = bindAddress != null;
         javaxAdditionalProperties = computeJavaxProperties(mailetConfig);
-        onSuccess = Optional.ofNullable(mailetConfig.getInitParameter(ON_SUCCESS));
+        onSuccess = Optional.ofNullable(mailetConfig.getInitParameter(ON_SUCCESS))
+            .map(ProcessingState::new);
     }
 
     private Properties computeJavaxProperties(MailetConfig mailetConfig) {
@@ -286,7 +289,7 @@ public class RemoteDeliveryConfiguration {
         return isBindUsed;
     }
 
-    public String getBounceProcessor() {
+    public Optional<ProcessingState> getBounceProcessor() {
         return bounceProcessor;
     }
 
@@ -322,7 +325,7 @@ public class RemoteDeliveryConfiguration {
         return bindAddress;
     }
 
-    public Optional<String> getOnSuccess() {
+    public Optional<ProcessingState> getOnSuccess() {
         return onSuccess;
     }
 }
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfigurationTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfigurationTest.java
index e888faf002..32a03d30eb 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfigurationTest.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfigurationTest.java
@@ -29,6 +29,7 @@ import java.util.Properties;
 
 import org.apache.james.core.Domain;
 import org.apache.james.domainlist.api.DomainList;
+import org.apache.mailet.ProcessingState;
 import org.apache.mailet.base.test.FakeMailetConfig;
 import org.assertj.core.data.MapEntry;
 import org.junit.jupiter.api.Test;
@@ -231,7 +232,7 @@ class RemoteDeliveryConfigurationTest {
             .build();
 
         assertThat(new RemoteDeliveryConfiguration(mailetConfig, mock(DomainList.class)).getBounceProcessor())
-            .isNull();
+            .isEmpty();
     }
 
     @Test
@@ -242,7 +243,7 @@ class RemoteDeliveryConfigurationTest {
             .build();
 
         assertThat(new RemoteDeliveryConfiguration(mailetConfig, mock(DomainList.class)).getBounceProcessor())
-            .isEqualTo(value);
+            .contains(new ProcessingState(value));
     }
 
     @Test
diff --git a/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/GlobalRateLimit.scala b/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/GlobalRateLimit.scala
index e06ff724b9..a0f119111e 100644
--- a/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/GlobalRateLimit.scala
+++ b/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/GlobalRateLimit.scala
@@ -20,11 +20,13 @@
 package org.apache.james.transport.mailets
 
 import java.time.Duration
+import java.util
 
+import com.google.common.collect.ImmutableList
 import javax.inject.Inject
 import org.apache.james.rate.limiter.api.{AcceptableRate, RateExceeded, RateLimiter, RateLimiterFactory, RateLimitingKey, RateLimitingResult}
-import org.apache.mailet.Mail
 import org.apache.mailet.base.GenericMailet
+import org.apache.mailet.{Mail, ProcessingState}
 import org.reactivestreams.Publisher
 import reactor.core.scala.publisher.{SFlux, SMono}
 
@@ -144,4 +146,7 @@ class GlobalRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) extends
       .map(rateLimiterFactory.withSpecification(_, precision)),
       keyPrefix = keyPrefix,
       entityType = entityType)
+
+
+  override def requiredProcessingState(): util.Collection[ProcessingState] = ImmutableList.of(new ProcessingState(exceededProcessor))
 }
diff --git a/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerRecipientRateLimit.scala b/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerRecipientRateLimit.scala
index de0c8c8589..dd65a01138 100644
--- a/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerRecipientRateLimit.scala
+++ b/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerRecipientRateLimit.scala
@@ -20,6 +20,7 @@
 package org.apache.james.transport.mailets
 
 import java.time.Duration
+import java.util
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableList
@@ -28,8 +29,8 @@ import org.apache.james.core.MailAddress
 import org.apache.james.lifecycle.api.LifecycleUtil
 import org.apache.james.rate.limiter.api.{AcceptableRate, RateExceeded, RateLimiter, RateLimiterFactory, RateLimitingKey, RateLimitingResult}
 import org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY
-import org.apache.mailet.Mail
 import org.apache.mailet.base.GenericMailet
+import org.apache.mailet.{Mail, ProcessingState}
 import org.reactivestreams.Publisher
 import reactor.core.scala.publisher.{SFlux, SMono}
 
@@ -149,4 +150,7 @@ class PerRecipientRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) ex
       .collectSeq()
       .block()
 
+
+  override def requiredProcessingState(): util.Collection[ProcessingState] = ImmutableList.of(new ProcessingState(exceededProcessor))
+
 }
diff --git a/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerSenderRateLimit.scala b/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerSenderRateLimit.scala
index b981e56bdc..956d063f90 100644
--- a/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerSenderRateLimit.scala
+++ b/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerSenderRateLimit.scala
@@ -20,13 +20,15 @@
 package org.apache.james.transport.mailets
 
 import java.time.Duration
+import java.util
 
 import com.google.common.annotations.VisibleForTesting
+import com.google.common.collect.ImmutableList
 import javax.inject.Inject
 import org.apache.james.core.MailAddress
 import org.apache.james.rate.limiter.api.{AcceptableRate, RateExceeded, RateLimiter, RateLimiterFactory, RateLimitingKey, RateLimitingResult}
-import org.apache.mailet.Mail
 import org.apache.mailet.base.GenericMailet
+import org.apache.mailet.{Mail, ProcessingState}
 import org.reactivestreams.Publisher
 import reactor.core.scala.publisher.{SFlux, SMono}
 
@@ -151,4 +153,6 @@ class PerSenderRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) exten
       .map(rateLimiterFactory.withSpecification(_, precision)),
       keyPrefix = keyPrefix,
       entityType = entityType)
+
+  override def requiredProcessingState(): util.Collection[ProcessingState] = ImmutableList.of(new ProcessingState(exceededProcessor))
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org