You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/31 09:24:39 UTC
[james-project] 18/28: JAMES-3350 Extract
ReactorRabbitMQChannelPool configuration
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 891234eb0a687e0b1b4cc9366f736d2976dc5af7
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jul 28 09:30:39 2020 +0700
JAMES-3350 Extract ReactorRabbitMQChannelPool configuration
---
.../rabbitmq/ReactorRabbitMQChannelPool.java | 73 ++++++++++++++++------
.../james/backends/rabbitmq/RabbitMQExtension.java | 9 +--
.../rabbitmq/ReactorRabbitMQChannelPoolTest.java | 7 ++-
.../rabbitmq/host/RabbitMQEventBusHostSystem.java | 3 +-
.../james/modules/rabbitmq/RabbitMQModule.java | 17 ++---
.../RabbitMQEventDeadLettersIntegrationTest.java | 12 +++-
6 files changed, 85 insertions(+), 36 deletions(-)
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 118e2fa..913d84d 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -50,6 +50,7 @@ import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;
import reactor.util.retry.Retry;
+import reactor.util.retry.RetryBackoffSpec;
public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
@@ -64,13 +65,11 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelFactory.class);
private final Mono<Connection> connectionMono;
- private final Duration minBorrowDelay;
- private final int retries;
+ private final Configuration configuration;
- ChannelFactory(Mono<Connection> connectionMono, Duration minBorrowDelay, int retries) {
+ ChannelFactory(Mono<Connection> connectionMono, Configuration configuration) {
this.connectionMono = connectionMono;
- this.minBorrowDelay = minBorrowDelay;
- this.retries = retries;
+ this.configuration = configuration;
}
@Override
@@ -84,7 +83,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
return Mono.fromCallable(connection::openChannel)
.map(maybeChannel ->
maybeChannel.orElseThrow(() -> new RuntimeException("RabbitMQ reached to maximum opened channels, cannot get more channels")))
- .retryWhen(Retry.backoff(retries, minBorrowDelay).scheduler(Schedulers.elastic()))
+ .retryWhen(configuration.backoffSpec().scheduler(Schedulers.elastic()))
.doOnError(throwable -> LOGGER.error("error when creating new channel", throwable));
}
@@ -102,6 +101,50 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
}
}
+ public static class Configuration {
+ @FunctionalInterface
+ public interface RequiresRetries {
+ RequiredMinBorrowDelay retries(int retries);
+ }
+
+ @FunctionalInterface
+ public interface RequiredMinBorrowDelay {
+ RequiredMaxChannel minBorrowDelay(Duration minBorrowDelay);
+ }
+
+ @FunctionalInterface
+ public interface RequiredMaxChannel {
+ Configuration maxChannel(int maxChannel);
+ }
+
+ public static final Configuration DEFAULT = builder()
+ .retries(MAX_BORROW_RETRIES)
+ .minBorrowDelay(MIN_BORROW_DELAY)
+ .maxChannel(MAX_CHANNELS_NUMBER);
+
+ public static RequiresRetries builder() {
+ return retries -> minBorrowDelay -> maxChannel -> new Configuration(minBorrowDelay, retries, maxChannel);
+ }
+
+ private final Duration minBorrowDelay;
+ private final int retries;
+ private final int maxChannel;
+
+ public Configuration(Duration minBorrowDelay, int retries, int maxChannel) {
+ this.minBorrowDelay = minBorrowDelay;
+ this.retries = retries;
+ this.maxChannel = maxChannel;
+ }
+
+ private RetryBackoffSpec backoffSpec() {
+ return Retry.backoff(retries, minBorrowDelay);
+ }
+
+ public int getMaxChannel() {
+ return maxChannel;
+ }
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(ReactorRabbitMQChannelPool.class);
private static final long MAXIMUM_BORROW_TIMEOUT_IN_MS = Duration.ofSeconds(5).toMillis();
private static final int MAX_CHANNELS_NUMBER = 3;
@@ -111,24 +154,18 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
private final Mono<Connection> connectionMono;
private final GenericObjectPool<Channel> pool;
private final ConcurrentSkipListSet<Channel> borrowedChannels;
- private final Duration minBorrowDelay;
- private final int retries;
+ private final Configuration configuration;
private Sender sender;
- public ReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
- this(simpleConnectionPool.getResilientConnection(), MAX_CHANNELS_NUMBER, MIN_BORROW_DELAY, MAX_BORROW_RETRIES);
- }
-
- public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, int poolSize, Duration minBorrowDelay, int retries) {
+ public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, Configuration configuration) {
this.connectionMono = connectionMono;
- this.retries = retries;
- ChannelFactory channelFactory = new ChannelFactory(connectionMono, minBorrowDelay, retries);
+ this.configuration = configuration;
+ ChannelFactory channelFactory = new ChannelFactory(connectionMono, configuration);
GenericObjectPoolConfig<Channel> config = new GenericObjectPoolConfig<>();
- config.setMaxTotal(poolSize);
+ config.setMaxTotal(configuration.getMaxChannel());
this.pool = new GenericObjectPool<>(channelFactory, config);
this.borrowedChannels = new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode));
- this.minBorrowDelay = minBorrowDelay;
}
public void start() {
@@ -151,7 +188,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
private Mono<Channel> borrow() {
return tryBorrowFromPool()
.doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable))
- .retryWhen(Retry.backoff(retries, minBorrowDelay).scheduler(Schedulers.elastic()))
+ .retryWhen(configuration.backoffSpec().scheduler(Schedulers.elastic()))
.onErrorMap(this::propagateException)
.subscribeOn(Schedulers.elastic())
.doOnNext(borrowedChannels::add);
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
index c2a89a2..56a2f11 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
@@ -140,10 +140,11 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
RabbitMQConnectionFactory connectionFactory = createRabbitConnectionFactory();
connectionPool = new SimpleConnectionPool(connectionFactory);
- Duration minBorrowDelay = Duration.ofMillis(5);
- int retries = 2;
- channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(2, Duration.ofMillis(5)), 5,
- minBorrowDelay, retries);
+ channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(2, Duration.ofMillis(5)),
+ ReactorRabbitMQChannelPool.Configuration.builder()
+ .retries(2)
+ .minBorrowDelay(Duration.ofMillis(5))
+ .maxChannel(3));
channelPool.start();
}
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
index 20f83c2..0f259c3 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
@@ -67,11 +67,12 @@ class ReactorRabbitMQChannelPoolTest implements ChannelPoolContract {
}
private ReactorRabbitMQChannelPool generateChannelPool(int poolSize) {
- Duration minBorrowDelay = Duration.ofMillis(5);
- int retries = 2;
ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(
rabbitMQExtension.getConnectionPool().getResilientConnection(),
- poolSize, minBorrowDelay, retries);
+ ReactorRabbitMQChannelPool.Configuration.builder()
+ .retries(2)
+ .minBorrowDelay(Duration.ofMillis(5))
+ .maxChannel(poolSize));
reactorRabbitMQChannelPool.start();
return reactorRabbitMQChannelPool;
}
diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
index 08b41e9..70128f3 100644
--- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
+++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
@@ -74,7 +74,8 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
super.beforeTest();
connectionPool = new SimpleConnectionPool(dockerRabbitMQ.createRabbitConnectionFactory());
- reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool);
+ reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(),
+ ReactorRabbitMQChannelPool.Configuration.DEFAULT);
reactorRabbitMQChannelPool.start();
eventBus = createEventBus();
eventBus.start();
diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
index 1d223b3..f4d1c9f 100644
--- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
@@ -23,7 +23,6 @@ import java.io.FileNotFoundException;
import javax.inject.Named;
import javax.inject.Singleton;
-import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
@@ -89,6 +88,8 @@ public class RabbitMQModule extends AbstractModule {
eventDTOModuleBinder.addBinding().toInstance(CassandraMailQueueViewConfigurationModule.MAIL_QUEUE_VIEW_CONFIGURATION);
Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(RabbitMQHealthCheck.class);
+
+ bind(ReactorRabbitMQChannelPool.Configuration.class).toInstance(ReactorRabbitMQChannelPool.Configuration.DEFAULT);
}
@Provides
@@ -118,7 +119,7 @@ public class RabbitMQModule extends AbstractModule {
@Provides
@Named(RABBITMQ_CONFIGURATION_NAME)
@Singleton
- private Configuration getConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException {
+ private org.apache.commons.configuration2.Configuration getConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException {
try {
return propertiesProvider.getConfiguration(RABBITMQ_CONFIGURATION_NAME);
} catch (FileNotFoundException e) {
@@ -129,26 +130,28 @@ public class RabbitMQModule extends AbstractModule {
@Provides
@Singleton
- private RabbitMQConfiguration getMailQueueConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) {
+ private RabbitMQConfiguration getMailQueueConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) org.apache.commons.configuration2.Configuration configuration) {
return RabbitMQConfiguration.from(configuration);
}
@Provides
@Singleton
- private CassandraMailQueueViewConfiguration getMailQueueViewConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) {
+ private CassandraMailQueueViewConfiguration getMailQueueViewConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) org.apache.commons.configuration2.Configuration configuration) {
return CassandraMailQueueViewConfiguration.from(configuration);
}
@Provides
@Singleton
- private RabbitMQMailQueueConfiguration getMailQueueSizeConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) {
+ private RabbitMQMailQueueConfiguration getMailQueueSizeConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) org.apache.commons.configuration2.Configuration configuration) {
return RabbitMQMailQueueConfiguration.from(configuration);
}
@Provides
@Singleton
- ReactorRabbitMQChannelPool provideReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
- ReactorRabbitMQChannelPool channelPool = new ReactorRabbitMQChannelPool(simpleConnectionPool);
+ ReactorRabbitMQChannelPool provideReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool, ReactorRabbitMQChannelPool.Configuration configuration) {
+ ReactorRabbitMQChannelPool channelPool = new ReactorRabbitMQChannelPool(
+ simpleConnectionPool.getResilientConnection(),
+ configuration);
channelPool.start();
return channelPool;
}
diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java
index 29ca1ab..a58794b 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java
@@ -46,6 +46,7 @@ import org.apache.james.JamesServerBuilder;
import org.apache.james.JamesServerExtension;
import org.apache.james.SearchConfiguration;
import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.core.Username;
import org.apache.james.junit.categories.BasicFeature;
import org.apache.james.mailbox.DefaultMailboxes;
@@ -221,9 +222,14 @@ class RabbitMQEventDeadLettersIntegrationTest {
.overrideWith(binder -> binder.bind(RetryBackoffConfiguration.class)
.toInstance(RetryBackoffConfiguration.builder()
.maxRetries(MAX_RETRIES)
- .firstBackoff(java.time.Duration.ofMillis(100))
- .jitterFactor(0.5)
- .build())))
+ .firstBackoff(java.time.Duration.ofMillis(10))
+ .jitterFactor(0.2)
+ .build()))
+ .overrideWith(binder -> binder.bind(ReactorRabbitMQChannelPool.Configuration.class)
+ .toInstance(ReactorRabbitMQChannelPool.Configuration.builder()
+ .retries(2)
+ .minBorrowDelay(java.time.Duration.ofMillis(5))
+ .maxChannel(3))))
.build();
private static final String DOMAIN = "domain.tld";
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org