You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/16 12:43:07 UTC
[james-project] 01/02: JAMES-2813 plug Schedulers leaks
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 70fd3f15fccfc427b7722b764ebf3918eeba9e64
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Mon Oct 14 16:49:57 2019 +0200
JAMES-2813 plug Schedulers leaks
---
.../distributed/RabbitMQTerminationSubscriber.java | 10 +++++++---
.../task/eventsourcing/distributed/RabbitMQWorkQueue.java | 13 ++++++++++---
2 files changed, 17 insertions(+), 6 deletions(-)
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 0d97c2b..d2bdeeb 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -71,6 +71,8 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
private DirectProcessor<Event> listener;
private Disposable sendQueueHandle;
private Disposable listenQueueHandle;
+ private Receiver listenerReceiver;
+ private Sender sender;
@Inject
public RabbitMQTerminationSubscriber(SimpleConnectionPool simpleConnectionPool, JsonEventSerializer serializer) {
@@ -81,7 +83,7 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
}
public void start() {
- Sender sender = channelPool.createSender();
+ sender = channelPool.createSender();
sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
sender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block();
@@ -92,9 +94,9 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
- Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+ listenerReceiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionMono));
listener = DirectProcessor.create();
- listenQueueHandle = receiver
+ listenQueueHandle = listenerReceiver
.consumeAutoAck(queueName)
.subscribeOn(Schedulers.boundedElastic())
.concatMap(this::toEvent)
@@ -135,6 +137,8 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta
public void close() {
Optional.ofNullable(sendQueueHandle).ifPresent(Disposable::dispose);
Optional.ofNullable(listenQueueHandle).ifPresent(Disposable::dispose);
+ Optional.ofNullable(listenerReceiver).ifPresent(Receiver::close);
+ Optional.ofNullable(sender).ifPresent(Sender::close);
channelPool.close();
}
}
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 5516112..e736798 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -52,6 +52,7 @@ import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
@@ -79,6 +80,8 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
private Disposable sendCancelRequestsQueueHandle;
private Disposable receiverHandle;
private Disposable cancelRequestListenerHandle;
+ private Sender cancelRequestSender;
+ private Receiver cancelRequestListener;
public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool simpleConnectionPool, JsonTaskSerializer taskSerializer) {
this.worker = worker;
@@ -140,7 +143,7 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
}
void listenToCancelRequests() {
- Sender cancelRequestSender = channelPool.createSender();
+ cancelRequestSender = channelPool.createSender();
String queueName = CANCEL_REQUESTS_QUEUE_NAME_PREFIX + UUID.randomUUID().toString();
cancelRequestSender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
@@ -156,8 +159,9 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
}
private void registerCancelRequestsListener(String queueName) {
- cancelRequestListenerHandle = RabbitFlux
- .createReceiver(new ReceiverOptions().connectionMono(connectionMono))
+ cancelRequestListener = RabbitFlux
+ .createReceiver(new ReceiverOptions().connectionMono(connectionMono));
+ cancelRequestListenerHandle = cancelRequestListener
.consumeAutoAck(queueName)
.subscribeOn(Schedulers.boundedElastic())
.map(this::readCancelRequestMessage)
@@ -201,6 +205,9 @@ public class RabbitMQWorkQueue implements WorkQueue, Startable {
Optional.ofNullable(receiver).ifPresent(RabbitMQExclusiveConsumer::close);
Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
+ Optional.ofNullable(sender).ifPresent(Sender::close);
+ Optional.ofNullable(cancelRequestSender).ifPresent(Sender::close);
+ Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close);
channelPool.close();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org