You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/24 09:17:20 UTC

[james-project] 04/04: JAMES-3588 Allow per-recipient execution of the MailetContainer over LMTP

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d34036791c917a3d3382658b868c129c572e2bd9
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu May 20 14:46:38 2021 +0700

    JAMES-3588 Allow per-recipient execution of the MailetContainer over LMTP
---
 .../servers/pages/distributed/configure/smtp.adoc  | 26 +++++-
 .../test/java/org/apache/james/ErrorMailet.java    | 30 +++++++
 .../java/org/apache/james/LmtpIntegrationTest.java | 91 +++++++++++++++++++++
 .../memory-guice/src/test/resources/lmtpserver.xml |  7 +-
 .../src/test/resources/mailetcontainer.xml         |  3 +
 .../james/lmtpserver/MailetContainerHandler.java   | 95 +++++++++++++++++++---
 src/site/xdoc/server/config-smtp-lmtp.xml          | 28 +++++++
 7 files changed, 266 insertions(+), 14 deletions(-)

diff --git a/docs/modules/servers/pages/distributed/configure/smtp.adoc b/docs/modules/servers/pages/distributed/configure/smtp.adoc
index ae9f275..37303c8 100644
--- a/docs/modules/servers/pages/distributed/configure/smtp.adoc
+++ b/docs/modules/servers/pages/distributed/configure/smtp.adoc
@@ -181,7 +181,7 @@ to the default SMTP protocol. Here is how to achieve this:
 <lmtpservers>
     <lmtpserver enabled="true">
         <jmxName>lmtpserver</jmxName>
-        <bind>0.0.0.0:0</bind>
+        <bind>0.0.0.0:24</bind>
         <connectionBacklog>200</connectionBacklog>
         <connectiontimeout>1200</connectiontimeout>
         <connectionLimit>0</connectionLimit>
@@ -192,4 +192,28 @@ to the default SMTP protocol. Here is how to achieve this:
         </handlerchain>
     </lmtpserver>
 </lmtpservers>
+....
+
+Note that by default the mailet container is executed with all recipients at once and do not allow per recipient
+error reporting. An option <code>splitExecution</code> allow to execute the mailet container for each recipient separately and mitigate this
+limitation at the cost of performance.
+
+....
+<lmtpservers>
+    <lmtpserver enabled="true">
+        <jmxName>lmtpserver</jmxName>
+        <bind>0.0.0.0:24</bind>
+        <connectionBacklog>200</connectionBacklog>
+        <connectiontimeout>1200</connectiontimeout>
+        <connectionLimit>0</connectionLimit>
+        <connectionLimitPerIP>0</connectionLimitPerIP>
+        <maxmessagesize>0</maxmessagesize>
+        <handlerchain coreHandlersPackage="org.apache.james.lmtpserver.MailetContainerCmdHandlerLoader">
+            <handler class="org.apache.james.lmtpserver.MailetContainerCmdHandlerLoader"/>
+            <handler class="org.apache.james.lmtpserver.MailetContainerHandler">
+                <splitExecution>true</splitExecution>
+            </handler>
+        </handlerchain>
+    </lmtpserver>
+</lmtpservers>
 ....
