You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/31 09:24:33 UTC

[james-project] 12/28: JAMES-3350 Fasten delays & timeout upon RabbitMQ checks

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9ec4dc78f97626af1b7d645da1a7d971bd947c4a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jul 27 13:15:51 2020 +0700

    JAMES-3350 Fasten delays & timeout upon RabbitMQ checks
---
 .../rabbitmq/ReactorRabbitMQChannelPool.java       | 23 +++++++++++++--------
 .../backends/rabbitmq/SimpleConnectionPool.java    |  4 ++++
 .../james/backends/rabbitmq/DockerRabbitMQ.java    | 24 +++++++++++-----------
 .../james/backends/rabbitmq/RabbitMQExtension.java |  6 +++++-
 .../rabbitmq/ReactorRabbitMQChannelPoolTest.java   |  4 +++-
 5 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index f4703b4..118e2fa 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -63,13 +63,14 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
 
         private static final Logger LOGGER = LoggerFactory.getLogger(ChannelFactory.class);
 
-        private static final int MAX_RETRIES = 5;
-        private static final Duration RETRY_FIRST_BACK_OFF = Duration.ofMillis(100);
-
         private final Mono<Connection> connectionMono;
+        private final Duration minBorrowDelay;
+        private final int retries;
 
-        ChannelFactory(Mono<Connection> connectionMono) {
+        ChannelFactory(Mono<Connection> connectionMono, Duration minBorrowDelay, int retries) {
             this.connectionMono = connectionMono;
+            this.minBorrowDelay = minBorrowDelay;
+            this.retries = retries;
         }
 
         @Override
@@ -83,7 +84,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
             return Mono.fromCallable(connection::openChannel)
                 .map(maybeChannel ->
                     maybeChannel.orElseThrow(() -> new RuntimeException("RabbitMQ reached to maximum opened channels, cannot get more channels")))
-                .retryWhen(Retry.backoff(MAX_RETRIES, RETRY_FIRST_BACK_OFF).scheduler(Schedulers.elastic()))
+                .retryWhen(Retry.backoff(retries, minBorrowDelay).scheduler(Schedulers.elastic()))
                 .doOnError(throwable -> LOGGER.error("error when creating new channel", throwable));
         }
 
@@ -110,20 +111,24 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
     private final Mono<Connection> connectionMono;
     private final GenericObjectPool<Channel> pool;
     private final ConcurrentSkipListSet<Channel> borrowedChannels;
+    private final Duration minBorrowDelay;
+    private final int retries;
     private Sender sender;
 
     public ReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
-        this(simpleConnectionPool.getResilientConnection(), MAX_CHANNELS_NUMBER);
+        this(simpleConnectionPool.getResilientConnection(), MAX_CHANNELS_NUMBER, MIN_BORROW_DELAY, MAX_BORROW_RETRIES);
     }
 
-    public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, int poolSize) {
+    public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, int poolSize, Duration minBorrowDelay, int retries) {
         this.connectionMono = connectionMono;
-        ChannelFactory channelFactory = new ChannelFactory(connectionMono);
+        this.retries = retries;
+        ChannelFactory channelFactory = new ChannelFactory(connectionMono, minBorrowDelay, retries);
 
         GenericObjectPoolConfig<Channel> config = new GenericObjectPoolConfig<>();
         config.setMaxTotal(poolSize);
         this.pool = new GenericObjectPool<>(channelFactory, config);
         this.borrowedChannels = new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode));
+        this.minBorrowDelay = minBorrowDelay;
     }
 
     public void start() {
@@ -146,7 +151,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
     private Mono<Channel> borrow() {
         return tryBorrowFromPool()
             .doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable))
