You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/18 06:40:41 UTC
[james-project] 08/17: JAMES-2813 put executeWithSemaphore building blocks into submethods
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 12559bdf8638c692950c97c115905d9c5b1983c4
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Wed Oct 9 11:58:12 2019 +0200
JAMES-2813 put executeWithSemaphore building blocks into submethods
---
.../apache/james/task/SerialTaskManagerWorker.java | 31 +++++++++++-----------
1 file changed, 16 insertions(+), 15 deletions(-)
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index b954aa2..8fb0e15 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
-import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@@ -89,19 +88,12 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
CompletableFuture<Task.Result> future = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), taskExecutor);
runningTask.set(Tuples.of(taskWithId.getId(), future));
- Disposable informationPolling = pollAdditionalInformation(taskWithId)
- .doOnNext(information -> listener.updated(taskWithId.getId(), information))
- .subscribe();
- return Mono.fromFuture(future)
- .doOnError(exception -> {
- if (exception instanceof CancellationException) {
- listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
- } else {
- listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception);
- }
- })
- .onErrorReturn(Task.Result.PARTIAL)
- .doOnTerminate(informationPolling::dispose);
+ return Mono.using(
+ () -> pollAdditionalInformation(taskWithId).subscribe(),
+ ignored -> Mono.fromFuture(future)
+ .doOnError(exception -> handleExecutionError(taskWithId, listener, exception))
+ .onErrorReturn(Task.Result.PARTIAL),
+ polling -> polling.dispose());
} else {
listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
return Mono.empty();
@@ -109,11 +101,20 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
};
}
+ private void handleExecutionError(TaskWithId taskWithId, Listener listener, Throwable exception) {
+ if (exception instanceof CancellationException) {
+ listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
+ } else {
+ listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception);
+ }
+ }
+
private Flux<TaskExecutionDetails.AdditionalInformation> pollAdditionalInformation(TaskWithId taskWithId) {
return Mono.fromCallable(() -> taskWithId.getTask().details())
.delayElement(Duration.ofSeconds(1))
.repeat()
- .flatMap(Mono::justOrEmpty);
+ .flatMap(Mono::justOrEmpty)
+ .doOnNext(information -> listener.updated(taskWithId.getId(), taskWithId.getTask().type(), information));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org