\ No newline at end of file
diff --git a/server/container/guice/memory-guice/src/test/java/org/apache/james/ErrorMailet.java b/server/container/guice/memory-guice/src/test/java/org/apache/james/ErrorMailet.java
new file mode 100644
index 0000000..61a7f18
--- /dev/null
+++ b/server/container/guice/memory-guice/src/test/java/org/apache/james/ErrorMailet.java
@@ -0,0 +1,30 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james;
+
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+
+public class ErrorMailet extends GenericMailet {
+    @Override
+    public void service(Mail mail) {
+        throw new RuntimeException();
+    }
+}
diff --git a/server/container/guice/memory-guice/src/test/java/org/apache/james/LmtpIntegrationTest.java b/server/container/guice/memory-guice/src/test/java/org/apache/james/LmtpIntegrationTest.java
new file mode 100644
index 0000000..c43f62c
--- /dev/null
+++ b/server/container/guice/memory-guice/src/test/java/org/apache/james/LmtpIntegrationTest.java
@@ -0,0 +1,91 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james;
+
+import static org.apache.james.jmap.JMAPTestingConstants.DOMAIN;
+import static org.apache.james.jmap.JMAPTestingConstants.LOCALHOST_IP;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.james.modules.TestJMAPServerModule;
+import org.apache.james.modules.protocols.LmtpGuiceProbe;
+import org.apache.james.utils.DataProbeImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class LmtpIntegrationTest {
+    @RegisterExtension
+    static JamesServerExtension jamesServerExtension = new JamesServerBuilder<>(JamesServerBuilder.defaultConfigurationProvider())
+        .server(configuration -> MemoryJamesServerMain.createServer(configuration)
+            .overrideWith(new TestJMAPServerModule()))
+        .build();
+
+    @BeforeEach
+    void setUp(GuiceJamesServer server) throws Exception {
+        server.getProbe(DataProbeImpl.class).fluent()
+            .addDomain(DOMAIN)
+            .addUser("error@" + DOMAIN, "pass1")
+            .addUser("user@" + DOMAIN, "pass1");
+    }
+
+    @Test
+    void lmtpShouldBeConfigurableToReport(GuiceJamesServer guiceJamesServer) throws Exception {
+        SocketChannel server = SocketChannel.open();
+        server.connect(new InetSocketAddress(LOCALHOST_IP, guiceJamesServer.getProbe(LmtpGuiceProbe.class).getLmtpPort()));
+        readBytes(server);
+
+        server.write(ByteBuffer.wrap(("LHLO <" + DOMAIN + ">\r\n").getBytes(StandardCharsets.UTF_8)));
+        readBytes(server);
+        server.write(ByteBuffer.wrap(("MAIL FROM: <user@" + DOMAIN + ">\r\n").getBytes(StandardCharsets.UTF_8)));
+        readBytes(server);
+        server.write(ByteBuffer.wrap(("RCPT TO: <user@" + DOMAIN + ">\r\n").getBytes(StandardCharsets.UTF_8)));
+        readBytes(server);
+        server.write(ByteBuffer.wrap(("RCPT TO: <error@" + DOMAIN + ">\r\n").getBytes(StandardCharsets.UTF_8)));
+        readBytes(server);
+        server.write(ByteBuffer.wrap(("DATA\r\n").getBytes(StandardCharsets.UTF_8)));
+        readBytes(server); // needed to synchronize
+        server.write(ByteBuffer.wrap(("header:value\r\n\r\nbody").getBytes(StandardCharsets.UTF_8)));
+        server.write(ByteBuffer.wrap(("\r\n").getBytes(StandardCharsets.UTF_8)));
+        server.write(ByteBuffer.wrap((".").getBytes(StandardCharsets.UTF_8)));
+        server.write(ByteBuffer.wrap(("\r\n").getBytes(StandardCharsets.UTF_8)));
+        byte[] dataResponse = readBytes(server);
+        server.write(ByteBuffer.wrap(("QUIT\r\n").getBytes(StandardCharsets.UTF_8)));
+
+        assertThat(new String(dataResponse, StandardCharsets.UTF_8))
+            .contains("250 2.6.0 Message received <us...@domain.tld>\r\n" +
+                "451 4.0.0 Temporary error deliver message <er...@domain.tld>");
+    }
+
+
+    private byte[] readBytes(SocketChannel channel) throws IOException {
+        ByteBuffer line = ByteBuffer.allocate(1024);
+        channel.read(line);
+        line.rewind();
+        byte[] bline = new byte[line.remaining()];
+        line.get(bline);
+        return bline;
+    }
+}
diff --git a/server/container/guice/memory-guice/src/test/resources/lmtpserver.xml b/server/container/guice/memory-guice/src/test/resources/lmtpserver.xml
index 7739071..2f7634f 100644
--- a/server/container/guice/memory-guice/src/test/resources/lmtpserver.xml
+++ b/server/container/guice/memory-guice/src/test/resources/lmtpserver.xml
@@ -33,8 +33,11 @@
         <!--  This sets the maximum allowed message size (in kilobytes) for this -->
         <!--  LMTP service. If unspecified, the value defaults to 0, which means no limit. -->
         <maxmessagesize>0</maxmessagesize>
-        <handlerchain>
-            <handler class="org.apache.james.lmtpserver.CoreCmdHandlerLoader"/>
+        <handlerchain coreHandlersPackage="org.apache.james.lmtpserver.MailetContainerCmdHandlerLoader">
+            <handler class="org.apache.james.lmtpserver.MailetContainerCmdHandlerLoader"/>
+            <handler class="org.apache.james.lmtpserver.MailetContainerHandler">
+                <splitExecution>true</splitExecution>
+            </handler>
         </handlerchain>
     </lmtpserver>
 
