You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/31 09:24:33 UTC
[james-project] 12/28: JAMES-3350 Fasten delays & timeout upon
RabbitMQ checks
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9ec4dc78f97626af1b7d645da1a7d971bd947c4a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jul 27 13:15:51 2020 +0700
JAMES-3350 Fasten delays & timeout upon RabbitMQ checks
---
.../rabbitmq/ReactorRabbitMQChannelPool.java | 23 +++++++++++++--------
.../backends/rabbitmq/SimpleConnectionPool.java | 4 ++++
.../james/backends/rabbitmq/DockerRabbitMQ.java | 24 +++++++++++-----------
.../james/backends/rabbitmq/RabbitMQExtension.java | 6 +++++-
.../rabbitmq/ReactorRabbitMQChannelPoolTest.java | 4 +++-
5 files changed, 38 insertions(+), 23 deletions(-)
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index f4703b4..118e2fa 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -63,13 +63,14 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelFactory.class);
- private static final int MAX_RETRIES = 5;
- private static final Duration RETRY_FIRST_BACK_OFF = Duration.ofMillis(100);
-
private final Mono<Connection> connectionMono;
+ private final Duration minBorrowDelay;
+ private final int retries;
- ChannelFactory(Mono<Connection> connectionMono) {
+ ChannelFactory(Mono<Connection> connectionMono, Duration minBorrowDelay, int retries) {
this.connectionMono = connectionMono;
+ this.minBorrowDelay = minBorrowDelay;
+ this.retries = retries;
}
@Override
@@ -83,7 +84,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
return Mono.fromCallable(connection::openChannel)
.map(maybeChannel ->
maybeChannel.orElseThrow(() -> new RuntimeException("RabbitMQ reached to maximum opened channels, cannot get more channels")))
- .retryWhen(Retry.backoff(MAX_RETRIES, RETRY_FIRST_BACK_OFF).scheduler(Schedulers.elastic()))
+ .retryWhen(Retry.backoff(retries, minBorrowDelay).scheduler(Schedulers.elastic()))
.doOnError(throwable -> LOGGER.error("error when creating new channel", throwable));
}
@@ -110,20 +111,24 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
private final Mono<Connection> connectionMono;
private final GenericObjectPool<Channel> pool;
private final ConcurrentSkipListSet<Channel> borrowedChannels;
+ private final Duration minBorrowDelay;
+ private final int retries;
private Sender sender;
public ReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
- this(simpleConnectionPool.getResilientConnection(), MAX_CHANNELS_NUMBER);
+ this(simpleConnectionPool.getResilientConnection(), MAX_CHANNELS_NUMBER, MIN_BORROW_DELAY, MAX_BORROW_RETRIES);
}
- public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, int poolSize) {
+ public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, int poolSize, Duration minBorrowDelay, int retries) {
this.connectionMono = connectionMono;
- ChannelFactory channelFactory = new ChannelFactory(connectionMono);
+ this.retries = retries;
+ ChannelFactory channelFactory = new ChannelFactory(connectionMono, minBorrowDelay, retries);
GenericObjectPoolConfig<Channel> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(poolSize);
this.pool = new GenericObjectPool<>(channelFactory, config);
this.borrowedChannels = new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode));
+ this.minBorrowDelay = minBorrowDelay;
}
public void start() {
@@ -146,7 +151,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
private Mono<Channel> borrow() {
return tryBorrowFromPool()
.doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable))
- .retryWhen(Retry.backoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY).scheduler(Schedulers.elastic()))
+ .retryWhen(Retry.backoff(retries, minBorrowDelay).scheduler(Schedulers.elastic()))
.onErrorMap(this::propagateException)
.subscribeOn(Schedulers.elastic())
.doOnNext(borrowedChannels::add);
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
index d408666..b6125f2 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleConnectionPool.java
@@ -59,6 +59,10 @@ public class SimpleConnectionPool implements AutoCloseable {
public Mono<Connection> getResilientConnection() {
int numRetries = 10;
Duration initialDelay = Duration.ofMillis(100);
+ return getResilientConnection(numRetries, initialDelay);
+ }
+
+ public Mono<Connection> getResilientConnection(int numRetries, Duration initialDelay) {
return Mono.defer(this::getOpenConnection)
.retryWhen(Retry.backoff(numRetries, initialDelay).scheduler(Schedulers.elastic()));
}
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
index fc03b7a..f883168 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
@@ -47,12 +47,12 @@ public class DockerRabbitMQ {
private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class);
private static final int MAX_THREE_RETRIES = 3;
- private static final int MIN_DELAY_OF_ONE_HUNDRED_MILLISECONDS = 100;
- private static final int CONNECTION_TIMEOUT_OF_ONE_SECOND = 1000;
- private static final int CHANNEL_RPC_TIMEOUT_OF_ONE_SECOND = 1000;
- private static final int HANDSHAKE_TIMEOUT_OF_ONE_SECOND = 1000;
- private static final int SHUTDOWN_TIMEOUT_OF_ONE_SECOND = 1000;
- private static final int NETWORK_RECOVERY_INTERVAL_OF_ONE_SECOND = 1000;
+ private static final int MIN_DELAY_OF_TEN_MILLISECONDS = 10;
+ private static final int CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND = 100;
+ private static final int CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND = 100;
+ private static final int HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND = 100;
+ private static final int SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND = 100;
+ private static final int NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND = 100;
private static final String DEFAULT_RABBIT_HOST_NAME_PREFIX = "my-rabbit";
private static final String DEFAULT_RABBIT_NODE_NAME_PREFIX = "rabbit";
private static final int DEFAULT_RABBITMQ_PORT = 5672;
@@ -259,12 +259,12 @@ public class DockerRabbitMQ {
.managementUri(managementUri())
.managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL)
.maxRetries(MAX_THREE_RETRIES)
- .minDelayInMs(MIN_DELAY_OF_ONE_HUNDRED_MILLISECONDS)
- .connectionTimeoutInMs(CONNECTION_TIMEOUT_OF_ONE_SECOND)
- .channelRpcTimeoutInMs(CHANNEL_RPC_TIMEOUT_OF_ONE_SECOND)
- .handshakeTimeoutInMs(HANDSHAKE_TIMEOUT_OF_ONE_SECOND)
- .shutdownTimeoutInMs(SHUTDOWN_TIMEOUT_OF_ONE_SECOND)
- .networkRecoveryIntervalInMs(NETWORK_RECOVERY_INTERVAL_OF_ONE_SECOND)
+ .minDelayInMs(MIN_DELAY_OF_TEN_MILLISECONDS)
+ .connectionTimeoutInMs(CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+ .channelRpcTimeoutInMs(CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+ .handshakeTimeoutInMs(HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+ .shutdownTimeoutInMs(SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+ .networkRecoveryIntervalInMs(NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND)
.build();
return new RabbitMQConnectionFactory(rabbitMQConfiguration);
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
index ded5dd8..871e592 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
@@ -21,6 +21,7 @@ package org.apache.james.backends.rabbitmq;
import static org.apache.james.backends.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEMENT_CREDENTIAL;
import java.net.URISyntaxException;
+import java.time.Duration;
import java.util.function.Consumer;
import org.junit.jupiter.api.extension.AfterAllCallback;
@@ -115,7 +116,10 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
RabbitMQConnectionFactory connectionFactory = createRabbitConnectionFactory();
connectionPool = new SimpleConnectionPool(connectionFactory);
- channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(), 5);
+ Duration minBorrowDelay = Duration.ofMillis(5);
+ int retries = 2;
+ channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(2, Duration.ofMillis(5)), 5,
+ minBorrowDelay, retries);
channelPool.start();
}
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
index f4e6d62..9ae05c1 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
@@ -66,9 +66,11 @@ class ReactorRabbitMQChannelPoolTest implements ChannelPoolContract {
}
private ReactorRabbitMQChannelPool generateChannelPool(int poolSize) {
+ Duration minBorrowDelay = Duration.ofMillis(5);
+ int retries = 2;
ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(
rabbitMQExtension.getConnectionPool().getResilientConnection(),
- poolSize);
+ poolSize, minBorrowDelay, retries);
reactorRabbitMQChannelPool.start();
return reactorRabbitMQChannelPool;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org