You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/24 09:21:09 UTC

[james-project] branch master updated: [PERFORMANCE] AmqpForwardAttribute should reuse connection (#454)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a2f137  [PERFORMANCE] AmqpForwardAttribute should reuse connection (#454)
0a2f137 is described below

commit 0a2f13744ffdc85d121519e617b50003bbc4c00a
Author: Tellier Benoit <bt...@linagora.com>
AuthorDate: Mon May 24 16:21:01 2021 +0700

    [PERFORMANCE] AmqpForwardAttribute should reuse connection (#454)
    
    Today one connection and one channel are opened per processed
    email. Connections and channels are long-lived objects,
    made to be reused.
    
    Flame graphs showed that this AMQP mailet represents ~1%
    of CPU usage at only about 30 mails/sec. Note that
    92% of this time is spent opening/closing connections/channels.
    
    Moving connection and channel initialisation to mailet
    initialisation enables reuse and dramatically improves
    the performance of this mailet. Note that retries are
    in place in case RabbitMQ is not up right at start - common with
    e.g. docker-compose.
    
    The testing strategy relying on mocks is brittle. Given that we
    have full-fledged integration tests in place, we can get rid of
    these annoying mock-based tests.
    
     - Move AmqpForwardAttribute to a separate Maven module
    
     -  AmqpForwardAttribute should use backend-rabbitmq
    
    It takes care of channel thread safety upon reuse, as well as connection
    and channel reconfiguration, for us.
---
 .../backends/rabbitmq/RabbitMQConfiguration.java   |   2 +-
 mailet/{standard => amqp}/pom.xml                  |  38 ++-
 .../transport/mailets/AmqpForwardAttribute.java    | 115 ++++++---
 .../mailets/AmqpForwardAttributeTest.java          | 156 +++++++++++
 mailet/pom.xml                                     |   1 +
 mailet/standard/pom.xml                            |   8 +-
 .../mailets/AmqpForwardAttributeTest.java          | 284 ---------------------
 pom.xml                                            |   5 +
 server/container/guice/mailet/pom.xml              |   4 +
 9 files changed, 273 insertions(+), 340 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
index 458a768..57920f8 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
@@ -315,7 +315,7 @@ public class RabbitMQConfiguration {
         private final String user;
         private final char[] password;
 
-        ManagementCredentials(String user, char[] password) {
+        public ManagementCredentials(String user, char[] password) {
             Preconditions.checkNotNull(user);
             Preconditions.checkNotNull(password);
             this.user = user;
diff --git a/mailet/standard/pom.xml b/mailet/amqp/pom.xml
similarity index 79%
copy from mailet/standard/pom.xml
copy to mailet/amqp/pom.xml
index ddd5455..7e1940f 100644
--- a/mailet/standard/pom.xml
+++ b/mailet/amqp/pom.xml
@@ -17,7 +17,9 @@
     specific language governing permissions and limitations
     under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -27,18 +29,21 @@
         <relativePath>../</relativePath>
     </parent>
 
-    <artifactId>apache-mailet-standard</artifactId>
+    <artifactId>apache-mailet-amqp</artifactId>
     <packaging>jar</packaging>
 
-    <name>Apache James :: Standard Mailets</name>
-    <description>Apache James Standard Mailets is a rich collection of general purpose mailets
-        with limited dependencies. These mailets can be used in any mailet container.</description>
-    <url>http://james.apache.org/mailet/standard/</url>
-    <inceptionYear>2008</inceptionYear>
+    <name>Apache James :: AMQP Mailet</name>
+    <description>Provides a mailet forwarding mail attributes over AMQP</description>
+    <inceptionYear>2021</inceptionYear>
+
 
     <dependencies>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-backends-rabbitmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>apache-mailet-base</artifactId>
         </dependency>
         <dependency>
@@ -65,10 +70,6 @@
             <artifactId>jackson-databind</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.fasterxml.jackson.datatype</groupId>
-            <artifactId>jackson-datatype-jdk8</artifactId>
-        </dependency>
-        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
@@ -85,6 +86,10 @@
             <artifactId>commons-io</artifactId>
         </dependency>
         <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>javax.annotation-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>net.javacrumbs.json-unit</groupId>
             <artifactId>json-unit-assertj</artifactId>
             <scope>test</scope>
@@ -123,14 +128,7 @@
                 <groupId>${james.groupId}</groupId>
                 <artifactId>mailetdocs-maven-plugin</artifactId>
             </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <reuseForks>true</reuseForks>
-                    <forkCount>1C</forkCount>
-                </configuration>
-            </plugin>
         </plugins>
     </build>
-</project>
+
+</project>
\ No newline at end of file
diff --git a/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java b/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
similarity index 57%
rename from mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
rename to mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
index 84ed6ff..324cf9e 100644
--- a/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
+++ b/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
@@ -19,18 +19,26 @@
 
 package org.apache.james.transport.mailets;
 
-import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.Stream;
 
+import javax.annotation.PreDestroy;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.mailet.Attribute;
 import org.apache.mailet.AttributeName;
 import org.apache.mailet.AttributeValue;
 import org.apache.mailet.Mail;
+import org.apache.mailet.MailetConfig;
 import org.apache.mailet.MailetException;
 import org.apache.mailet.base.GenericMailet;
 import org.slf4j.Logger;
@@ -39,12 +47,15 @@ import org.slf4j.LoggerFactory;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import com.rabbitmq.client.AMQP;
+import com.google.common.collect.ImmutableSet;
 import com.rabbitmq.client.AlreadyClosedException;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
+import reactor.core.publisher.Flux;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.Sender;
+
 /**
  * This mailet forwards the attributes values to a AMPQ.
  * <br />
@@ -63,6 +74,19 @@ import com.rabbitmq.client.ConnectionFactory;
  */
 public class AmqpForwardAttribute extends GenericMailet {
     private static final Logger LOGGER = LoggerFactory.getLogger(AmqpForwardAttribute.class);
+    private static final ImmutableSet<SimpleConnectionPool.ReconnectionHandler> RECONNECTION_HANDLERS = ImmutableSet.of();
+    private static final int MAX_THREE_RETRIES = 3;
+    private static final int MIN_DELAY_OF_TEN_MILLISECONDS = 10;
+    private static final int CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND = 100;
+    private static final int CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND = 100;
+    private static final int HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND = 100;
+    private static final int SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND = 100;
+    private static final int NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND = 100;
+    private static final String DEFAULT_USER = "guest";
+    private static final String DEFAULT_PASSWORD_STRING = "guest";
+    private static final char[] DEFAULT_PASSWORD = DEFAULT_PASSWORD_STRING.toCharArray();
+    private static final RabbitMQConfiguration.ManagementCredentials DEFAULT_MANAGEMENT_CREDENTIAL = new RabbitMQConfiguration.ManagementCredentials(DEFAULT_USER, DEFAULT_PASSWORD);
+
 
     public static final String URI_PARAMETER_NAME = "uri";
     public static final String EXCHANGE_PARAMETER_NAME = "exchange";
@@ -70,26 +94,64 @@ public class AmqpForwardAttribute extends GenericMailet {
     public static final String ATTRIBUTE_PARAMETER_NAME = "attribute";
 
     public static final String ROUTING_KEY_DEFAULT_VALUE = "";
+    public static final int MAX_ATTEMPTS = 8;
+    public static final Duration MIN_BACKOFF = Duration.ofSeconds(1);
 
     private String exchange;
     private AttributeName attribute;
     private ConnectionFactory connectionFactory;
     @VisibleForTesting String routingKey;
+    private SimpleConnectionPool connectionPool;
+    private ReactorRabbitMQChannelPool reactorRabbitMQChannelPool;
+    private Sender sender;
 
     @Override
     public void init() throws MailetException {
-        String uri = getInitParameter(URI_PARAMETER_NAME);
+        MailetConfig mailetConfig = getMailetConfig();
+        String uri = preInit(mailetConfig);
+
+        try {
+            RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder()
+                .amqpUri(new URI(uri))
+                .managementUri(new URI(uri))
+                .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL)
+                .maxRetries(MAX_THREE_RETRIES)
+                .minDelayInMs(MIN_DELAY_OF_TEN_MILLISECONDS)
+                .connectionTimeoutInMs(CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+                .channelRpcTimeoutInMs(CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+                .handshakeTimeoutInMs(HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+                .shutdownTimeoutInMs(SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+                .networkRecoveryIntervalInMs(NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND)
+                .build();
+            connectionPool = new SimpleConnectionPool(new RabbitMQConnectionFactory(rabbitMQConfiguration),
+                RECONNECTION_HANDLERS, SimpleConnectionPool.Configuration.builder()
+                .retries(2)
+                .initialDelay(Duration.ofMillis(5)));
+            reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(),
+                ReactorRabbitMQChannelPool.Configuration.DEFAULT);
+            reactorRabbitMQChannelPool.start();
+            sender = reactorRabbitMQChannelPool.getSender();
+            sender.declareExchange(ExchangeSpecification.exchange(exchange));
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+}
+
+    @VisibleForTesting
+    String preInit(MailetConfig mailetConfig) throws MailetException {
+        String uri = mailetConfig.getInitParameter(URI_PARAMETER_NAME);
         if (Strings.isNullOrEmpty(uri)) {
             throw new MailetException("No value for " + URI_PARAMETER_NAME
                     + " parameter was provided.");
         }
-        exchange = getInitParameter(EXCHANGE_PARAMETER_NAME);
+        exchange = mailetConfig.getInitParameter(EXCHANGE_PARAMETER_NAME);
         if (Strings.isNullOrEmpty(exchange)) {
             throw new MailetException("No value for " + EXCHANGE_PARAMETER_NAME
                     + " parameter was provided.");
         }
-        routingKey = getInitParameter(ROUTING_KEY_PARAMETER_NAME, ROUTING_KEY_DEFAULT_VALUE);
-        String rawAttribute = getInitParameter(ATTRIBUTE_PARAMETER_NAME);
+        routingKey = Optional.ofNullable(mailetConfig.getInitParameter(ROUTING_KEY_PARAMETER_NAME))
+            .orElse(ROUTING_KEY_DEFAULT_VALUE);
+        String rawAttribute = mailetConfig.getInitParameter(ATTRIBUTE_PARAMETER_NAME);
         if (Strings.isNullOrEmpty(rawAttribute)) {
             throw new MailetException("No value for " + ATTRIBUTE_PARAMETER_NAME
                     + " parameter was provided.");
@@ -102,6 +164,14 @@ public class AmqpForwardAttribute extends GenericMailet {
             throw new MailetException("Invalid " + URI_PARAMETER_NAME
                     + " parameter was provided: " + uri, e);
         }
+        return uri;
+    }
+
+    @PreDestroy
+    public void cleanUp() throws Exception {
+        sender.close();
+        reactorRabbitMQChannelPool.close();
+        connectionPool.close();
     }
 
     @VisibleForTesting void setConnectionFactory(ConnectionFactory connectionFactory) {
@@ -138,33 +208,16 @@ public class AmqpForwardAttribute extends GenericMailet {
 
     private void sendContent(Stream<byte[]> content) {
         try {
-            trySendContent(content);
-        } catch (IOException e) {
-            LOGGER.error("IOException while writing to AMQP: {}", e.getMessage(), e);
-        } catch (TimeoutException e) {
-            LOGGER.error("TimeoutException while writing to AMQP: {}", e.getMessage(), e);
+            sender.send(Flux.fromStream(content)
+                .map(bytes -> new OutboundMessage(exchange, routingKey, bytes)))
+                .block();
         } catch (AlreadyClosedException e) {
             LOGGER.error("AlreadyClosedException while writing to AMQP: {}", e.getMessage(), e);
+        } catch (Exception e) {
+            LOGGER.error("IOException while writing to AMQP: {}", e.getMessage(), e);
         }
     }
 
-    private void trySendContent(Stream<byte[]> content) throws IOException, TimeoutException {
-        try (Connection connection = connectionFactory.newConnection();
-             Channel channel = connection.createChannel()) {
-            channel.exchangeDeclarePassive(exchange);
-            sendContentOnChannel(channel, content);
-        }
-    }
-
-    private void sendContentOnChannel(Channel channel, Stream<byte[]> content) throws IOException {
-        content.forEach(
-            Throwing.consumer(message ->
-                channel.basicPublish(exchange,
-                        routingKey,
-                        new AMQP.BasicProperties(),
-                        message)));
-    }
-
     @Override
     public String getMailetInfo() {
         return "AmqpForwardAttribute";
diff --git a/mailet/amqp/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java b/mailet/amqp/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
new file mode 100644
index 0000000..c2a6a56
--- /dev/null
+++ b/mailet/amqp/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
@@ -0,0 +1,156 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+
+import javax.mail.MessagingException;
+
+import org.apache.mailet.Attribute;
+import org.apache.mailet.AttributeName;
+import org.apache.mailet.AttributeValue;
+import org.apache.mailet.Mail;
+import org.apache.mailet.MailetContext;
+import org.apache.mailet.MailetException;
+import org.apache.mailet.base.test.FakeMailContext;
+import org.apache.mailet.base.test.FakeMailetConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableMap;
+
+class AmqpForwardAttributeTest {
+
+    private static final AttributeName MAIL_ATTRIBUTE = AttributeName.of("ampq.attachments");
+    private static final String EXCHANGE_NAME = "exchangeName";
+    private static final String ROUTING_KEY = "routingKey";
+    private static final String AMQP_URI = "amqp://host";
+    private static final byte[] ATTACHMENT_CONTENT = "Attachment content".getBytes(StandardCharsets.UTF_8);
+    private static final ImmutableMap<String, byte[]> ATTRIBUTE_VALUE = ImmutableMap.of("attachment1.txt", ATTACHMENT_CONTENT);
+    private static final Optional<Attribute> ATTRIBUTE_CONTENT = Optional.of(new Attribute(MAIL_ATTRIBUTE, AttributeValue.ofAny(ATTRIBUTE_VALUE)));
+
+    private AmqpForwardAttribute mailet;
+    private MailetContext mailetContext;
+    private FakeMailetConfig mailetConfig;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        mailet = new AmqpForwardAttribute();
+        Logger logger = mock(Logger.class);
+        mailetContext = FakeMailContext.builder()
+                .logger(logger)
+                .build();
+        mailetConfig = FakeMailetConfig.builder()
+                .mailetName("Test")
+                .mailetContext(mailetContext)
+                .setProperty("uri", AMQP_URI)
+                .setProperty("exchange", EXCHANGE_NAME)
+                .setProperty("routing_key", ROUTING_KEY)
+                .setProperty("attribute", MAIL_ATTRIBUTE.asString())
+                .build();
+    }
+
+    @Test
+    void initShouldThrowWhenNoUriParameter() {
+        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
+                .mailetName("Test")
+                .mailetContext(mailetContext)
+                .build();
+        assertThatThrownBy(() -> mailet.preInit(customMailetConfig))
+            .isInstanceOf(MailetException.class);
+    }
+
+    @Test
+    void initShouldThrowWhenNoExchangeParameter() {
+        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
+                .mailetName("Test")
+                .mailetContext(mailetContext)
+                .setProperty("uri", AMQP_URI)
+                .build();
+        assertThatThrownBy(() -> mailet.preInit(customMailetConfig))
+            .isInstanceOf(MailetException.class);
+    }
+
+    @Test
+    void initShouldThrowWhenNoAttributeParameter() {
+        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
+                .mailetName("Test")
+                .mailetContext(mailetContext)
+                .setProperty("uri", AMQP_URI)
+                .setProperty("exchange", EXCHANGE_NAME)
+                .build();
+        assertThatThrownBy(() -> mailet.preInit(customMailetConfig))
+            .isInstanceOf(MailetException.class);
+    }
+
+    @Test
+    void initShouldThrowWhenInvalidUri() {
+        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
+                .mailetName("Test")
+                .mailetContext(mailetContext)
+                .setProperty("uri", "bad-uri")
+                .setProperty("exchange", EXCHANGE_NAME)
+                .setProperty("attribute", MAIL_ATTRIBUTE.asString())
+                .build();
+        assertThatThrownBy(() -> mailet.preInit(customMailetConfig))
+            .isInstanceOf(MailetException.class);
+    }
+
+    @Test
+    void getMailetInfoShouldReturnInfo() {
+        assertThat(mailet.getMailetInfo()).isEqualTo("AmqpForwardAttribute");
+    }
+
+    @Test
+    void initShouldIntializeEmptyRoutingKeyWhenAllParametersButRoutingKey() throws MessagingException {
+        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
+                .mailetName("Test")
+                .mailetContext(mailetContext)
+                .setProperty("uri", AMQP_URI)
+                .setProperty("exchange", EXCHANGE_NAME)
+                .setProperty("attribute", MAIL_ATTRIBUTE.asString())
+                .build();
+        mailet.preInit(customMailetConfig);
+
+        assertThat(mailet.routingKey).isEmpty();
+    }
+
+    @Test
+    void initShouldNotThrowWithAllParameters() throws MessagingException {
+        mailet.preInit(mailetConfig);
+    }
+
+    @Test
+    public void serviceShouldThrowWhenAttributeContentIsNotAMapAListOrAString() throws MessagingException {
+        mailet.preInit(mailetConfig);
+        Mail mail = mock(Mail.class);
+        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(Optional.of(new Attribute(MAIL_ATTRIBUTE, AttributeValue.of(2))));
+
+        assertThatThrownBy(() -> mailet.service(mail))
+            .isInstanceOf(MailetException.class);
+    }
+}
diff --git a/mailet/pom.xml b/mailet/pom.xml
index 7036499..800504e 100644
--- a/mailet/pom.xml
+++ b/mailet/pom.xml
@@ -46,6 +46,7 @@
 
     <modules>
         <module>ai</module>
+        <module>amqp</module>
         <module>api</module>
         <module>base</module>
         <module>crypto</module>
diff --git a/mailet/standard/pom.xml b/mailet/standard/pom.xml
index ddd5455..6856af8 100644
--- a/mailet/standard/pom.xml
+++ b/mailet/standard/pom.xml
@@ -73,10 +73,6 @@
             <artifactId>guava</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.rabbitmq</groupId>
-            <artifactId>amqp-client</artifactId>
-        </dependency>
-        <dependency>
             <groupId>com.sun.mail</groupId>
             <artifactId>javax.mail</artifactId>
         </dependency>
@@ -85,6 +81,10 @@
             <artifactId>commons-io</artifactId>
         </dependency>
         <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>javax.annotation-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>net.javacrumbs.json-unit</groupId>
             <artifactId>json-unit-assertj</artifactId>
             <scope>test</scope>
diff --git a/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java b/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
deleted file mode 100644
index c490b0f..0000000
--- a/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.transport.mailets;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Optional;
-import java.util.concurrent.TimeoutException;
-
-import javax.mail.MessagingException;
-
-import org.apache.mailet.Attribute;
-import org.apache.mailet.AttributeName;
-import org.apache.mailet.AttributeValue;
-import org.apache.mailet.Mail;
-import org.apache.mailet.MailetContext;
-import org.apache.mailet.MailetException;
-import org.apache.mailet.base.test.FakeMailContext;
-import org.apache.mailet.base.test.FakeMailetConfig;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.slf4j.Logger;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.AMQP.BasicProperties;
-import com.rabbitmq.client.AMQP.Channel.Close;
-import com.rabbitmq.client.AlreadyClosedException;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.ShutdownSignalException;
-
-class AmqpForwardAttributeTest {
-
-    private static final AttributeName MAIL_ATTRIBUTE = AttributeName.of("ampq.attachments");
-    private static final String EXCHANGE_NAME = "exchangeName";
-    private static final String ROUTING_KEY = "routingKey";
-    private static final String AMQP_URI = "amqp://host";
-    private static final byte[] ATTACHMENT_CONTENT = "Attachment content".getBytes(StandardCharsets.UTF_8);
-    private static final ImmutableMap<String, byte[]> ATTRIBUTE_VALUE = ImmutableMap.of("attachment1.txt", ATTACHMENT_CONTENT);
-    private static final Optional<Attribute> ATTRIBUTE_CONTENT = Optional.of(new Attribute(MAIL_ATTRIBUTE, AttributeValue.ofAny(ATTRIBUTE_VALUE)));
-
-    private AmqpForwardAttribute mailet;
-    private MailetContext mailetContext;
-    private FakeMailetConfig mailetConfig;
-
-    @BeforeEach
-    public void setUp() throws Exception {
-        mailet = new AmqpForwardAttribute();
-        Logger logger = mock(Logger.class);
-        mailetContext = FakeMailContext.builder()
-                .logger(logger)
-                .build();
-        mailetConfig = FakeMailetConfig.builder()
-                .mailetName("Test")
-                .mailetContext(mailetContext)
-                .setProperty("uri", AMQP_URI)
-                .setProperty("exchange", EXCHANGE_NAME)
-                .setProperty("routing_key", ROUTING_KEY)
-                .setProperty("attribute", MAIL_ATTRIBUTE.asString())
-                .build();
-    }
-
-    @Test
-    void initShouldThrowWhenNoUriParameter() {
-        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
-                .mailetName("Test")
-                .mailetContext(mailetContext)
-                .build();
-        assertThatThrownBy(() -> mailet.init(customMailetConfig))
-            .isInstanceOf(MailetException.class);
-    }
-
-    @Test
-    void initShouldThrowWhenNoExchangeParameter() {
-        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
-                .mailetName("Test")
-                .mailetContext(mailetContext)
-                .setProperty("uri", AMQP_URI)
-                .build();
-        assertThatThrownBy(() -> mailet.init(customMailetConfig))
-            .isInstanceOf(MailetException.class);
-    }
-
-    @Test
-    void initShouldThrowWhenNoAttributeParameter() {
-        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
-                .mailetName("Test")
-                .mailetContext(mailetContext)
-                .setProperty("uri", AMQP_URI)
-                .setProperty("exchange", EXCHANGE_NAME)
-                .build();
-        assertThatThrownBy(() -> mailet.init(customMailetConfig))
-            .isInstanceOf(MailetException.class);
-    }
-
-    @Test
-    void initShouldThrowWhenInvalidUri() {
-        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
-                .mailetName("Test")
-                .mailetContext(mailetContext)
-                .setProperty("uri", "bad-uri")
-                .setProperty("exchange", EXCHANGE_NAME)
-                .setProperty("attribute", MAIL_ATTRIBUTE.asString())
-                .build();
-        assertThatThrownBy(() -> mailet.init(customMailetConfig))
-            .isInstanceOf(MailetException.class);
-    }
-
-    @Test
-    void getMailetInfoShouldReturnInfo() {
-        assertThat(mailet.getMailetInfo()).isEqualTo("AmqpForwardAttribute");
-    }
-
-    @Test
-    void initShouldIntializeEmptyRoutingKeyWhenAllParametersButRoutingKey() throws MessagingException {
-        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
-                .mailetName("Test")
-                .mailetContext(mailetContext)
-                .setProperty("uri", AMQP_URI)
-                .setProperty("exchange", EXCHANGE_NAME)
-                .setProperty("attribute", MAIL_ATTRIBUTE.asString())
-                .build();
-        mailet.init(customMailetConfig);
-
-        assertThat(mailet.routingKey).isEmpty();
-    }
-
-    @Test
-    void initShouldNotThrowWithAllParameters() throws MessagingException {
-        mailet.init(mailetConfig);
-    }
-
-    @Test
-    public void serviceShouldNotUseConnectionWhenNoAttributeInMail() throws Exception {
-        mailet.init(mailetConfig);
-        Connection connection = mock(Connection.class);
-        ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
-        when(connectionFactory.newConnection()).thenReturn(connection);
-        mailet.setConnectionFactory(connectionFactory);
-        Mail mail = mock(Mail.class);
-        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(Optional.empty());
-
-        mailet.service(mail);
-
-        verifyZeroInteractions(connection);
-    }
-
-    @Test
-    public void serviceShouldThrowWhenAttributeContentIsNotAMapAListOrAString() throws MessagingException {
-        mailet.init(mailetConfig);
-        Mail mail = mock(Mail.class);
-        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(Optional.of(new Attribute(MAIL_ATTRIBUTE, AttributeValue.of(2))));
-
-        assertThatThrownBy(() -> mailet.service(mail))
-            .isInstanceOf(MailetException.class);
-    }
-
-    @Test
-    public void serviceShouldNotFailWhenTimeoutException() throws Exception {
-        mailet.init(mailetConfig);
-        Mail mail = mock(Mail.class);
-        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ATTRIBUTE_CONTENT);
-        ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
-        when(connectionFactory.newConnection()).thenThrow(new TimeoutException());
-        mailet.setConnectionFactory(connectionFactory);
-
-        mailet.service(mail);
-    }
-
-    @Test
-    public void serviceShouldNotFailWhenIOException() throws Exception {
-        mailet.init(mailetConfig);
-        Mail mail = mock(Mail.class);
-        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ATTRIBUTE_CONTENT);
-        ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
-        when(connectionFactory.newConnection()).thenThrow(new IOException());
-        mailet.setConnectionFactory(connectionFactory);
-
-        mailet.service(mail);
-    }
-
-    @Test
-    public void serviceShouldNotFailWhenAlreadyClosedException() throws Exception {
-        mailet.init(mailetConfig);
-        Mail mail = mock(Mail.class);
-        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ATTRIBUTE_CONTENT);
-        ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
-        ShutdownSignalException shutdownSignalException = new ShutdownSignalException(false, false, new Close.Builder().build(), "reference");
-        when(connectionFactory.newConnection()).thenThrow(new AlreadyClosedException(shutdownSignalException));
-        mailet.setConnectionFactory(connectionFactory);
-
-        mailet.service(mail);
-    }
-
-    @Test
-    public void serviceShouldPublishAttributeContentWhenAttributeInMailAndIsAMap() throws Exception {
-        mailet.init(mailetConfig);
-        Channel channel = mock(Channel.class);
-        Connection connection = mock(Connection.class);
-        when(connection.createChannel()).thenReturn(channel);
-        ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
-        when(connectionFactory.newConnection()).thenReturn(connection);
-        mailet.setConnectionFactory(connectionFactory);
-        Mail mail = mock(Mail.class);
-        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ATTRIBUTE_CONTENT);
-        BasicProperties expectedProperties = new AMQP.BasicProperties();
-
-        mailet.service(mail);
-
-        ArgumentCaptor<BasicProperties> basicPropertiesCaptor = ArgumentCaptor.forClass(BasicProperties.class);
-        verify(channel).basicPublish(eq(EXCHANGE_NAME), eq(ROUTING_KEY), basicPropertiesCaptor.capture(), eq(ATTACHMENT_CONTENT));
-        assertThat(basicPropertiesCaptor.getValue()).isEqualToComparingFieldByField(expectedProperties);
-    }
-
-    @Test
-    public void serviceShouldPublishAttributeContentWhenAttributeInMailAndIsAList() throws Exception {
-        mailet.init(mailetConfig);
-        Channel channel = mock(Channel.class);
-        Connection connection = mock(Connection.class);
-        when(connection.createChannel()).thenReturn(channel);
-        ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
-        when(connectionFactory.newConnection()).thenReturn(connection);
-        mailet.setConnectionFactory(connectionFactory);
-        Mail mail = mock(Mail.class);
-        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(Optional.of(new Attribute(MAIL_ATTRIBUTE, AttributeValue.of(ImmutableList.of(AttributeValue.ofSerializable(ATTACHMENT_CONTENT))))));
-        BasicProperties expectedProperties = new AMQP.BasicProperties();
-
-        mailet.service(mail);
-
-        ArgumentCaptor<BasicProperties> basicPropertiesCaptor = ArgumentCaptor.forClass(BasicProperties.class);
-        verify(channel).basicPublish(eq(EXCHANGE_NAME), eq(ROUTING_KEY), basicPropertiesCaptor.capture(), eq(ATTACHMENT_CONTENT));
-        assertThat(basicPropertiesCaptor.getValue()).isEqualToComparingFieldByField(expectedProperties);
-    }
-
-    @Test
-    public void serviceShouldPublishAttributeContentWhenAttributeInMailAndIsAString() throws Exception {
-        mailet.init(mailetConfig);
-        Channel channel = mock(Channel.class);
-        Connection connection = mock(Connection.class);
-        when(connection.createChannel()).thenReturn(channel);
-        ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
-        when(connectionFactory.newConnection()).thenReturn(connection);
-        mailet.setConnectionFactory(connectionFactory);
-        Mail mail = mock(Mail.class);
-        String content = "Attachment content";
-        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(Optional.of(new Attribute(MAIL_ATTRIBUTE, AttributeValue.of(content))));
-        BasicProperties expectedProperties = new AMQP.BasicProperties();
-
-        mailet.service(mail);
-
-        ArgumentCaptor<BasicProperties> basicPropertiesCaptor = ArgumentCaptor.forClass(BasicProperties.class);
-        verify(channel).basicPublish(eq(EXCHANGE_NAME), eq(ROUTING_KEY), basicPropertiesCaptor.capture(), eq(content.getBytes(StandardCharsets.UTF_8)));
-        assertThat(basicPropertiesCaptor.getValue()).isEqualToComparingFieldByField(expectedProperties);
-    }
-}
diff --git a/pom.xml b/pom.xml
index e9be952..c502be1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -949,6 +949,11 @@
             </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
+                <artifactId>apache-mailet-amqp</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${james.groupId}</groupId>
                 <artifactId>apache-mailet-api</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/server/container/guice/mailet/pom.xml b/server/container/guice/mailet/pom.xml
index f301808..1e349f7 100644
--- a/server/container/guice/mailet/pom.xml
+++ b/server/container/guice/mailet/pom.xml
@@ -34,6 +34,10 @@
     <dependencies>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>apache-mailet-amqp</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>apache-mailet-standard</artifactId>
         </dependency>
         <dependency>

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org