You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/02/21 02:27:23 UTC
[james-project] 10/13: JAMES-3063 Evict closed channels on retried
borrowing
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4e94085192e7843290e5261ce58a6d4c9b5e91bf
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Mon Feb 10 18:24:12 2020 +0700
JAMES-3063 Evict closed channels on retried borrowing
---
.../rabbitmq/ReactorRabbitMQChannelPool.java | 49 ++++++++++++++++++----
.../rabbitmq/ReactorRabbitMQChannelPoolTest.java | 18 +++-----
2 files changed, 48 insertions(+), 19 deletions(-)
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 5ccfbe1..2f789bb 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -19,6 +19,7 @@
package org.apache.james.backends.rabbitmq;
+import java.io.IOException;
import java.time.Duration;
import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -37,7 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
-import com.google.common.base.Preconditions;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@@ -52,7 +52,12 @@ import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
- private static final int MAX_CHANNELS_NUMBER = 5;
+
+ private static class ChannelClosedException extends IOException {
+ ChannelClosedException(String message) {
+ super(message);
+ }
+ }
static class ChannelFactory extends BasePooledObjectFactory<Channel> {
@@ -60,7 +65,6 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
private static final int MAX_RETRIES = 5;
private static final Duration RETRY_FIRST_BACK_OFF = Duration.ofMillis(100);
- private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
private final Mono<Connection> connectionMono;
@@ -69,7 +73,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
}
@Override
- public Channel create() throws Exception {
+ public Channel create() {
return connectionMono
.flatMap(this::openChannel)
.block();
@@ -97,7 +101,12 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
}
}
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReactorRabbitMQChannelPool.class);
private static final long MAXIMUM_BORROW_TIMEOUT_IN_MS = Duration.ofSeconds(5).toMillis();
+ private static final int MAX_CHANNELS_NUMBER = 5;
+ private static final int MAX_BORROW_RETRIES = 3;
+ private static final Duration MIN_BORROW_DELAY = Duration.ofMillis(50);
+ private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
private final Mono<Connection> connectionMono;
private final GenericObjectPool<Channel> pool;
@@ -140,13 +149,39 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
return Mono.fromCallable(this::borrow);
}
- private Channel borrow() throws Exception {
- Channel channel = pool.borrowObject(MAXIMUM_BORROW_TIMEOUT_IN_MS);
- Preconditions.checkArgument(channel.isOpen());
+ private Channel borrow() {
+ Channel channel = tryBorrowFromPool();
borrowedChannels.add(channel);
return channel;
}
+ private Channel tryBorrowFromPool() {
+ return Mono.fromCallable(this::borrowFromPool)
+ .doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable))
+ .retryBackoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY, FOREVER, Schedulers.elastic())
+ .onErrorMap(this::propagateException)
+ .subscribeOn(Schedulers.elastic())
+ .block();
+ }
+
+ private Throwable propagateException(Throwable throwable) {
+ if (throwable instanceof IllegalStateException
+ && throwable.getMessage().contains("Retries exhausted")) {
+ return throwable.getCause();
+ }
+
+ return throwable;
+ }
+
+ private Channel borrowFromPool() throws Exception {
+ Channel channel = pool.borrowObject(MAXIMUM_BORROW_TIMEOUT_IN_MS);
+ if (!channel.isOpen()) {
+ invalidateObject(channel);
+ throw new ChannelClosedException("borrowed channel is already closed");
+ }
+ return channel;
+ }
+
@Override
public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
return (signalType, channel) -> {
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
index 8927930..f4e6d62 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
@@ -20,7 +20,6 @@
package org.apache.james.backends.rabbitmq;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.time.Duration;
@@ -32,7 +31,6 @@ import java.util.concurrent.ExecutionException;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -110,22 +108,18 @@ class ReactorRabbitMQChannelPoolTest implements ChannelPoolContract {
assertThat(channel.isOpen()).isFalse();
}
- @Disabled("IllegalArgumentException on channel state checks")
@Test
void channelBorrowShouldNotThrowWhenClosedChannel() throws Exception {
ChannelPool channelPool = generateChannelPool(1);
Channel channel = channelPool.getChannelMono().block();
- returnChannel(channel, channelPool);
+ returnToThePool(channelPool, channel);
- // unexpected closing, connection timeout, rabbitmq temporally down...
+ // unexpected closing, connection timeout, rabbitmq temporary down...
channel.close();
- assertThatCode(() -> channelPool.getChannelMono().block())
- .doesNotThrowAnyException();
- }
-
- private void returnChannel(Channel channel, ChannelPool channelPool) {
- channelPool.getChannelCloseHandler()
- .accept(SignalType.ON_COMPLETE, channel);
+ assertThat(channelPool.getChannelMono()
+ .block()
+ .isOpen())
+ .isTrue();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org