You are viewing a plain text version of this content. The canonical link is not preserved in this plain text version.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/03/18 16:39:50 UTC

[james-project] branch master updated (60c7f63 -> ca12220)

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 60c7f63  JAMES-2544 dequeueTime time is always 0 now that the queue is reactive
     new 47d2306  JAMES-2693 Update com.puppycrawl.tools:checkstyle to respond to CVE-2019-9658
     new c4311e3  JAMES-2578 Deprecate MailetContext Attributes API
     new ca12220  JAMES-2683 Plug the thread leak related to Connection and Channel management

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../backend/rabbitmq/RabbitMQChannelPool.java      |  2 +
 .../rabbitmq/RabbitMQConnectionFactory.java        | 28 +------
 .../backend/rabbitmq/RabbitMQHealthCheck.java      | 19 ++---
 .../james/backend/rabbitmq/SimpleChannelPool.java  | 79 ++++++++++++-------
 .../backend/rabbitmq/SimpleConnectionPool.java     | 90 ++++++++++++++++++++++
 .../james/backend/rabbitmq/RabbitMQExtension.java  | 16 ++--
 .../james/mailbox/events/RabbitMQEventBus.java     |  6 +-
 .../james/mailbox/events/RabbitMQEventBusTest.java | 45 +++++------
 .../main/java/org/apache/mailet/MailetContext.java |  4 +
 .../rabbitmq/host/RabbitMQEventBusHostSystem.java  |  8 +-
 pom.xml                                            |  4 +-
 .../james/transport/mailets/WhiteListManager.java  | 10 ++-
 12 files changed, 204 insertions(+), 107 deletions(-)
 create mode 100644 backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/03: JAMES-2693 Update com.puppycrawl.tools:checkstyle to respond to CVE-2019-9658

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 47d23061be958af8e21fdaa9f83c86a174e2b0cf
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Thu Mar 14 17:19:37 2019 +0100

    JAMES-2693 Update com.puppycrawl.tools:checkstyle to respond to CVE-2019-9658
---
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5212d05..704e1df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3222,7 +3222,7 @@
                     <dependency>
                         <groupId>com.puppycrawl.tools</groupId>
                         <artifactId>checkstyle</artifactId>
-                        <version>8.5</version>
+                        <version>8.18</version>
                     </dependency>
                 </dependencies>
                 <executions>
@@ -3417,4 +3417,4 @@
             -->
         </plugins>
     </reporting>
-</project>
\ No newline at end of file
+</project>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/03: JAMES-2578 Deprecate MailetContext Attributes API

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c4311e3afb9cb59f1035f552d4a4916033645028
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Tue Feb 26 18:18:37 2019 +0100

    JAMES-2578 Deprecate MailetContext Attributes API
