You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/25 03:55:20 UTC

[04/10] james-project git commit: MAILBOX-374 Our connection mono should cache the connection as long as it is open

MAILBOX-374 Our connection mono should cache the connection as long as it is open

We should regenerate the connection mono when the connection has been closed,
for instance due to an error.

A custom callable has been written, as I am missing a `Mono<T>.cacheWhen(Predicate<T>)` operator.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/edf85300
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/edf85300
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/edf85300

Branch: refs/heads/master
Commit: edf85300021eafd1ed1ef944d291c18bcfd9d996
Parents: 603de62
Author: Benoit Tellier <bt...@linagora.com>
Authored: Thu Jan 24 14:13:56 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Fri Jan 25 10:33:25 2019 +0700

----------------------------------------------------------------------
 .../rabbitmq/RabbitMQConnectionFactory.java     | 32 ++++++++++++++++----
 .../james/mailbox/events/RabbitMQEventBus.java  |  6 ++--
 2 files changed, 30 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/edf85300/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
index e02fb28..c13b21b 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactory.java
@@ -19,6 +19,8 @@
 package org.apache.james.backend.rabbitmq;
 
 import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
 
 import javax.inject.Inject;
 
@@ -29,16 +31,34 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public class RabbitMQConnectionFactory {
+    private class ConnectionCallable implements Callable<Connection> {
+        private final ConnectionFactory connectionFactory;
+        private Optional<Connection> connection;
+
+        ConnectionCallable(ConnectionFactory connectionFactory) {
+            this.connectionFactory = connectionFactory;
+            connection = Optional.empty();
+        }
+
+        @Override
+        public Connection call() throws Exception {
+            if (connection.map(Connection::isOpen).orElse(false)) {
+                return connection.get();
+            }
+            Connection newConnection = connectionFactory.newConnection();
+            connection = Optional.of(newConnection);
+            return newConnection;
+        }
+    }
+
     private final ConnectionFactory connectionFactory;
 
-    private final int maxRetries;
-    private final int minDelay;
+    private final RabbitMQConfiguration configuration;
 
     @Inject
     public RabbitMQConnectionFactory(RabbitMQConfiguration rabbitMQConfiguration) {
         this.connectionFactory = from(rabbitMQConfiguration);
-        this.maxRetries = rabbitMQConfiguration.getMaxRetries();
-        this.minDelay = rabbitMQConfiguration.getMinDelay();
+        this.configuration = rabbitMQConfiguration;
     }
 
     private ConnectionFactory from(RabbitMQConfiguration rabbitMQConfiguration) {
@@ -56,8 +76,8 @@ public class RabbitMQConnectionFactory {
     }
 
     public Mono<Connection> connectionMono() {
-        return Mono.fromCallable(connectionFactory::newConnection)
-            .retryBackoff(maxRetries, Duration.ofMillis(minDelay))
+        return Mono.fromCallable(new ConnectionCallable(connectionFactory))
+            .retryBackoff(configuration.getMaxRetries(), Duration.ofMillis(configuration.getMinDelay()))
             .publishOn(Schedulers.elastic());
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/edf85300/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index b203032..ecfeb0b 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -31,6 +31,7 @@ import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
 import org.apache.james.metrics.api.MetricFactory;
 
+import com.github.fge.lambdas.Throwing;
 import com.rabbitmq.client.Connection;
 
 import reactor.core.publisher.Mono;
@@ -64,7 +65,7 @@ public class RabbitMQEventBus implements EventBus {
                      EventDeadLetters eventDeadLetters, MetricFactory metricFactory) {
         this.mailboxListenerExecutor = new MailboxListenerExecutor(metricFactory);
         this.eventBusId = EventBusId.random();
-        this.connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache();
+        this.connectionMono = rabbitMQConnectionFactory.connectionMono();
         this.eventSerializer = eventSerializer;
         this.routingKeyConverter = routingKeyConverter;
         this.retryBackoff = retryBackoff;
@@ -74,7 +75,8 @@ public class RabbitMQEventBus implements EventBus {
 
     public void start() {
         if (!isRunning.get()) {
-            sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono));
+            sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono)
+                .resourceManagementChannelMono(connectionMono.map(Throwing.function(Connection::createChannel))));
             MailboxListenerRegistry mailboxListenerRegistry = new MailboxListenerRegistry();
             keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, mailboxListenerRegistry, mailboxListenerExecutor);
             groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff, eventDeadLetters, mailboxListenerExecutor);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org