diff --git a/server/container/guice/memory-guice/src/test/resources/mailetcontainer.xml b/server/container/guice/memory-guice/src/test/resources/mailetcontainer.xml
index def56c6..7618748 100644
--- a/server/container/guice/memory-guice/src/test/resources/mailetcontainer.xml
+++ b/server/container/guice/memory-guice/src/test/resources/mailetcontainer.xml
@@ -58,6 +58,9 @@
             <mailet match="All" class="RemoveMimeHeader">
                 <name>bcc</name>
             </mailet>
+            <mailet match="RecipientIs=error@domain.tld" class="org.apache.james.ErrorMailet">
+                <onMailetException>propagate</onMailetException>
+            </mailet>
             <mailet match="All" class="RecipientRewriteTable">
                 <errorProcessor>rrt-error</errorProcessor>
             </mailet>
diff --git a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/MailetContainerHandler.java b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/MailetContainerHandler.java
index fb1d589..28f2c9d 100644
--- a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/MailetContainerHandler.java
+++ b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/MailetContainerHandler.java
@@ -20,9 +20,11 @@
 package org.apache.james.lmtpserver;
 
 import java.util.Collection;
+import java.util.Objects;
 
 import javax.inject.Inject;
 
+import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.james.core.MailAddress;
 import org.apache.james.mailetcontainer.api.MailProcessor;
 import org.apache.james.protocols.api.Response;
