You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/14 06:42:39 UTC
[james-project] 07/15: [REFACTORING] Dequeuer should not
aggressively enforce a scheduler
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d137a2a935f3e7dbc2a647f33aa97efdc95a9f6c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri May 7 18:51:53 2021 +0700
[REFACTORING] Dequeuer should not aggressively enforce a scheduler
---
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 22 ++++++++++------------
1 file changed, 10 insertions(+), 12 deletions(-)
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 0241c46..d3f0e66 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -42,7 +42,6 @@ import com.github.fge.lambdas.consumers.ThrowingConsumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;
@@ -105,21 +104,20 @@ class Dequeuer implements Closeable {
}
Flux<? extends MailQueue.MailQueueItem> deQueue() {
- return flux.flatMapSequential(response -> loadItem(response).subscribeOn(Schedulers.elastic()))
- .concatMap(item -> filterIfDeleted(item).subscribeOn(Schedulers.elastic()));
+ return flux.flatMapSequential(this::loadItem)
+ .concatMap(this::filterIfDeleted);
}
private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem item) {
return mailQueueView.isPresent(item.getEnqueueId())
- .flatMap(isPresent -> keepWhenPresent(item, isPresent));
- }
-
- private Mono<? extends RabbitMQMailQueueItem> keepWhenPresent(RabbitMQMailQueueItem item, Boolean isPresent) {
- if (isPresent) {
- return Mono.just(item);
- }
- item.done(true);
- return Mono.empty();
+ .handle((isPresent, sink) -> {
+ if (isPresent) {
+ sink.next(item);
+ } else {
+ item.done(true);
+ sink.complete();
+ }
+ });
}
private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) {
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org