You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/10 06:41:44 UTC
[james-project] 02/02: JAMES-2774 Avoid nested block in
ReactorRabbitMQChannelPool
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4d0da670f68c45776477d90a2617074b9c13adad
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 8 22:00:52 2020 +0700
JAMES-2774 Avoid nested block in ReactorRabbitMQChannelPool
---
.../rabbitmq/ReactorRabbitMQChannelPool.java | 24 ++++++++--------------
1 file changed, 9 insertions(+), 15 deletions(-)
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index db3e900..0459abe 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -138,28 +138,22 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
return RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
}
- public Mono<Connection> getConnectionMono() {
- return connectionMono;
- }
-
@Override
public Mono<? extends Channel> getChannelMono() {
- return Mono.fromCallable(this::borrow);
+ return borrow();
}
- private Channel borrow() {
- Channel channel = tryBorrowFromPool();
- borrowedChannels.add(channel);
- return channel;
- }
-
- private Channel tryBorrowFromPool() {
- return Mono.fromCallable(this::borrowFromPool)
+ private Mono<Channel> borrow() {
+ return tryBorrowFromPool()
.doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable))
.retryBackoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY, FOREVER, Schedulers.elastic())
.onErrorMap(this::propagateException)
.subscribeOn(Schedulers.elastic())
- .block();
+ .doOnNext(borrowedChannels::add);
+ }
+
+ private Mono<Channel> tryBorrowFromPool() {
+ return Mono.fromCallable(this::borrowFromPool);
}
private Throwable propagateException(Throwable throwable) {
@@ -223,7 +217,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
public boolean tryChannel() {
Channel channel = null;
try {
- channel = borrow();
+ channel = borrow().block();
return channel.isOpen();
} catch (Throwable t) {
return false;
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org