You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/18 06:40:41 UTC

[james-project] 08/17: JAMES-2813 put executeWithSemaphore building blocks into submethods

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 12559bdf8638c692950c97c115905d9c5b1983c4
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Wed Oct 9 11:58:12 2019 +0200

    JAMES-2813 put executeWithSemaphore building blocks into submethods
---
 .../apache/james/task/SerialTaskManagerWorker.java | 31 +++++++++++-----------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index b954aa2..8fb0e15 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
-import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
@@ -89,19 +88,12 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
                 CompletableFuture<Task.Result> future = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), taskExecutor);
                 runningTask.set(Tuples.of(taskWithId.getId(), future));
 
-                Disposable informationPolling = pollAdditionalInformation(taskWithId)
-                    .doOnNext(information -> listener.updated(taskWithId.getId(), information))
-                    .subscribe();
-                return Mono.fromFuture(future)
-                        .doOnError(exception -> {
-                            if (exception instanceof CancellationException) {
-                                listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
-                            } else {
-                                listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception);
-                            }
-                        })
-                        .onErrorReturn(Task.Result.PARTIAL)
-                        .doOnTerminate(informationPolling::dispose);
+                return Mono.using(
+                    () -> pollAdditionalInformation(taskWithId).subscribe(),
+                    ignored -> Mono.fromFuture(future)
+                            .doOnError(exception -> handleExecutionError(taskWithId, listener, exception))
+                            .onErrorReturn(Task.Result.PARTIAL),
+                    polling -> polling.dispose());
             } else {
                 listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
                 return Mono.empty();
@@ -109,11 +101,20 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
         };
     }
 
+    private void handleExecutionError(TaskWithId taskWithId, Listener listener, Throwable exception) {
+        if (exception instanceof CancellationException) {
+            listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
+        } else {
+            listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception);
+        }
+    }
+
     private Flux<TaskExecutionDetails.AdditionalInformation> pollAdditionalInformation(TaskWithId taskWithId) {
         return Mono.fromCallable(() -> taskWithId.getTask().details())
             .delayElement(Duration.ofSeconds(1))
             .repeat()
-            .flatMap(Mono::justOrEmpty);
+            .flatMap(Mono::justOrEmpty)
+            .doOnNext(information -> listener.updated(taskWithId.getId(), taskWithId.getTask().type(), information));
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org