You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/16 12:43:07 UTC

[james-project] 01/02: JAMES-2813 plug Schedulers leaks

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 70fd3f15fccfc427b7722b764ebf3918eeba9e64
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Mon Oct 14 16:49:57 2019 +0200

    JAMES-2813 plug Schedulers leaks
---
 .../distributed/RabbitMQTerminationSubscriber.java          | 10 +++++++---
 .../task/eventsourcing/distributed/RabbitMQWorkQueue.java   | 13 ++++++++++---
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 0d97c2b..d2bdeeb 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -71,6 +71,8 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
     private DirectProcessor<Event> listener;
     private Disposable sendQueueHandle;
     private Disposable listenQueueHandle;
+    private Receiver listenerReceiver;
+    private Sender sender;
 
     @Inject
     public RabbitMQTerminationSubscriber(SimpleConnectionPool simpleConnectionPool, JsonEventSerializer serializer) {
@@ -81,7 +83,7 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
     }
 
     public void start() {
-        Sender sender = channelPool.createSender();
+        sender = channelPool.createSender();
 
         sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
         sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
@@ -92,9 +94,9 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
             .subscribeOn(Schedulers.boundedElastic())
             .subscribe();
 
-        Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+        listenerReceiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
         listener = DirectProcessor.create();
-        listenQueueHandle = receiver
+        listenQueueHandle = listenerReceiver
             .consumeAutoAck(queueName)
             .subscribeOn(Schedulers.boundedElastic())
             .concatMap(this::toEvent)
@@ -135,6 +137,8 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
     public void close() {
         Optional.ofNullable(sendQueueHandle).ifPresent(Disposable::dispose);
         Optional.ofNullable(listenQueueHandle).ifPresent(Disposable::dispose);
+        Optional.ofNullable(listenerReceiver).ifPresent(Receiver::close);
+        Optional.ofNullable(sender).ifPresent(Sender::close);
         channelPool.close();
     }
 }
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 5516112..e736798 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -52,6 +52,7 @@ import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.ReceiverOptions;
 import reactor.rabbitmq.Sender;
 
@@ -79,6 +80,8 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     private Disposable sendCancelRequestsQueueHandle;
     private Disposable receiverHandle;
     private Disposable cancelRequestListenerHandle;
+    private Sender cancelRequestSender;
+    private Receiver cancelRequestListener;
 
     public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool simpleConnectionPool, JsonTaskSerializer taskSerializer) {
         this.worker = worker;
@@ -140,7 +143,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     }
 
     void listenToCancelRequests() {
-        Sender cancelRequestSender = channelPool.createSender();
+        cancelRequestSender = channelPool.createSender();
         String queueName = CANCEL_REQUESTS_QUEUE_NAME_PREFIX + UUID.randomUUID().toString();
 
         cancelRequestSender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
@@ -156,8 +159,9 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
     }
 
     private void registerCancelRequestsListener(String queueName) {
-        cancelRequestListenerHandle = RabbitFlux
-            .createReceiver(new ReceiverOptions().connectionMono(connectionMono))
+        cancelRequestListener = RabbitFlux
+            .createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+        cancelRequestListenerHandle = cancelRequestListener
             .consumeAutoAck(queueName)
             .subscribeOn(Schedulers.boundedElastic())
             .map(this::readCancelRequestMessage)
@@ -201,6 +205,9 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
         Optional.ofNullable(receiver).ifPresent(RabbitMQExclusiveConsumer::close);
         Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
         Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
+        Optional.ofNullable(sender).ifPresent(Sender::close);
+        Optional.ofNullable(cancelRequestSender).ifPresent(Sender::close);
+        Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close);
         channelPool.close();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org