You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/10 06:41:44 UTC

[james-project] 02/02: JAMES-2774 Avoid nested block in ReactorRabbitMQChannelPool

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4d0da670f68c45776477d90a2617074b9c13adad
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 8 22:00:52 2020 +0700

    JAMES-2774 Avoid nested block in ReactorRabbitMQChannelPool
---
 .../rabbitmq/ReactorRabbitMQChannelPool.java       | 24 ++++++++--------------
 1 file changed, 9 insertions(+), 15 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index db3e900..0459abe 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -138,28 +138,22 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
         return RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
     }
 
-    public Mono<Connection> getConnectionMono() {
-        return connectionMono;
-    }
-
     @Override
     public Mono<? extends Channel> getChannelMono() {
-        return Mono.fromCallable(this::borrow);
+        return borrow();
     }
 
-    private Channel borrow() {
-        Channel channel = tryBorrowFromPool();
-        borrowedChannels.add(channel);
-        return channel;
-    }
-
-    private Channel tryBorrowFromPool() {
-        return Mono.fromCallable(this::borrowFromPool)
+    private Mono<Channel> borrow() {
+        return tryBorrowFromPool()
             .doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable))
             .retryBackoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY, FOREVER, Schedulers.elastic())
             .onErrorMap(this::propagateException)
             .subscribeOn(Schedulers.elastic())
-            .block();
+            .doOnNext(borrowedChannels::add);
+    }
+
+    private Mono<Channel> tryBorrowFromPool() {
+        return Mono.fromCallable(this::borrowFromPool);
     }
 
     private Throwable propagateException(Throwable throwable) {
@@ -223,7 +217,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
     public boolean tryChannel() {
         Channel channel = null;
         try {
-            channel = borrow();
+            channel = borrow().block();
             return channel.isOpen();
         } catch (Throwable t) {
             return false;


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org