You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/31 09:24:39 UTC

[james-project] 18/28: JAMES-3350 Extract ReactorRabbitMQChannelPool configuration

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 891234eb0a687e0b1b4cc9366f736d2976dc5af7
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jul 28 09:30:39 2020 +0700

    JAMES-3350 Extract ReactorRabbitMQChannelPool configuration
---
 .../rabbitmq/ReactorRabbitMQChannelPool.java       | 73 ++++++++++++++++------
 .../james/backends/rabbitmq/RabbitMQExtension.java |  9 +--
 .../rabbitmq/ReactorRabbitMQChannelPoolTest.java   |  7 ++-
 .../rabbitmq/host/RabbitMQEventBusHostSystem.java  |  3 +-
 .../james/modules/rabbitmq/RabbitMQModule.java     | 17 ++---
 .../RabbitMQEventDeadLettersIntegrationTest.java   | 12 +++-
 6 files changed, 85 insertions(+), 36 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 118e2fa..913d84d 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -50,6 +50,7 @@ import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
 import reactor.util.retry.Retry;
+import reactor.util.retry.RetryBackoffSpec;
 
 public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
 
@@ -64,13 +65,11 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
         private static final Logger LOGGER = LoggerFactory.getLogger(ChannelFactory.class);
 
         private final Mono<Connection> connectionMono;
-        private final Duration minBorrowDelay;
-        private final int retries;
+        private final Configuration configuration;
 
-        ChannelFactory(Mono<Connection> connectionMono, Duration minBorrowDelay, int retries) {
+        ChannelFactory(Mono<Connection> connectionMono, Configuration configuration) {
             this.connectionMono = connectionMono;
-            this.minBorrowDelay = minBorrowDelay;
-            this.retries = retries;
+            this.configuration = configuration;
         }
 
         @Override
@@ -84,7 +83,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
             return Mono.fromCallable(connection::openChannel)
                 .map(maybeChannel ->
                     maybeChannel.orElseThrow(() -> new RuntimeException("RabbitMQ reached to maximum opened channels, cannot get more channels")))
-                .retryWhen(Retry.backoff(retries, minBorrowDelay).scheduler(Schedulers.elastic()))
+                .retryWhen(configuration.backoffSpec().scheduler(Schedulers.elastic()))
                 .doOnError(throwable -> LOGGER.error("error when creating new channel", throwable));
         }
 
@@ -102,6 +101,50 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
         }
     }
 
+    public static class Configuration {
+        @FunctionalInterface
+        public interface RequiresRetries {
+            RequiredMinBorrowDelay retries(int retries);
+        }
+
+        @FunctionalInterface
+        public interface RequiredMinBorrowDelay {
+            RequiredMaxChannel minBorrowDelay(Duration minBorrowDelay);
+        }
+
+        @FunctionalInterface
+        public interface RequiredMaxChannel {
+            Configuration maxChannel(int maxChannel);
+        }
+
+        public static final Configuration DEFAULT = builder()
+            .retries(MAX_BORROW_RETRIES)
+            .minBorrowDelay(MIN_BORROW_DELAY)
+            .maxChannel(MAX_CHANNELS_NUMBER);
+
+        public static RequiresRetries builder() {
+            return retries -> minBorrowDelay -> maxChannel -> new Configuration(minBorrowDelay, retries, maxChannel);
+        }
+
+        private final Duration minBorrowDelay;
+        private final int retries;
+        private final int maxChannel;
+
+        public Configuration(Duration minBorrowDelay, int retries, int maxChannel) {
+            this.minBorrowDelay = minBorrowDelay;
+            this.retries = retries;
+            this.maxChannel = maxChannel;
+        }
+
+        private RetryBackoffSpec backoffSpec() {
+            return Retry.backoff(retries, minBorrowDelay);
+        }
+
+        public int getMaxChannel() {
+            return maxChannel;
+        }
+    }
+
     private static final Logger LOGGER = LoggerFactory.getLogger(ReactorRabbitMQChannelPool.class);
     private static final long MAXIMUM_BORROW_TIMEOUT_IN_MS = Duration.ofSeconds(5).toMillis();
     private static final int MAX_CHANNELS_NUMBER = 3;