@@ -41,28 +43,75 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 
 public class MailetContainerHandler extends DataLineJamesMessageHookHandler {
+    private static class Configuration {
+        static Configuration DEFAULT = new Configuration(false);
+
+        static Configuration parse(org.apache.commons.configuration2.Configuration config) {
+            return new Configuration(config.getBoolean("splitExecution", false));
+        }
+
+        private final boolean splitExecution;
+
+        private Configuration(boolean splitExecution) {
+            this.splitExecution = splitExecution;
+        }
+
+        public boolean splitExecutionEnabled() {
+            return splitExecution;
+        }
+
+        @Override
+        public final boolean equals(Object o) {
+            if (o instanceof Configuration) {
+                Configuration other = (Configuration) o;
+                return Objects.equals(this.splitExecution, other.splitExecution);
+            }
+            return false;
+        }
+
+        @Override
+        public final int hashCode() {
+            return Objects.hash(splitExecution);
+        }
+    }
+    
     private final MailProcessor mailProcessor;
+    private Configuration configuration;
 
     @Inject
     public MailetContainerHandler(MailProcessor mailProcessor) {
         this.mailProcessor = mailProcessor;
+        this.configuration = Configuration.DEFAULT;
+    }
+
+    @Override
+    public void init(org.apache.commons.configuration2.Configuration config) throws ConfigurationException {
+        configuration = Configuration.parse(config);
     }
 
     @Override
     protected Response processExtensions(SMTPSession session, Mail mail) {
         Collection<MailAddress> recipients = ImmutableList.copyOf(mail.getRecipients());
-        try {
-            executeJamesMessageHooks(session, mail);
-
-            if (recipients.size() == 0) {
-                // Return 503 see https://datatracker.ietf.org/doc/html/rfc2033#section-4.2
-                AbstractHookableCmdHandler.calcDefaultSMTPResponse(HookResult.builder()
-                    .hookReturnCode(HookReturnCode.ok())
-                    .smtpReturnCode(SMTPRetCode.MAIL_OK)
-                    .smtpDescription(DSNStatus.getStatus(DSNStatus.SUCCESS, DSNStatus.CONTENT_OTHER) + " Message received")
-                    .build());
-            }
+        executeJamesMessageHooks(session, mail);
+
+        if (recipients.size() == 0) {
+            // Return 503 see https://datatracker.ietf.org/doc/html/rfc2033#section-4.2
+            AbstractHookableCmdHandler.calcDefaultSMTPResponse(HookResult.builder()
+                .hookReturnCode(HookReturnCode.ok())
+                .smtpReturnCode(SMTPRetCode.BAD_SEQUENCE)
+                .smtpDescription(DSNStatus.getStatus(DSNStatus.PERMANENT, DSNStatus.CONTENT_OTHER) + " No recipients")
+                .build());
+        }
+
+        if (configuration.splitExecutionEnabled()) {
+            return executeEachRecipientSeparately(mail, recipients);
+        } else {
+            return executeAllRecipientsAtOnce(mail, recipients);
+        }
+    }
 
+    private LMTPMultiResponse executeAllRecipientsAtOnce(Mail mail, Collection<MailAddress> recipients) {
+        try {
             mailProcessor.service(mail);
 
             return LMTPMultiResponse.of(
@@ -81,4 +130,28 @@ public class MailetContainerHandler extends DataLineJamesMessageHookHandler {
                     .collect(Guavate.toImmutableList()));
         }
     }
+
+    private LMTPMultiResponse executeEachRecipientSeparately(Mail mail, Collection<MailAddress> recipients) {
+        return LMTPMultiResponse.of(
+            recipients.stream()
+                .map(recipient -> executeFor(mail, recipient))
+                .collect(Guavate.toImmutableList()));
+    }
+
+    private SMTPResponse executeFor(Mail mail, MailAddress recipient) {
+        try {
+            Mail newMail = mail.duplicate();
+            newMail.setRecipients(ImmutableList.of(recipient));
+
+            mailProcessor.service(newMail);
+
+            return AbstractHookableCmdHandler.calcDefaultSMTPResponse(HookResult.builder()
+                .hookReturnCode(HookReturnCode.ok())
+                .smtpReturnCode(SMTPRetCode.MAIL_OK)
+                .smtpDescription(DSNStatus.getStatus(DSNStatus.SUCCESS, DSNStatus.CONTENT_OTHER) + " Message received <" + recipient.asString() + ">")
+                .build());
+        } catch (Exception e) {
+            return new SMTPResponse(SMTPRetCode.LOCAL_ERROR, DSNStatus.getStatus(DSNStatus.TRANSIENT, DSNStatus.UNDEFINED_STATUS) + " Temporary error deliver message <" + recipient.asString() + ">");
+        }
+    }
 }
diff --git a/src/site/xdoc/server/config-smtp-lmtp.xml b/src/site/xdoc/server/config-smtp-lmtp.xml
index 42b419b..ca8d806 100644
--- a/src/site/xdoc/server/config-smtp-lmtp.xml
+++ b/src/site/xdoc/server/config-smtp-lmtp.xml
@@ -229,6 +229,34 @@ Correct this.
           </code>
       </pre>
 
+      <p>Note that by default the mailet container is executed with all recipients at once and do not allow per recipient
+      error reporting. An option <code>splitExecution</code> allow to execute the mailet container for each recipient separately and mitigate this
+      limitation at the cost of performance.</p>
+
+      <pre>
+          <code>
+&lt;lmtpservers&gt;
+  &lt;lmtpserver enabled=&quot;true&quot;&gt;
+    &lt;jmxName&gt;lmtpserver&lt;/jmxName&gt;
+    &lt;bind&gt;0.0.0.0:0&lt;/bind&gt;
+    &lt;connectionBacklog&gt;200&lt;/connectionBacklog&gt;
+    &lt;connectiontimeout&gt;1200&lt;/connectiontimeout&gt;
+    &lt;connectionLimit&gt;0&lt;/connectionLimit&gt;
+    &lt;connectionLimitPerIP&gt;0&lt;/connectionLimitPerIP&gt;
+    &lt;maxmessagesize&gt;0&lt;/maxmessagesize&gt;
+    &lt;handlerchain coreHandlersPackage=&quot;org.apache.james.lmtpserver.MailetContainerCmdHandlerLoader&quot;&gt;
+      &lt;handler class=&quot;org.apache.james.lmtpserver.MailetContainerCmdHandlerLoader&quot;&gt;
+        &lt;splitExecution&gt;false&lt;/splitExecution&gt;
+      &lt;/handler&gt;
+      &lt;handler class=&quot;org.apache.james.lmtpserver.MailetContainerHandler&quot;&gt;
+        &lt;splitExecution>true&lt;/splitExecution&gt;
+      &lt;/handler&gt;
+    &lt;/handlerchain&gt;
+  &lt;/lmtpserver&gt;
+&lt;/lmtpservers&gt;
+          </code>
+      </pre>
+
   </section>
     
 </body>

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org