You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/23 01:12:40 UTC
[james-project] branch master updated: JAMES-3804 Improve error handling when mailetContainer misses a proce… (#1128)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new a7298852f3 JAMES-3804 Improve error handling when mailetContainer misses a proce… (#1128)
a7298852f3 is described below
commit a7298852f3bba053a8b15360f823d9fa95619c4a
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Aug 23 08:12:36 2022 +0700
JAMES-3804 Improve error handling when mailetContainer misses a proce… (#1128)
Today, if I specify a mailet pointing to a processor that does not exist, James will start, and the failure will only be handled at runtime: upon processing, each mail that should be handled by the undefined processor will instead go to the error processor.
Recovering data from such a failure is quite a journey in itself.
Example of such a configuration:
```
<mailet matcher="All" class="ToProcessor">
<processor>notFound</processor>
</mailet>
```
Rather than handling such failures at runtime, we could abort James startup.
If we add a new method to the Mailet interface that lets a mailet declare the list of processors it needs, we can trivially implement such a check in the mailet container.
---
.../src/main/java/org/apache/mailet/Mailet.java | 13 ++++
.../java/org/apache/mailet/ProcessingState.java | 56 ++++++++++++++++
.../james/transport/mailets/ToProcessor.java | 19 ++++--
.../apache/james/mailets/TemporaryJamesServer.java | 6 +-
.../org/apache/james/mailets/MailetErrorsTest.java | 4 ++
.../apache/james/mailets/MissingProcessorTest.java | 74 ++++++++++++++++++++++
.../james/mailets/flow/ExecutionFlowTest.java | 40 +-----------
.../lib/AbstractStateCompositeProcessor.java | 29 +++++++++
.../transport/mailets/RecipientRewriteTable.java | 17 +++--
.../mailets/RecipientRewriteTableProcessor.java | 9 +--
.../james/transport/mailets/RemoteDelivery.java | 10 +++
.../apache/james/transport/mailets/Requeue.java | 15 ++++-
.../transport/mailets/remote/delivery/Bouncer.java | 22 +++----
.../mailets/remote/delivery/DeliveryRunnable.java | 2 +-
.../mailets/remote/delivery/MailDelivrer.java | 2 +-
.../delivery/RemoteDeliveryConfiguration.java | 15 +++--
.../delivery/RemoteDeliveryConfigurationTest.java | 5 +-
.../james/transport/mailets/GlobalRateLimit.scala | 7 +-
.../transport/mailets/PerRecipientRateLimit.scala | 6 +-
.../transport/mailets/PerSenderRateLimit.scala | 6 +-
20 files changed, 272 insertions(+), 85 deletions(-)
diff --git a/mailet/api/src/main/java/org/apache/mailet/Mailet.java b/mailet/api/src/main/java/org/apache/mailet/Mailet.java
index 4fbae2c5c0..8bc1ee802e 100644
--- a/mailet/api/src/main/java/org/apache/mailet/Mailet.java
+++ b/mailet/api/src/main/java/org/apache/mailet/Mailet.java
@@ -20,8 +20,12 @@
package org.apache.mailet;
+import java.util.Collection;
+
import javax.mail.MessagingException;
+import com.google.common.collect.ImmutableList;
+
/**
* A Mailet processes mail messages.
* <p>
@@ -128,4 +132,13 @@ public interface Mailet {
*/
String getMailetInfo();
+ /**
+ * @return the list of processors that need to be present according to this mailet configuration.
+ *
+ * Needs to be called after {@link Mailet#init(MailetConfig)}
+ */
+ default Collection<ProcessingState> requiredProcessingState() {
+ return ImmutableList.of();
+ }
+
}
diff --git a/mailet/api/src/main/java/org/apache/mailet/ProcessingState.java b/mailet/api/src/main/java/org/apache/mailet/ProcessingState.java
new file mode 100644
index 0000000000..14ed8baa52
--- /dev/null
+++ b/mailet/api/src/main/java/org/apache/mailet/ProcessingState.java
@@ -0,0 +1,56 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.mailet;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+public class ProcessingState {
+ private final String value;
+
+ public ProcessingState(String value) {
+ Preconditions.checkNotNull(value);
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (o instanceof ProcessingState) {
+ ProcessingState that = (ProcessingState) o;
+
+ return Objects.equal(this.value, that.value);
+ }
+ return false;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hashCode(value);
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+}
diff --git a/mailet/standard/src/main/java/org/apache/james/transport/mailets/ToProcessor.java b/mailet/standard/src/main/java/org/apache/james/transport/mailets/ToProcessor.java
index 8e2fad5ac8..700d83b56c 100644
--- a/mailet/standard/src/main/java/org/apache/james/transport/mailets/ToProcessor.java
+++ b/mailet/standard/src/main/java/org/apache/james/transport/mailets/ToProcessor.java
@@ -21,16 +21,20 @@
package org.apache.james.transport.mailets;
+import java.util.Collection;
import java.util.Optional;
import javax.mail.MessagingException;
import org.apache.mailet.Mail;
import org.apache.mailet.MailetException;
+import org.apache.mailet.ProcessingState;
import org.apache.mailet.base.GenericMailet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableList;
+
/**
* <p>This mailet redirects the mail to the named processor</p>
*
@@ -48,16 +52,14 @@ public class ToProcessor extends GenericMailet {
private static final Logger LOGGER = LoggerFactory.getLogger(ToProcessor.class);
private boolean debug;
- private String processor;
+ private ProcessingState processor;
private Optional<String> noticeText;
@Override
public void init() throws MailetException {
debug = isDebug();
- processor = getInitParameter("processor");
- if (processor == null) {
- throw new MailetException("processor parameter is required");
- }
+ processor = new ProcessingState(Optional.ofNullable(getInitParameter("processor"))
+ .orElseThrow(() -> new MailetException("processor parameter is required")));
noticeText = Optional.ofNullable(getInitParameter("notice"));
}
@@ -75,7 +77,7 @@ public class ToProcessor extends GenericMailet {
if (debug) {
LOGGER.debug("Sending mail {} to {}", mail, processor);
}
- mail.setState(processor);
+ mail.setState(processor.getValue());
if (noticeText.isPresent()) {
setNoticeInErrorMessage(mail);
}
@@ -88,4 +90,9 @@ public class ToProcessor extends GenericMailet {
mail.setErrorMessage(String.format("%s\r\n%s", mail.getErrorMessage(), noticeText.get()));
}
}
+
+ @Override
+ public Collection<ProcessingState> requiredProcessingState() {
+ return ImmutableList.of(processor);
+ }
}
diff --git a/server/mailet/integration-testing/src/main/java/org/apache/james/mailets/TemporaryJamesServer.java b/server/mailet/integration-testing/src/main/java/org/apache/james/mailets/TemporaryJamesServer.java
index e3bbb4b0ec..aee4e6914d 100644
--- a/server/mailet/integration-testing/src/main/java/org/apache/james/mailets/TemporaryJamesServer.java
+++ b/server/mailet/integration-testing/src/main/java/org/apache/james/mailets/TemporaryJamesServer.java
@@ -55,14 +55,16 @@ public class TemporaryJamesServer {
return MailetContainer.builder()
.putProcessor(CommonProcessors.root())
.putProcessor(CommonProcessors.error())
- .putProcessor(CommonProcessors.transport());
+ .putProcessor(CommonProcessors.transport())
+ .putProcessor(CommonProcessors.bounces());
}
public static MailetContainer.Builder simpleMailetContainerConfiguration() {
return MailetContainer.builder()
.putProcessor(CommonProcessors.simpleRoot())
.putProcessor(CommonProcessors.error())
- .putProcessor(CommonProcessors.transport());
+ .putProcessor(CommonProcessors.transport())
+ .putProcessor(CommonProcessors.bounces());
}
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
index 5342e20ecd..d9a8be365e 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MailetErrorsTest.java
@@ -136,6 +136,7 @@ class MailetErrorsTest {
.withMailetContainer(MailetContainer.builder()
.putProcessor(CommonProcessors.transport())
.putProcessor(errorProcessor())
+ .putProcessor(CommonProcessors.bounces())
.putProcessor(ProcessorConfiguration.root()
.addMailet(MailetConfiguration.builder()
.matcher(All.class)
@@ -173,6 +174,7 @@ class MailetErrorsTest {
.addProperty("onMailetException", "propagate"))
.addMailetsFrom(CommonProcessors.transport()))
.putProcessor(errorProcessor())
+ .putProcessor(CommonProcessors.bounces())
.putProcessor(CommonProcessors.root()))
.withSmtpConfiguration(SmtpConfiguration.builder()
.withAutorizedAddresses("0.0.0.0/0.0.0.0"))
@@ -211,6 +213,7 @@ class MailetErrorsTest {
.addProperty("onMatchException", "propagate"))
.addMailetsFrom(CommonProcessors.transport()))
.putProcessor(errorProcessor())
+ .putProcessor(CommonProcessors.bounces())
.putProcessor(CommonProcessors.root()))
.withSmtpConfiguration(SmtpConfiguration.builder()
.withAutorizedAddresses("0.0.0.0/0.0.0.0"))
@@ -267,6 +270,7 @@ class MailetErrorsTest {
.addProperty("onMailetException", "propagate"))
.addMailetsFrom(CommonProcessors.transport()))
.putProcessor(errorProcessor())
+ .putProcessor(CommonProcessors.bounces())
.putProcessor(CommonProcessors.root()))
.withSmtpConfiguration(SmtpConfiguration.builder()
.withAutorizedAddresses("0.0.0.0/0.0.0.0"))
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MissingProcessorTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MissingProcessorTest.java
new file mode 100644
index 0000000000..92894a8e99
--- /dev/null
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/MissingProcessorTest.java
@@ -0,0 +1,74 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailets;
+
+import static org.apache.james.MemoryJamesServerMain.SMTP_ONLY_MODULE;
+import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.james.mailets.configuration.CommonProcessors;
+import org.apache.james.mailets.configuration.MailetConfiguration;
+import org.apache.james.mailets.configuration.MailetContainer;
+import org.apache.james.mailets.configuration.ProcessorConfiguration;
+import org.apache.james.transport.mailets.ToProcessor;
+import org.apache.james.transport.matchers.All;
+import org.apache.james.utils.SMTPMessageSender;
+import org.apache.james.utils.TestIMAPClient;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+class MissingProcessorTest {
+ @RegisterExtension
+ public SMTPMessageSender smtpMessageSender = new SMTPMessageSender(DEFAULT_DOMAIN);
+ @RegisterExtension
+ public TestIMAPClient testIMAPClient = new TestIMAPClient();
+
+ private TemporaryJamesServer jamesServer;
+
+ @AfterEach
+ void tearDown() {
+ if (jamesServer != null) {
+ jamesServer.shutdown();
+ }
+ }
+
+ @Test
+ void shouldFailOnMissingProcessor(@TempDir File temporaryFolder) throws Exception {
+ jamesServer = TemporaryJamesServer.builder()
+ .withBase(SMTP_ONLY_MODULE)
+ .withMailetContainer(MailetContainer.builder()
+ .putProcessor(CommonProcessors.deliverOnlyTransport())
+ .putProcessor(CommonProcessors.error())
+ .putProcessor(ProcessorConfiguration.root()
+ .addMailet(MailetConfiguration.builder()
+ .matcher(All.class)
+ .mailet(ToProcessor.class)
+ .addProperty("processor", "missing"))))
+ .build(temporaryFolder);
+
+ assertThatThrownBy(() -> jamesServer.start())
+ .isInstanceOf(ConfigurationException.class);
+ }
+}
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ExecutionFlowTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ExecutionFlowTest.java
index 55b2fbe050..b694e14787 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ExecutionFlowTest.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/mailets/flow/ExecutionFlowTest.java
@@ -100,6 +100,7 @@ public class ExecutionFlowTest {
.build())
.addMailet(MailetConfiguration.LOCAL_DELIVERY))
.putProcessor(CommonProcessors.error())
+ .putProcessor(CommonProcessors.bounces())
.putProcessor(CommonProcessors.root()))
.build(temporaryFolder);
jamesServer.start();
@@ -870,45 +871,6 @@ public class ExecutionFlowTest {
.containsOnly(new MailAddress(RECIPIENT));
}
- @Test
- public void toProcessorShouldSendToErrorWhenNotFound(@TempDir File temporaryFolder) throws Exception {
- jamesServer = TemporaryJamesServer.builder()
- .withBase(SMTP_AND_IMAP_MODULE)
- .withMailetContainer(MailetContainer.builder()
- .putProcessor(ProcessorConfiguration.transport()
- .addMailet(MailetConfiguration.BCC_STRIPPER)
- .addMailet(MailetConfiguration.builder()
- .matcher(RecipientIs.class)
- .matcherCondition(RECIPIENT)
- .mailet(ToProcessor.class)
- .addProperty("processor", "custom")
- .build())
- .addMailet(MailetConfiguration.LOCAL_DELIVERY))
- .putProcessor(ProcessorConfiguration.error()
- .addMailet(MailetConfiguration.builder()
- .matcher(All.class)
- .mailet(CountingExecutionMailet.class)))
- .putProcessor(CommonProcessors.root()))
- .build(temporaryFolder);
- jamesServer.start();
- jamesServer.getProbe(DataProbeImpl.class)
- .fluent()
- .addDomain(DEFAULT_DOMAIN)
- .addUser(FROM, PASSWORD)
- .addUser(RECIPIENT, PASSWORD);
-
- smtpMessageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort())
- .authenticate(FROM, PASSWORD)
- .sendMessage(FROM, ImmutableList.of(FROM, RECIPIENT));
-
- Thread.sleep(100); // queue delays might cause the processing not to start straight at the end of the SMTP session
- awaitAtMostOneMinute.untilAsserted(() -> assertThat(
- jamesServer.getProbe(SpoolerProbe.class).processingFinished())
- .isTrue());
- assertThat(CountingExecutionMailet.executionCount())
- .isEqualTo(1);
- }
-
@Test
public void splitShouldNotDisposeContent(@TempDir File temporaryFolder) throws Exception {
jamesServer = TemporaryJamesServer.builder()
diff --git a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/lib/AbstractStateCompositeProcessor.java b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/lib/AbstractStateCompositeProcessor.java
index 17015466a2..a925da87bf 100644
--- a/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/lib/AbstractStateCompositeProcessor.java
+++ b/server/mailet/mailetcontainer-impl/src/main/java/org/apache/james/mailetcontainer/lib/AbstractStateCompositeProcessor.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -36,12 +37,18 @@ import org.apache.james.lifecycle.api.Configurable;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.mailetcontainer.api.MailProcessor;
import org.apache.james.mailetcontainer.impl.CompositeProcessorImpl;
+import org.apache.james.mailetcontainer.impl.MailetProcessorImpl;
+import org.apache.james.mailetcontainer.impl.MatcherMailetPair;
import org.apache.james.mailetcontainer.impl.ProcessorImpl;
import org.apache.james.mailetcontainer.impl.jmx.JMXStateCompositeProcessorListener;
import org.apache.mailet.Mail;
+import org.apache.mailet.Mailet;
+import org.apache.mailet.ProcessingState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableList;
+
/**
* Abstract base class for {@link CompositeProcessorImpl} which service the
* {@link Mail} with a {@link ProcessorImpl} instances
@@ -126,6 +133,28 @@ public abstract class AbstractStateCompositeProcessor implements MailProcessor,
if (!processors.containsKey(Mail.DEFAULT)) {
throw new ConfigurationException("You need to configure a Processor with name " + Mail.DEFAULT);
}
+ ImmutableList<ProcessingState> missingProcessors = processors.values()
+ .stream()
+ .filter(MailetProcessorImpl.class::isInstance)
+ .map(MailetProcessorImpl.class::cast)
+ .flatMap(processor -> processor.getPairs().stream().map(MatcherMailetPair::getMailet))
+ .flatMap(this::requiredProcessorStates)
+ .filter(state -> !state.equals(new ProcessingState("propagate")) && !state.equals(new ProcessingState("ignore")))
+ .filter(state -> !processors.containsKey(state.getValue()))
+ .collect(ImmutableList.toImmutableList());
+
+ if (!missingProcessors.isEmpty()) {
+ throw new ConfigurationException("Your configurations specifies the following undefined processors: " + missingProcessors);
+ }
+ }
+
+ private Stream<ProcessingState> requiredProcessorStates(Mailet mailet) {
+ return Stream.concat(mailet.requiredProcessingState().stream(),
+ Stream.of(
+ Optional.ofNullable(mailet.getMailetConfig().getInitParameter("onMailetException")),
+ Optional.ofNullable(mailet.getMailetConfig().getInitParameter("onMatcherException")))
+ .flatMap(Optional::stream)
+ .map(ProcessingState::new));
}
@PostConstruct
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTable.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTable.java
index c43e963cdc..155f2f395e 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTable.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTable.java
@@ -19,15 +19,19 @@
package org.apache.james.transport.mailets;
+import java.util.Collection;
+
import javax.inject.Inject;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import org.apache.james.domainlist.api.DomainList;
import org.apache.mailet.Mail;
+import org.apache.mailet.ProcessingState;
import org.apache.mailet.base.GenericMailet;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
/**
* Mailet which should get used when using RecipientRewriteTable-Store to
@@ -51,13 +55,8 @@ public class RecipientRewriteTable extends GenericMailet {
private final org.apache.james.rrt.api.RecipientRewriteTable virtualTableStore;
private final DomainList domainList;
private RecipientRewriteTableProcessor processor;
+ private ProcessingState errorProcessor;
- /**
- * Sets the virtual table store.
- *
- * @param vut
- * the vutStore to set, possibly null
- */
@Inject
public RecipientRewriteTable(org.apache.james.rrt.api.RecipientRewriteTable virtualTableStore, DomainList domainList) {
this.virtualTableStore = virtualTableStore;
@@ -66,7 +65,7 @@ public class RecipientRewriteTable extends GenericMailet {
@Override
public void init() throws MessagingException {
- String errorProcessor = getInitParameter(ERROR_PROCESSOR, Mail.ERROR);
+ errorProcessor = new ProcessingState(getInitParameter(ERROR_PROCESSOR, Mail.ERROR));
processor = new RecipientRewriteTableProcessor(virtualTableStore, domainList, getMailetContext(), errorProcessor);
}
@@ -95,4 +94,8 @@ public class RecipientRewriteTable extends GenericMailet {
return "RecipientRewriteTable Mailet";
}
+ @Override
+ public Collection<ProcessingState> requiredProcessingState() {
+ return ImmutableList.of(errorProcessor);
+ }
}
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTableProcessor.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTableProcessor.java
index df1cdc6e84..3b9637cb9b 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTableProcessor.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RecipientRewriteTableProcessor.java
@@ -49,6 +49,7 @@ import org.apache.mailet.DsnParameters;
import org.apache.mailet.DsnParameters.RecipientDsnParameters;
import org.apache.mailet.Mail;
import org.apache.mailet.MailetContext;
+import org.apache.mailet.ProcessingState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -151,9 +152,9 @@ public class RecipientRewriteTableProcessor {
private final RecipientRewriteTable virtualTableStore;
private final MailetContext mailetContext;
private final Supplier<Domain> defaultDomainSupplier;
- private final String errorProcessor;
+ private final ProcessingState errorProcessor;
- public RecipientRewriteTableProcessor(RecipientRewriteTable virtualTableStore, DomainList domainList, MailetContext mailetContext, String errorProcessor) {
+ public RecipientRewriteTableProcessor(RecipientRewriteTable virtualTableStore, DomainList domainList, MailetContext mailetContext, ProcessingState errorProcessor) {
this.virtualTableStore = virtualTableStore;
this.mailetContext = mailetContext;
this.defaultDomainSupplier = MemoizedSupplier.of(
@@ -162,7 +163,7 @@ public class RecipientRewriteTableProcessor {
}
public RecipientRewriteTableProcessor(RecipientRewriteTable virtualTableStore, DomainList domainList, MailetContext mailetContext) {
- this(virtualTableStore, domainList, mailetContext, Mail.ERROR);
+ this(virtualTableStore, domainList, mailetContext, new ProcessingState(Mail.ERROR));
}
private Domain getDefaultDomain(DomainList domainList) throws MessagingException {
@@ -200,7 +201,7 @@ public class RecipientRewriteTableProcessor {
.sender(mail.getMaybeSender())
.addRecipients(executionResults.recipientWithError)
.mimeMessage(mail.getMessage())
- .state(errorProcessor)
+ .state(errorProcessor.getValue())
.build();
mailetContext.sendMail(newMail);
LifecycleUtil.dispose(newMail);
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
index b89eaf30ad..83984c892f 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
@@ -22,6 +22,7 @@ package org.apache.james.transport.mailets;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
+import java.util.stream.Stream;
import javax.inject.Inject;
import javax.mail.MessagingException;
@@ -39,11 +40,13 @@ import org.apache.james.transport.mailets.remote.delivery.Bouncer;
import org.apache.james.transport.mailets.remote.delivery.DeliveryRunnable;
import org.apache.james.transport.mailets.remote.delivery.RemoteDeliveryConfiguration;
import org.apache.mailet.Mail;
+import org.apache.mailet.ProcessingState;
import org.apache.mailet.base.GenericMailet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
/**
* <p>The RemoteDelivery mailet delivers messages to a remote SMTP server able to deliver or forward messages to their final
@@ -257,4 +260,11 @@ public class RemoteDelivery extends GenericMailet {
}
}
+ @Override
+ public Collection<ProcessingState> requiredProcessingState() {
+ return Stream.of(configuration.getBounceProcessor().stream(),
+ configuration.getOnSuccess().stream())
+ .flatMap(x -> x)
+ .collect(ImmutableList.toImmutableList());
+ }
}
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java
index 9693d17c83..088a34c0b4 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/Requeue.java
@@ -24,6 +24,7 @@ import static org.apache.mailet.Mail.GHOST;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
+import java.util.Collection;
import java.util.Optional;
import javax.inject.Inject;
@@ -35,10 +36,12 @@ import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.MailQueueName;
import org.apache.james.util.DurationParser;
import org.apache.mailet.Mail;
+import org.apache.mailet.ProcessingState;
import org.apache.mailet.base.GenericMailet;
import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
/**
* <p><b>Requeue</b> puts back the email in a queue.
@@ -73,7 +76,7 @@ public class Requeue extends GenericMailet {
private MailQueue mailQueue;
private Optional<Duration> delayDuration;
- private String processor;
+ private ProcessingState processor;
private boolean consume;
@Inject
@@ -90,7 +93,8 @@ public class Requeue extends GenericMailet {
delayDuration = Optional.ofNullable(getInitParameter("delay"))
.map(delayValue -> DurationParser.parse(delayValue, ChronoUnit.SECONDS));
processor = Optional.ofNullable(getInitParameter("processor"))
- .orElse(Mail.DEFAULT);
+ .map(ProcessingState::new)
+ .orElse(new ProcessingState(Mail.DEFAULT));
consume = getInitParameter("consume", true);
@@ -122,8 +126,13 @@ public class Requeue extends GenericMailet {
}
}
+ @Override
+ public Collection<ProcessingState> requiredProcessingState() {
+ return ImmutableList.of(processor);
+ }
+
private void enqueue(Mail mail) {
- mail.setState(processor);
+ mail.setState(processor.getValue());
delayDuration.ifPresentOrElse(
Throwing.consumer(delay -> mailQueue.enQueue(mail, delay)),
Throwing.runnable(() -> mailQueue.enQueue(mail)).sneakyThrow());
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/Bouncer.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/Bouncer.java
index f5e27894de..c073d0a043 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/Bouncer.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/Bouncer.java
@@ -57,17 +57,17 @@ public class Bouncer {
if (!mail.hasSender()) {
LOGGER.debug("Null Sender: no bounce will be generated for {}", mail.getName());
} else {
- if (configuration.getBounceProcessor() != null) {
- computeErrorCode(ex).ifPresent(mail::setAttribute);
- mail.setAttribute(new Attribute(DELIVERY_ERROR, AttributeValue.of(getErrorMsg(ex))));
- try {
- mailetContext.sendMail(mail, configuration.getBounceProcessor());
- } catch (MessagingException e) {
- LOGGER.warn("Exception re-inserting failed mail: ", e);
- }
- } else {
- bounceWithMailetContext(mail, ex);
- }
+ configuration.getBounceProcessor().ifPresentOrElse(
+ bounceProcessor -> {
+ computeErrorCode(ex).ifPresent(mail::setAttribute);
+ mail.setAttribute(new Attribute(DELIVERY_ERROR, AttributeValue.of(getErrorMsg(ex))));
+ try {
+ mailetContext.sendMail(mail, bounceProcessor.getValue());
+ } catch (MessagingException e) {
+ LOGGER.warn("Exception re-inserting failed mail: ", e);
+ }
+ },
+ () -> bounceWithMailetContext(mail, ex));
}
}
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
index 25d3a2d549..510a12a267 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/DeliveryRunnable.java
@@ -147,7 +147,7 @@ public class DeliveryRunnable implements Disposable {
case SUCCESS:
outgoingMailsMetric.increment();
configuration.getOnSuccess()
- .ifPresent(Throwing.consumer(onSuccess -> mailetContext.sendMail(mail, onSuccess)));
+ .ifPresent(Throwing.consumer(onSuccess -> mailetContext.sendMail(mail, onSuccess.getValue())));
break;
case TEMPORARY_FAILURE:
handleTemporaryFailure(mail, executionResult);
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/MailDelivrer.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/MailDelivrer.java
index 34dd22ab93..d57c645406 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/MailDelivrer.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/MailDelivrer.java
@@ -148,7 +148,7 @@ public class MailDelivrer {
copy.setRecipients(deliveredAddresses.stream()
.map(Throwing.function(MailAddress::new))
.collect(ImmutableList.toImmutableList()));
- mailetContext.sendMail(copy, onSuccess);
+ mailetContext.sendMail(copy, onSuccess.getValue());
} finally {
LifecycleUtil.dispose(copy);
}
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfiguration.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfiguration.java
index 493a15a061..a0cffb83cf 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfiguration.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.domainlist.api.DomainList;
import org.apache.james.queue.api.MailQueueName;
import org.apache.mailet.MailetConfig;
+import org.apache.mailet.ProcessingState;
import org.apache.mailet.base.MailetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,12 +85,12 @@ public class RemoteDeliveryConfiguration {
private final HeloNameProvider heloNameProvider;
private final MailQueueName outGoingQueueName;
private final String bindAddress;
- private final String bounceProcessor;
+ private final Optional<ProcessingState> bounceProcessor;
private final Collection<String> gatewayServer;
private final String authUser;
private final String authPass;
private final Properties javaxAdditionalProperties;
- private final Optional<String> onSuccess;
+ private final Optional<ProcessingState> onSuccess;
public RemoteDeliveryConfiguration(MailetConfig mailetConfig, DomainList domainList) {
isDebug = MailetUtil.getInitParameter(mailetConfig, DEBUG).orElse(false);
@@ -100,7 +101,8 @@ public class RemoteDeliveryConfiguration {
outGoingQueueName = Optional.ofNullable(mailetConfig.getInitParameter(OUTGOING))
.map(MailQueueName::of)
.orElse(DEFAULT_OUTGOING_QUEUE_NAME);
- bounceProcessor = mailetConfig.getInitParameter(BOUNCE_PROCESSOR);
+ bounceProcessor = Optional.ofNullable(mailetConfig.getInitParameter(BOUNCE_PROCESSOR))
+ .map(ProcessingState::new);
bindAddress = mailetConfig.getInitParameter(BIND);
DelaysAndMaxRetry delaysAndMaxRetry = computeDelaysAndMaxRetry(mailetConfig);
@@ -123,7 +125,8 @@ public class RemoteDeliveryConfiguration {
}
isBindUsed = bindAddress != null;
javaxAdditionalProperties = computeJavaxProperties(mailetConfig);
- onSuccess = Optional.ofNullable(mailetConfig.getInitParameter(ON_SUCCESS));
+ onSuccess = Optional.ofNullable(mailetConfig.getInitParameter(ON_SUCCESS))
+ .map(ProcessingState::new);
}
private Properties computeJavaxProperties(MailetConfig mailetConfig) {
@@ -286,7 +289,7 @@ public class RemoteDeliveryConfiguration {
return isBindUsed;
}
- public String getBounceProcessor() {
+ public Optional<ProcessingState> getBounceProcessor() {
return bounceProcessor;
}
@@ -322,7 +325,7 @@ public class RemoteDeliveryConfiguration {
return bindAddress;
}
- public Optional<String> getOnSuccess() {
+ public Optional<ProcessingState> getOnSuccess() {
return onSuccess;
}
}
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfigurationTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfigurationTest.java
index e888faf002..32a03d30eb 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfigurationTest.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryConfigurationTest.java
@@ -29,6 +29,7 @@ import java.util.Properties;
import org.apache.james.core.Domain;
import org.apache.james.domainlist.api.DomainList;
+import org.apache.mailet.ProcessingState;
import org.apache.mailet.base.test.FakeMailetConfig;
import org.assertj.core.data.MapEntry;
import org.junit.jupiter.api.Test;
@@ -231,7 +232,7 @@ class RemoteDeliveryConfigurationTest {
.build();
assertThat(new RemoteDeliveryConfiguration(mailetConfig, mock(DomainList.class)).getBounceProcessor())
- .isNull();
+ .isEmpty();
}
@Test
@@ -242,7 +243,7 @@ class RemoteDeliveryConfigurationTest {
.build();
assertThat(new RemoteDeliveryConfiguration(mailetConfig, mock(DomainList.class)).getBounceProcessor())
- .isEqualTo(value);
+ .contains(new ProcessingState(value));
}
@Test
diff --git a/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/GlobalRateLimit.scala b/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/GlobalRateLimit.scala
index e06ff724b9..a0f119111e 100644
--- a/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/GlobalRateLimit.scala
+++ b/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/GlobalRateLimit.scala
@@ -20,11 +20,13 @@
package org.apache.james.transport.mailets
import java.time.Duration
+import java.util
+import com.google.common.collect.ImmutableList
import javax.inject.Inject
import org.apache.james.rate.limiter.api.{AcceptableRate, RateExceeded, RateLimiter, RateLimiterFactory, RateLimitingKey, RateLimitingResult}
-import org.apache.mailet.Mail
import org.apache.mailet.base.GenericMailet
+import org.apache.mailet.{Mail, ProcessingState}
import org.reactivestreams.Publisher
import reactor.core.scala.publisher.{SFlux, SMono}
@@ -144,4 +146,7 @@ class GlobalRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) extends
.map(rateLimiterFactory.withSpecification(_, precision)),
keyPrefix = keyPrefix,
entityType = entityType)
+
+
+ override def requiredProcessingState(): util.Collection[ProcessingState] = ImmutableList.of(new ProcessingState(exceededProcessor))
}
diff --git a/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerRecipientRateLimit.scala b/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerRecipientRateLimit.scala
index de0c8c8589..dd65a01138 100644
--- a/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerRecipientRateLimit.scala
+++ b/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerRecipientRateLimit.scala
@@ -20,6 +20,7 @@
package org.apache.james.transport.mailets
import java.time.Duration
+import java.util
import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.ImmutableList
@@ -28,8 +29,8 @@ import org.apache.james.core.MailAddress
import org.apache.james.lifecycle.api.LifecycleUtil
import org.apache.james.rate.limiter.api.{AcceptableRate, RateExceeded, RateLimiter, RateLimiterFactory, RateLimitingKey, RateLimitingResult}
import org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY
-import org.apache.mailet.Mail
import org.apache.mailet.base.GenericMailet
+import org.apache.mailet.{Mail, ProcessingState}
import org.reactivestreams.Publisher
import reactor.core.scala.publisher.{SFlux, SMono}
@@ -149,4 +150,7 @@ class PerRecipientRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) ex
.collectSeq()
.block()
+
+ override def requiredProcessingState(): util.Collection[ProcessingState] = ImmutableList.of(new ProcessingState(exceededProcessor))
+
}
diff --git a/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerSenderRateLimit.scala b/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerSenderRateLimit.scala
index b981e56bdc..956d063f90 100644
--- a/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerSenderRateLimit.scala
+++ b/server/mailet/rate-limiter/src/main/scala/org/apache/james/transport/mailets/PerSenderRateLimit.scala
@@ -20,13 +20,15 @@
package org.apache.james.transport.mailets
import java.time.Duration
+import java.util
import com.google.common.annotations.VisibleForTesting
+import com.google.common.collect.ImmutableList
import javax.inject.Inject
import org.apache.james.core.MailAddress
import org.apache.james.rate.limiter.api.{AcceptableRate, RateExceeded, RateLimiter, RateLimiterFactory, RateLimitingKey, RateLimitingResult}
-import org.apache.mailet.Mail
import org.apache.mailet.base.GenericMailet
+import org.apache.mailet.{Mail, ProcessingState}
import org.reactivestreams.Publisher
import reactor.core.scala.publisher.{SFlux, SMono}
@@ -151,4 +153,6 @@ class PerSenderRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) exten
.map(rateLimiterFactory.withSpecification(_, precision)),
keyPrefix = keyPrefix,
entityType = entityType)
+
+ override def requiredProcessingState(): util.Collection[ProcessingState] = ImmutableList.of(new ProcessingState(exceededProcessor))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org