You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/14 06:42:39 UTC

[james-project] 07/15: [REFACTORING] Dequeuer should not aggressively enforce a scheduler

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d137a2a935f3e7dbc2a647f33aa97efdc95a9f6c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri May 7 18:51:53 2021 +0700

    [REFACTORING] Dequeuer should not aggressively enforce a scheduler
---
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 0241c46..d3f0e66 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -42,7 +42,6 @@ import com.github.fge.lambdas.consumers.ThrowingConsumer;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.Receiver;
@@ -105,21 +104,20 @@ class Dequeuer implements Closeable {
     }
 
     Flux<? extends MailQueue.MailQueueItem> deQueue() {
-        return flux.flatMapSequential(response -> loadItem(response).subscribeOn(Schedulers.elastic()))
-            .concatMap(item -> filterIfDeleted(item).subscribeOn(Schedulers.elastic()));
+        return flux.flatMapSequential(this::loadItem)
+            .concatMap(this::filterIfDeleted);
     }
 
     private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem item) {
         return mailQueueView.isPresent(item.getEnqueueId())
-            .flatMap(isPresent -> keepWhenPresent(item, isPresent));
-    }
-
-    private Mono<? extends RabbitMQMailQueueItem> keepWhenPresent(RabbitMQMailQueueItem item, Boolean isPresent) {
-        if (isPresent) {
-            return Mono.just(item);
-        }
-        item.done(true);
-        return Mono.empty();
+            .handle((isPresent, sink) -> {
+                if (isPresent) {
+                    sink.next(item);
+                } else {
+                    item.done(true);
+                    sink.complete();
+                }
+            });
     }
 
     private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) {

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org