@@ -111,24 +154,18 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
     private final Mono<Connection> connectionMono;
     private final GenericObjectPool<Channel> pool;
     private final ConcurrentSkipListSet<Channel> borrowedChannels;
-    private final Duration minBorrowDelay;
-    private final int retries;
+    private final Configuration configuration;
     private Sender sender;
 
-    public ReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
-        this(simpleConnectionPool.getResilientConnection(), MAX_CHANNELS_NUMBER, MIN_BORROW_DELAY, MAX_BORROW_RETRIES);
-    }
-
-    public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, int poolSize, Duration minBorrowDelay, int retries) {
+    public ReactorRabbitMQChannelPool(Mono<Connection> connectionMono, Configuration configuration) {
         this.connectionMono = connectionMono;
-        this.retries = retries;
-        ChannelFactory channelFactory = new ChannelFactory(connectionMono, minBorrowDelay, retries);
+        this.configuration = configuration;
+        ChannelFactory channelFactory = new ChannelFactory(connectionMono, configuration);
 
         GenericObjectPoolConfig<Channel> config = new GenericObjectPoolConfig<>();
-        config.setMaxTotal(poolSize);
+        config.setMaxTotal(configuration.getMaxChannel());
         this.pool = new GenericObjectPool<>(channelFactory, config);
         this.borrowedChannels = new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode));
-        this.minBorrowDelay = minBorrowDelay;
     }
 
     public void start() {
@@ -151,7 +188,7 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
     private Mono<Channel> borrow() {
         return tryBorrowFromPool()
             .doOnError(throwable -> LOGGER.warn("Cannot borrow channel", throwable))
-            .retryWhen(Retry.backoff(retries, minBorrowDelay).scheduler(Schedulers.elastic()))
+            .retryWhen(configuration.backoffSpec().scheduler(Schedulers.elastic()))
             .onErrorMap(this::propagateException)
             .subscribeOn(Schedulers.elastic())
             .doOnNext(borrowedChannels::add);
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
index c2a89a2..56a2f11 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQExtension.java
@@ -140,10 +140,11 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
 
         RabbitMQConnectionFactory connectionFactory = createRabbitConnectionFactory();
         connectionPool = new SimpleConnectionPool(connectionFactory);
-        Duration minBorrowDelay = Duration.ofMillis(5);
-        int retries = 2;
-        channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(2, Duration.ofMillis(5)), 5,
-            minBorrowDelay, retries);
+        channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(2, Duration.ofMillis(5)),
+            ReactorRabbitMQChannelPool.Configuration.builder()
+                .retries(2)
+                .minBorrowDelay(Duration.ofMillis(5))
+                .maxChannel(3));
         channelPool.start();
     }
 
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
index 20f83c2..0f259c3 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.java
@@ -67,11 +67,12 @@ class ReactorRabbitMQChannelPoolTest implements ChannelPoolContract {
     }
 
     private ReactorRabbitMQChannelPool generateChannelPool(int poolSize) {
-        Duration minBorrowDelay = Duration.ofMillis(5);
-        int retries = 2;
         ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(
             rabbitMQExtension.getConnectionPool().getResilientConnection(),
-            poolSize, minBorrowDelay, retries);
+            ReactorRabbitMQChannelPool.Configuration.builder()
+                .retries(2)
+                .minBorrowDelay(Duration.ofMillis(5))
+                .maxChannel(poolSize));
         reactorRabbitMQChannelPool.start();
         return reactorRabbitMQChannelPool;
     }
diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
index 08b41e9..70128f3 100644
--- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
+++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java
@@ -74,7 +74,8 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem {
         super.beforeTest();
 
         connectionPool = new SimpleConnectionPool(dockerRabbitMQ.createRabbitConnectionFactory());
-        reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool);
+        reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(),
+            ReactorRabbitMQChannelPool.Configuration.DEFAULT);
         reactorRabbitMQChannelPool.start();
         eventBus = createEventBus();
         eventBus.start();
diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
index 1d223b3..f4d1c9f 100644
--- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
@@ -23,7 +23,6 @@ import java.io.FileNotFoundException;
 import javax.inject.Named;
 import javax.inject.Singleton;
 
-import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
@@ -89,6 +88,8 @@ public class RabbitMQModule extends AbstractModule {
         eventDTOModuleBinder.addBinding().toInstance(CassandraMailQueueViewConfigurationModule.MAIL_QUEUE_VIEW_CONFIGURATION);
 
         Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(RabbitMQHealthCheck.class);
+
+        bind(ReactorRabbitMQChannelPool.Configuration.class).toInstance(ReactorRabbitMQChannelPool.Configuration.DEFAULT);
     }
 
     @Provides
@@ -118,7 +119,7 @@ public class RabbitMQModule extends AbstractModule {
     @Provides
     @Named(RABBITMQ_CONFIGURATION_NAME)
     @Singleton
-    private Configuration getConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException {
+    private org.apache.commons.configuration2.Configuration getConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException {
         try {
             return propertiesProvider.getConfiguration(RABBITMQ_CONFIGURATION_NAME);
         } catch (FileNotFoundException e) {
@@ -129,26 +130,28 @@ public class RabbitMQModule extends AbstractModule {
 
     @Provides
     @Singleton
-    private RabbitMQConfiguration getMailQueueConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) {
+    private RabbitMQConfiguration getMailQueueConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) org.apache.commons.configuration2.Configuration configuration) {
         return RabbitMQConfiguration.from(configuration);
     }
 
     @Provides
     @Singleton
-    private CassandraMailQueueViewConfiguration getMailQueueViewConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) {
+    private CassandraMailQueueViewConfiguration getMailQueueViewConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) org.apache.commons.configuration2.Configuration configuration) {
         return CassandraMailQueueViewConfiguration.from(configuration);
     }
 
     @Provides
     @Singleton
-    private RabbitMQMailQueueConfiguration getMailQueueSizeConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) {
+    private RabbitMQMailQueueConfiguration getMailQueueSizeConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) org.apache.commons.configuration2.Configuration configuration) {
         return RabbitMQMailQueueConfiguration.from(configuration);
     }
 
     @Provides
     @Singleton
-    ReactorRabbitMQChannelPool provideReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
-        ReactorRabbitMQChannelPool channelPool = new ReactorRabbitMQChannelPool(simpleConnectionPool);
+    ReactorRabbitMQChannelPool provideReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool, ReactorRabbitMQChannelPool.Configuration configuration) {
+        ReactorRabbitMQChannelPool channelPool = new ReactorRabbitMQChannelPool(
+            simpleConnectionPool.getResilientConnection(),
+            configuration);
         channelPool.start();
         return channelPool;
     }
diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java
index 29ca1ab..a58794b 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java
@@ -46,6 +46,7 @@ import org.apache.james.JamesServerBuilder;
 import org.apache.james.JamesServerExtension;
 import org.apache.james.SearchConfiguration;
 import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.core.Username;
 import org.apache.james.junit.categories.BasicFeature;
 import org.apache.james.mailbox.DefaultMailboxes;
@@ -221,9 +222,14 @@ class RabbitMQEventDeadLettersIntegrationTest {
             .overrideWith(binder -> binder.bind(RetryBackoffConfiguration.class)
                 .toInstance(RetryBackoffConfiguration.builder()
                     .maxRetries(MAX_RETRIES)
-                    .firstBackoff(java.time.Duration.ofMillis(100))
-                    .jitterFactor(0.5)
-                    .build())))
+                    .firstBackoff(java.time.Duration.ofMillis(10))
+                    .jitterFactor(0.2)
+                    .build()))
+            .overrideWith(binder -> binder.bind(ReactorRabbitMQChannelPool.Configuration.class)
+                .toInstance(ReactorRabbitMQChannelPool.Configuration.builder()
+                    .retries(2)
+                    .minBorrowDelay(java.time.Duration.ofMillis(5))
+                    .maxChannel(3))))
         .build();
 
     private static final String DOMAIN = "domain.tld";


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org