You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/02/21 02:27:23 UTC

[james-project] 10/13: JAMES-3063 Evict closed channels on retried borrowing

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4e94085192e7843290e5261ce58a6d4c9b5e91bf
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Mon Feb 10 18:24:12 2020 +0700

    JAMES-3063 Evict closed channels on retried borrowing
---
 .../rabbitmq/ReactorRabbitMQChannelPool.java       | 49 ++++++++++++++++++----
 .../rabbitmq/ReactorRabbitMQChannelPoolTest.java   | 18 +++-----
 2 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 5ccfbe1..2f789bb 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.backends.rabbitmq;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Comparator;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -37,7 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
-import com.google.common.base.Preconditions;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 
@@ -52,7 +52,12 @@ import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
 
 public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
-    private static final int MAX_CHANNELS_NUMBER = 5;
+
+    private static class ChannelClosedException extends IOException {
+        ChannelClosedException(String message) {
+            super(message);
+        }
+    }
 
     static class ChannelFactory extends BasePooledObjectFactory<Channel> {
 
@@ -60,7 +65,6 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
 
         private static final int MAX_RETRIES = 5;
         private static final Duration RETRY_FIRST_BACK_OFF = Duration.ofMillis(100);
-        private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
 
         private final Mono<Connection> connectionMono;
 
@@ -69,7 +73,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
         }
 
         @Override
-        public Channel create() throws Exception {
+        public Channel create() {
             return connectionMono
                 .flatMap(this::openChannel)
                 .block();
@@ -97,7 +101,12 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
         }
     }
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorRabbitMQChannelPool.class);
     private static final long MAXIMUM_BORROW_TIMEOUT_IN_MS = Duration.ofSeconds(5).toMillis();
+    private static final int MAX_CHANNELS_NUMBER = 5;
+    private static final int MAX_BORROW_RETRIES = 3;
+    private static final Duration MIN_BORROW_DELAY = Duration.ofMillis(50);
+    private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
 
     private final Mono<Connection> connectionMono;
     private final GenericObjectPool<Channel> pool;
@@ -140,13 +149,39 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
         return Mono.fromCallable(this::borrow);
     }
 
-    private Channel borrow() throws Exception {
-        Channel channel = pool.borrowObject(MAXIMUM_BORROW_TIMEOUT_IN_MS);
-        Preconditions.checkArgument(channel.isOpen());
+    private Channel borrow() {
+        Channel channel = tryBorrowFromPool();
         borrowedChannels.add(channel);
         return channel;
     }
 
+    private Channel tryBorrowFromPool() {
+        return Mono.fromCallable(this::borrowFromPool)
+            .doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable))
+            .retryBackoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY, FOREVER, Schedulers.elastic())
+            .onErrorMap(this::propagateException)
+            .subscribeOn(Schedulers.elastic())
+            .block();
+    }
+
+    private Throwable propagateException(Throwable throwable) {
+        if (throwable instanceof IllegalStateException
+            && throwable.getMessage().contains("Retries exhausted")) {
+            return throwable.getCause();
+        }
+
+        return throwable;
+    }
+
+    private Channel borrowFromPool() throws Exception {
+        Channel channel = pool.borrowObject(MAXIMUM_BORROW_TIMEOUT_IN_MS);
+        if (!channel.isOpen()) {
+            invalidateObject(channel);
+            throw new ChannelClosedException("borrowed channel is already closed");
+        }
+        return channel;
+    }
+
     @Override
     public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
         return (signalType, channel) -> {
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
index 8927930..f4e6d62 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
@@ -20,7 +20,6 @@
 package org.apache.james.backends.rabbitmq;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.time.Duration;
@@ -32,7 +31,6 @@ import java.util.concurrent.ExecutionException;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -110,22 +108,18 @@ class ReactorRabbitMQChannelPoolTest implements ChannelPoolContract {
         assertThat(channel.isOpen()).isFalse();
     }
 
-    @Disabled("IllegalArgumentException on channel state checks")
     @Test
     void channelBorrowShouldNotThrowWhenClosedChannel() throws Exception {
         ChannelPool channelPool = generateChannelPool(1);
         Channel channel = channelPool.getChannelMono().block();
-        returnChannel(channel, channelPool);
+        returnToThePool(channelPool, channel);
 
-        // unexpected closing, connection timeout, rabbitmq temporally down...
+        // unexpected closing, connection timeout, rabbitmq temporary down...
         channel.close();
 
-        assertThatCode(() -> channelPool.getChannelMono().block())
-            .doesNotThrowAnyException();
-    }
-
-    private void returnChannel(Channel channel, ChannelPool channelPool) {
-        channelPool.getChannelCloseHandler()
-            .accept(SignalType.ON_COMPLETE, channel);
+        assertThat(channelPool.getChannelMono()
+                .block()
+                .isOpen())
+            .isTrue();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org