---
 mailet/api/src/main/java/org/apache/mailet/MailetContext.java  |  4 ++++
 .../org/apache/james/transport/mailets/WhiteListManager.java   | 10 +++++++---
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/mailet/api/src/main/java/org/apache/mailet/MailetContext.java b/mailet/api/src/main/java/org/apache/mailet/MailetContext.java
index 6b91dd0..44c0e9a 100644
--- a/mailet/api/src/main/java/org/apache/mailet/MailetContext.java
+++ b/mailet/api/src/main/java/org/apache/mailet/MailetContext.java
@@ -124,6 +124,7 @@ public interface MailetContext {
      *
      * @return an Iterator (of Strings) over all attribute names
      */
+    @Deprecated
     Iterator<String> getAttributeNames();
 
     /**
@@ -133,6 +134,7 @@ public interface MailetContext {
      * @param name the attribute name
      * @return the attribute value, or null if the attribute does not exist
      */
+    @Deprecated
     Object getAttribute(String name);
 
     /**
@@ -160,6 +162,7 @@ public interface MailetContext {
      * @param name  the attribute name
      * @param value the attribute value
      */
+    @Deprecated
     void setAttribute(String name, Object value);
 
     /**
@@ -168,6 +171,7 @@ public interface MailetContext {
      * @param name the name of the attribute to be removed
      * @since Mailet API v2.1
      */
+    @Deprecated
     void removeAttribute(String name);
 
     /**
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/WhiteListManager.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/WhiteListManager.java
index 0249c7a..e666d78 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/WhiteListManager.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/WhiteListManager.java
@@ -55,6 +55,7 @@ import org.apache.james.util.sql.JDBCUtil;
 import org.apache.james.util.sql.SqlResources;
 import org.apache.mailet.Experimental;
 import org.apache.mailet.Mail;
+import org.apache.mailet.MailetException;
 import org.apache.mailet.base.DateFormats;
 import org.apache.mailet.base.GenericMailet;
 import org.apache.mailet.base.RFC2822Headers;
@@ -752,9 +753,12 @@ public class WhiteListManager extends GenericMailet {
             }
 
             /*
-      Holds value of property sqlFile.
-     */
-            File sqlFile = new File((String) mailetContext.getAttribute("confDir"), "sqlResources.xml").getCanonicalFile();
+                Holds value of property sqlFile.
+            */
+            String confDir = getInitParameterAsOptional("confDir")
+                .orElseThrow(() -> new MailetException("WhiteListManager has no 'confDir' configured"));
+
+            File sqlFile = new File(confDir, "sqlResources.xml").getCanonicalFile();
             sqlQueries.init(sqlFile, "WhiteList", conn, getSqlParameters());
 
             checkTables(conn);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/03: JAMES-2683 Plug the thread leak related to Connection and Channel management

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ca12220c101ac6cdce4377fe1de4c7d975e4c205
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Mar 12 11:11:24 2019 +0100

    JAMES-2683 Plug the thread leak related to Connection and Channel management
    
    	Make sure only SimpleConnectionPool can create a connection
    	and always call the close method when done with the pool
---
 .../backend/rabbitmq/RabbitMQChannelPool.java      |  2 +
 .../rabbitmq/RabbitMQConnectionFactory.java        | 28 +------
 .../backend/rabbitmq/RabbitMQHealthCheck.java      | 19 ++---
 .../james/backend/rabbitmq/SimpleChannelPool.java  | 79 ++++++++++++-------
 .../backend/rabbitmq/SimpleConnectionPool.java     | 90 ++++++++++++++++++++++
 .../james/backend/rabbitmq/RabbitMQExtension.java  | 16 ++--
 .../james/mailbox/events/RabbitMQEventBus.java     |  6 +-
 .../james/mailbox/events/RabbitMQEventBusTest.java | 45 +++++------
 .../rabbitmq/host/RabbitMQEventBusHostSystem.java  |  8 +-
 9 files changed, 191 insertions(+), 102 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
index f26efbd..e8fef49 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
@@ -49,5 +49,7 @@ public interface RabbitMQChannelPool {
 
     Flux<AcknowledgableDelivery> receive(String queueName);
 
+    boolean tryConnection();
+
     void close() throws Exception;
 }
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
index 151e983..1f0e329 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
@@ -19,37 +19,15 @@
 package org.apache.james.backend.rabbitmq;
 
 import java.time.Duration;
-import java.util.Optional;
-import java.util.concurrent.Callable;
 
 import javax.inject.Inject;
 
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
-
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public class RabbitMQConnectionFactory {
-    private class ConnectionCallable implements Callable<Connection> {
-        private final ConnectionFactory connectionFactory;
-        private Optional<Connection> connection;
-
-        ConnectionCallable(ConnectionFactory connectionFactory) {
-            this.connectionFactory = connectionFactory;
-            connection = Optional.empty();
-        }
-
-        @Override
-        public synchronized Connection call() throws Exception {
-            if (connection.map(Connection::isOpen).orElse(false)) {
-                return connection.get();
-            }
-            Connection newConnection = connectionFactory.newConnection();
-            connection = Optional.of(newConnection);
-            return newConnection;
-        }
-    }
 
     private final ConnectionFactory connectionFactory;
 
@@ -71,12 +49,12 @@ public class RabbitMQConnectionFactory {
         }
     }
 
-    public Connection create() {
+    Connection create() {
         return connectionMono().block();
     }
 
-    public Mono<Connection> connectionMono() {
-        return Mono.fromCallable(new ConnectionCallable(connectionFactory))
+    Mono<Connection> connectionMono() {
+        return Mono.fromCallable(connectionFactory::newConnection)
             .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelayInMs()))
             .publishOn(Schedulers.elastic());
     }
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java
index 17a7ead..0290a47 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java
@@ -19,10 +19,6 @@
 
 package org.apache.james.backend.rabbitmq;
 
-import java.net.URISyntaxException;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-
 import javax.inject.Inject;
 
 import org.apache.james.core.healthcheck.ComponentName;
@@ -38,7 +34,7 @@ public class RabbitMQHealthCheck implements HealthCheck {
     private final RabbitMQChannelPool rabbitChannelPoolImpl;
 
     @Inject
-    public RabbitMQHealthCheck(RabbitMQChannelPool rabbitChannelPoolImpl) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
+    public RabbitMQHealthCheck(RabbitMQChannelPool rabbitChannelPoolImpl) {
         this.rabbitChannelPoolImpl = rabbitChannelPoolImpl;
     }
 
@@ -50,13 +46,12 @@ public class RabbitMQHealthCheck implements HealthCheck {
     @Override
     public Result check() {
         try {
-            return rabbitChannelPoolImpl.execute(channel -> {
-                    if (channel.isOpen()) {
-                        return Result.healthy(COMPONENT_NAME);
-                    }
-                    LOGGER.error("The created connection was not opened");
-                    return Result.unhealthy(COMPONENT_NAME);
-            });
+            if (rabbitChannelPoolImpl.tryConnection()) {
+                return Result.healthy(COMPONENT_NAME);
+            } else {
+                LOGGER.error("The created connection was not opened");
+                return Result.unhealthy(COMPONENT_NAME);
+            }
         } catch (Exception e) {
             LOGGER.error("Unhealthy RabbitMQ instances: could not establish a connection", e);
             return Result.unhealthy(COMPONENT_NAME);
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
index b5819e3..2a7da85 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
@@ -19,7 +19,10 @@
 
 package org.apache.james.backend.rabbitmq;
 
+import java.io.IOException;
+import java.time.Duration;
 import java.util.Optional;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.PreDestroy;
@@ -28,8 +31,9 @@ import javax.inject.Inject;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Receiver;
@@ -37,18 +41,16 @@ import reactor.rabbitmq.ReceiverOptions;
 
 public class SimpleChannelPool implements RabbitMQChannelPool {
     private final AtomicReference<Channel> channelReference;
-    private final AtomicReference<Connection> connectionReference;
-    private final RabbitMQConnectionFactory connectionFactory;
     private final Receiver rabbitFlux;
+    private final SimpleConnectionPool connectionPool;
 
     @Inject
     @VisibleForTesting
-    SimpleChannelPool(RabbitMQConnectionFactory factory) {
-        this.connectionFactory = factory;
-        this.connectionReference = new AtomicReference<>();
+    SimpleChannelPool(SimpleConnectionPool connectionPool) {
+        this.connectionPool = connectionPool;
         this.channelReference = new AtomicReference<>();
         this.rabbitFlux = RabbitFlux
-            .createReceiver(new ReceiverOptions().connectionMono(connectionFactory.connectionMono()));
+            .createReceiver(new ReceiverOptions().connectionMono(connectionPool.getResilientConnection()));
     }
 
     @Override
@@ -57,26 +59,22 @@ public class SimpleChannelPool implements RabbitMQChannelPool {
     }
 
     @Override
-    public synchronized <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException {
-        return f.execute(getResilientChannel());
+    public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException {
+        return f.execute(getResilientChannel().block());
     }
 
     @Override
-    public synchronized <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException {
-        f.execute(getResilientChannel());
+    public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException {
+        f.execute(getResilientChannel().block());
     }
 
     @PreDestroy
     @Override
-    public synchronized void close() {
+    public void close() {
         Optional.ofNullable(channelReference.get())
             .filter(Channel::isOpen)
             .ifPresent(Throwing.<Channel>consumer(Channel::close).orDoNothing());
 
-        Optional.ofNullable(connectionReference.get())
-            .filter(Connection::isOpen)
-            .ifPresent(Throwing.<Connection>consumer(Connection::close).orDoNothing());
-
         try {
             rabbitFlux.close();
         } catch (Throwable ignored) {
@@ -84,24 +82,47 @@ public class SimpleChannelPool implements RabbitMQChannelPool {
         }
     }
 
-    private Connection getResilientConnection() {
-        return connectionReference.updateAndGet(this::getOpenConnection);
+    private Mono<Channel> getResilientChannel() {
+        int numRetries = 100;
+        Duration initialDelay = Duration.ofMillis(100);
+        return Mono.defer(this::getOpenChannel)
+            .publishOn(Schedulers.elastic())
+            .retryBackoff(numRetries, initialDelay);
     }
 
-    private Connection getOpenConnection(Connection checkedConnection) {
-        return Optional.ofNullable(checkedConnection)
-            .filter(Connection::isOpen)
-            .orElseGet(connectionFactory::create);
+    private Mono<Channel> getOpenChannel() {
+        Channel previous = channelReference.get();
+        return Mono.justOrEmpty(previous)
+            .publishOn(Schedulers.elastic())
+            .filter(Channel::isOpen)
+            .switchIfEmpty(connectionPool.getResilientConnection()
+                .flatMap(connection -> Mono.fromCallable(connection::createChannel)))
+            .flatMap(current -> replaceCurrentChannel(previous, current))
+            .onErrorMap(t -> new RuntimeException("unable to create and register a new Channel", t));
     }
 
-    private Channel getResilientChannel() {
-        return channelReference.updateAndGet(Throwing.unaryOperator(this::getOpenChannel));
+    private Mono<Channel> replaceCurrentChannel(Channel previous, Channel current) {
+        if (channelReference.compareAndSet(previous, current)) {
+            return Mono.just(current);
+        } else {
+            try {
+                current.close();
+            } catch (IOException | TimeoutException e) {
+                //error below
+            }
+            return Mono.error(new RuntimeException("unable to create and register a new Channel"));
+        }
     }
 
-    private Channel getOpenChannel(Channel checkedChannel) {
-        return Optional.ofNullable(checkedChannel)
-            .filter(Channel::isOpen)
-            .orElseGet(Throwing.supplier(() -> getResilientConnection().createChannel())
-                .sneakyThrow());
+    @Override
+    public boolean tryConnection() {
+        try {
+            return connectionPool.tryConnection() &&
+                getOpenChannel()
+                    .blockOptional()
+                    .isPresent();
+        } catch (Throwable t) {
+            return false;
+        }
     }
 }
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java
new file mode 100644
index 0000000..519c1fa
--- /dev/null
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleConnectionPool.java
@@ -0,0 +1,90 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backend.rabbitmq;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
+import com.rabbitmq.client.Connection;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class SimpleConnectionPool implements AutoCloseable {
+    private final AtomicReference<Connection> connectionReference;
+    private final RabbitMQConnectionFactory connectionFactory;
+
+    @Inject
+    @VisibleForTesting
+    public SimpleConnectionPool(RabbitMQConnectionFactory factory) {
+        this.connectionFactory = factory;
+        this.connectionReference = new AtomicReference<>();
+    }
+
+    @PreDestroy
+    @Override
+    public void close() {
+        Optional.ofNullable(connectionReference.get())
+            .filter(Connection::isOpen)
+            .ifPresent(Throwing.<Connection>consumer(Connection::close).orDoNothing());
+    }
+
+    public Mono<Connection> getResilientConnection() {
+        int numRetries = 100;
+        Duration initialDelay = Duration.ofMillis(100);
+        return Mono.defer(this::getOpenConnection)
+            .subscribeOn(Schedulers.elastic())
+            .retryBackoff(numRetries, initialDelay);
+    }
+
+    private Mono<Connection> getOpenConnection() {
+        Connection previous = connectionReference.get();
+        Connection current = Optional.ofNullable(previous)
+            .filter(Connection::isOpen)
+            .orElseGet(connectionFactory::create);
+        boolean updated = connectionReference.compareAndSet(previous, current);
+        if (updated) {
+            return Mono.just(current);
+        } else {
+            try {
+                current.close();
+            } catch (IOException e) {
+                //error below
+            }
+            return Mono.error(new RuntimeException("unable to create and register a new Connection"));
+        }
+    }
+
+    public boolean tryConnection() {
+        try {
+            return getOpenConnection()
+                .blockOptional(Duration.ofSeconds(1))
+                .isPresent();
+        } catch (Throwable t) {
+            return false;
+        }
+    }
+}
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
index 4e3f388..37083b4 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
@@ -21,6 +21,7 @@ package org.apache.james.backend.rabbitmq;
 import static org.apache.james.backend.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEMENT_CREDENTIAL;
 
 import java.net.URISyntaxException;
+import java.util.Collection;
 
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.AfterEachCallback;
@@ -35,6 +36,7 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
     private DockerRabbitMQ rabbitMQ;
     private SimpleChannelPool simpleChannelPool;
     private RabbitMQConnectionFactory connectionFactory;
+    private SimpleConnectionPool connectionPool;
 
     @Override
     public void beforeAll(ExtensionContext context) {
@@ -45,12 +47,14 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
     @Override
     public void beforeEach(ExtensionContext extensionContext) throws Exception {
         connectionFactory = createRabbitConnectionFactory();
-        this.simpleChannelPool = new SimpleChannelPool(connectionFactory);
+        connectionPool = new SimpleConnectionPool(connectionFactory);
+        this.simpleChannelPool = new SimpleChannelPool(connectionPool);
     }
 
     @Override
-    public void afterEach(ExtensionContext context) throws Exception {
+    public void afterEach(ExtensionContext context) {
         simpleChannelPool.close();
+        connectionPool.close();
     }
 
     @Override
@@ -72,12 +76,12 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
         return simpleChannelPool;
     }
 
-    public DockerRabbitMQ getRabbitMQ() {
-        return rabbitMQ;
+    public SimpleConnectionPool getRabbitConnectionPool() {
+        return connectionPool;
     }
 
-    public RabbitMQConnectionFactory getConnectionFactory() {
-        return connectionFactory;
+    public DockerRabbitMQ getRabbitMQ() {
+        return rabbitMQ;
     }
 
     public RabbitMQManagementAPI managementAPI() throws Exception {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 0a9fd41..041ded3 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -24,7 +24,7 @@ import java.util.Set;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
-import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.backend.rabbitmq.SimpleConnectionPool;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.metrics.api.MetricFactory;
@@ -59,13 +59,13 @@ public class RabbitMQEventBus implements EventBus, Startable {
     private Sender sender;
 
     @Inject
-    public RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer,
+    public RabbitMQEventBus(SimpleConnectionPool simpleConnectionPool, EventSerializer eventSerializer,
                      RetryBackoffConfiguration retryBackoff,
                      RoutingKeyConverter routingKeyConverter,
                      EventDeadLetters eventDeadLetters, MetricFactory metricFactory) {
         this.mailboxListenerExecutor = new MailboxListenerExecutor(metricFactory);
         this.eventBusId = EventBusId.random();
-        this.connectionMono = rabbitMQConnectionFactory.connectionMono();
+        this.connectionMono = simpleConnectionPool.getResilientConnection();
         this.eventSerializer = eventSerializer;
         this.routingKeyConverter = routingKeyConverter;
         this.retryBackoff = retryBackoff;
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 7d70ea5..83eccba 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -51,7 +51,6 @@ import java.io.Closeable;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.backend.rabbitmq.RabbitMQExtension;
 import org.apache.james.backend.rabbitmq.RabbitMQManagementAPI;
 import org.apache.james.event.json.EventSerializer;
@@ -71,7 +70,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.stubbing.Answer;
 
 import com.rabbitmq.client.Connection;
-
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ExchangeSpecification;
@@ -94,16 +92,14 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     private RabbitMQEventBus eventBus2;
     private RabbitMQEventBus eventBus3;
     private Sender sender;
-    private RabbitMQConnectionFactory connectionFactory;
     private EventSerializer eventSerializer;
     private RoutingKeyConverter routingKeyConverter;
     private MemoryEventDeadLetters memoryEventDeadLetters;
+    private Mono<Connection> resilientConnection;
 
     @BeforeEach
     void setUp() {
-        connectionFactory = rabbitMQExtension.getConnectionFactory();
         memoryEventDeadLetters = new MemoryEventDeadLetters();
-        Mono<Connection> connectionMono = Mono.fromSupplier(connectionFactory::create).cache();
 
         TestId.Factory mailboxIdFactory = new TestId.Factory();
         eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory());
@@ -116,7 +112,8 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         eventBus.start();
         eventBus2.start();
         eventBus3.start();
-        sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
+        resilientConnection = rabbitMQExtension.getRabbitConnectionPool().getResilientConnection();
+        sender = RabbitFlux.createSender(new SenderOptions().connectionMono(resilientConnection));
     }
 
     @AfterEach
@@ -132,7 +129,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     }
 
     private RabbitMQEventBus newEventBus() {
-        return new RabbitMQEventBus(connectionFactory, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new NoopMetricFactory());
+        return new RabbitMQEventBus(rabbitMQExtension.getRabbitConnectionPool(), eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, memoryEventDeadLetters, new NoopMetricFactory());
     }
 
     @Override
@@ -232,30 +229,31 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     @Nested
     class PublishingTest {
         private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue";
+        private Sender sender1;
 
         @BeforeEach
         void setUp() {
-            createQueue();
-        }
+            SenderOptions senderOption = new SenderOptions().connectionMono(resilientConnection);
+            sender1 = RabbitFlux.createSender(senderOption);
 
-        private void createQueue() {
-            SenderOptions senderOption = new SenderOptions()
-                .connectionMono(Mono.fromSupplier(connectionFactory::create));
-            Sender sender = RabbitFlux.createSender(senderOption);
-
-            sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
+            sender1.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
                 .arguments(NO_ARGUMENTS))
                 .block();
-            sender.bind(BindingSpecification.binding()
+            sender1.bind(BindingSpecification.binding()
                 .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
                 .queue(MAILBOX_WORK_QUEUE_NAME)
                 .routingKey(EMPTY_ROUTING_KEY))
                 .block();
         }
 
+        @AfterEach
+        void tearDown() {
+            sender1.close();
+        }
+
         @Test
         void dispatchShouldPublishSerializedEventToRabbitMQ() {
             eventBus.dispatch(EVENT, NO_KEYS).block();
@@ -271,15 +269,14 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         }
 
         private Event dequeueEvent() {
-            RabbitMQConnectionFactory connectionFactory = rabbitMQExtension.getConnectionFactory();
-            Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(Mono.just(connectionFactory.create())));
-
-            byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
-                .blockFirst()
-                .getBody();
+            try (Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(resilientConnection))) {
+                byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
+                    .blockFirst()
+                    .getBody();
 
-            return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8))
-                .get();
+                return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8))
+                    .get();
+            }
         }
     }
 
diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
index 14b8592..a74b8f1 100644
--- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
+++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
@@ -24,7 +24,7 @@ import java.net.URISyntaxException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.james.backend.rabbitmq.DockerRabbitMQ;
-import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.backend.rabbitmq.SimpleConnectionPool;
 import org.apache.james.core.quota.QuotaCount;
 import org.apache.james.core.quota.QuotaSize;
 import org.apache.james.event.json.EventSerializer;
@@ -61,6 +61,7 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
     private final DockerRabbitMQ dockerRabbitMQ;
     private RabbitMQEventBus eventBus;
     private InMemoryIntegrationResources.Resources resources;
+    private SimpleConnectionPool connectionPool;
 
     RabbitMQEventBusHostSystem(DockerRabbitMQ dockerRabbitMQ) {
         this.dockerRabbitMQ = dockerRabbitMQ;
@@ -70,6 +71,7 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
     public void beforeTest() throws Exception {
         super.beforeTest();
 
+        connectionPool = new SimpleConnectionPool(dockerRabbitMQ.createRabbitConnectionFactory());
         eventBus = createEventBus();
         eventBus.start();
 
@@ -95,14 +97,14 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
         InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();
         EventSerializer eventSerializer = new EventSerializer(mailboxIdFactory, messageIdFactory);
         RoutingKeyConverter routingKeyConverter = new RoutingKeyConverter(ImmutableSet.of(new MailboxIdRegistrationKey.Factory(mailboxIdFactory)));
-        RabbitMQConnectionFactory rabbitConnectionFactory = dockerRabbitMQ.createRabbitConnectionFactory();
-        return new RabbitMQEventBus(rabbitConnectionFactory, eventSerializer, RetryBackoffConfiguration.DEFAULT,
+        return new RabbitMQEventBus(connectionPool, eventSerializer, RetryBackoffConfiguration.DEFAULT,
             routingKeyConverter, new MemoryEventDeadLetters(), new NoopMetricFactory());
     }
 
     @Override
     public void afterTest() {
         eventBus.stop();
+        connectionPool.close();
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org