-            .retryWhen(Retry.backoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY).scheduler(Schedulers.elastic()))
+            .retryWhen(Retry.backoff(retries, minBorrowDelay).scheduler(Schedulers.elastic()))
             .onErrorMap(this::propagateException)
             .subscribeOn(Schedulers.elastic())
             .doOnNext(borrowedChannels::add);
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
index d408666..b6125f2 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
@@ -59,6 +59,10 @@ public class SimpleConnectionPool implements AutoCloseable {
     public Mono<Connection> getResilientConnection() {
         int numRetries = 10;
         Duration initialDelay = Duration.ofMillis(100);
+        return getResilientConnection(numRetries, initialDelay);
+    }
+
+    public Mono<Connection> getResilientConnection(int numRetries, Duration initialDelay) {
         return Mono.defer(this::getOpenConnection)
             .retryWhen(Retry.backoff(numRetries, initialDelay).scheduler(Schedulers.elastic()));
     }
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
index fc03b7a..f883168 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
@@ -47,12 +47,12 @@ public class DockerRabbitMQ {
     private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class);
 
     private static final int MAX_THREE_RETRIES = 3;
-    private static final int MIN_DELAY_OF_ONE_HUNDRED_MILLISECONDS = 100;
-    private static final int CONNECTION_TIMEOUT_OF_ONE_SECOND = 1000;
-    private static final int CHANNEL_RPC_TIMEOUT_OF_ONE_SECOND = 1000;
-    private static final int HANDSHAKE_TIMEOUT_OF_ONE_SECOND = 1000;
-    private static final int SHUTDOWN_TIMEOUT_OF_ONE_SECOND = 1000;
-    private static final int NETWORK_RECOVERY_INTERVAL_OF_ONE_SECOND = 1000;
+    private static final int MIN_DELAY_OF_TEN_MILLISECONDS = 10;
+    private static final int CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND = 100;
+    private static final int CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND = 100;
+    private static final int HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND = 100;
+    private static final int SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND = 100;
+    private static final int NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND = 100;
     private static final String DEFAULT_RABBIT_HOST_NAME_PREFIX = "my-rabbit";
     private static final String DEFAULT_RABBIT_NODE_NAME_PREFIX = "rabbit";
     private static final int DEFAULT_RABBITMQ_PORT = 5672;
@@ -259,12 +259,12 @@ public class DockerRabbitMQ {
             .managementUri(managementUri())
             .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL)
             .maxRetries(MAX_THREE_RETRIES)
-            .minDelayInMs(MIN_DELAY_OF_ONE_HUNDRED_MILLISECONDS)
-            .connectionTimeoutInMs(CONNECTION_TIMEOUT_OF_ONE_SECOND)
-            .channelRpcTimeoutInMs(CHANNEL_RPC_TIMEOUT_OF_ONE_SECOND)
-            .handshakeTimeoutInMs(HANDSHAKE_TIMEOUT_OF_ONE_SECOND)
-            .shutdownTimeoutInMs(SHUTDOWN_TIMEOUT_OF_ONE_SECOND)
-            .networkRecoveryIntervalInMs(NETWORK_RECOVERY_INTERVAL_OF_ONE_SECOND)
+            .minDelayInMs(MIN_DELAY_OF_TEN_MILLISECONDS)
+            .connectionTimeoutInMs(CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+            .channelRpcTimeoutInMs(CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+            .handshakeTimeoutInMs(HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+            .shutdownTimeoutInMs(SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+            .networkRecoveryIntervalInMs(NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND)
             .build();
 
         return new RabbitMQConnectionFactory(rabbitMQConfiguration);
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
index ded5dd8..871e592 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
@@ -21,6 +21,7 @@ package org.apache.james.backends.rabbitmq;
 import static org.apache.james.backends.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEMENT_CREDENTIAL;
 
 import java.net.URISyntaxException;
+import java.time.Duration;
 import java.util.function.Consumer;
 
 import org.junit.jupiter.api.extension.AfterAllCallback;
@@ -115,7 +116,10 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
 
         RabbitMQConnectionFactory connectionFactory = createRabbitConnectionFactory();
         connectionPool = new SimpleConnectionPool(connectionFactory);
-        channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(), 5);
+        Duration minBorrowDelay = Duration.ofMillis(5);
+        int retries = 2;
+        channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(2, Duration.ofMillis(5)), 5,
+            minBorrowDelay, retries);
         channelPool.start();
     }
 
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
index f4e6d62..9ae05c1 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
@@ -66,9 +66,11 @@ class ReactorRabbitMQChannelPoolTest implements ChannelPoolContract {
     }
 
     private ReactorRabbitMQChannelPool generateChannelPool(int poolSize) {
+        Duration minBorrowDelay = Duration.ofMillis(5);
+        int retries = 2;
         ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(
             rabbitMQExtension.getConnectionPool().getResilientConnection(),
-            poolSize);
+            poolSize, minBorrowDelay, retries);
         reactorRabbitMQChannelPool.start();
         return reactorRabbitMQChannelPool;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org