You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/06/05 03:01:29 UTC

[james-project] 03/06: JAMES-2334 Awaiting a cancelled task should be supported

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 69744b46a1cbaab6d990243ed6c2d8e583a43aeb
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri May 31 14:33:15 2019 +0700

    JAMES-2334 Awaiting a cancelled task should be supported
---
 .../java/org/apache/james/task/MemoryTaskManager.java     | 12 +++++++++---
 .../org/apache/james/task/MemoryTaskManagerWorker.java    | 15 +++++++--------
 2 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 566ce32..7143b1e 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -33,9 +33,9 @@ import java.util.function.Predicate;
 
 import javax.annotation.PreDestroy;
 
-import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.WorkQueueProcessor;
@@ -130,7 +130,7 @@ public class MemoryTaskManager implements TaskManager {
     public void cancel(TaskId id) {
         Optional.ofNullable(idToExecutionDetails.get(id)).ifPresent(details -> {
                 if (details.getStatus().equals(Status.WAITING)) {
-                    updateDetails(id).accept(currentDetails -> currentDetails.cancelRequested());
+                    updateDetails(id).accept(TaskExecutionDetails::cancelRequested);
                 }
                 worker.cancelTask(id, updateDetails(id));
             }
@@ -144,7 +144,13 @@ public class MemoryTaskManager implements TaskManager {
                 .filter(ignore -> tasksResult.get(id) != null)
                 .map(ignore -> {
                     Optional.ofNullable(tasksResult.get(id))
-                        .ifPresent(Throwing.<Mono<Task.Result>>consumer(Mono::block).orDoNothing());
+                        .ifPresent(mono -> {
+                            try {
+                                mono.block();
+                            } catch (CancellationException e) {
+                                // ignore
+                            }
+                        });
                     return getExecutionDetails(id);
                 })
                 .take(1)
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
index 5d6226d..9890095 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
@@ -37,8 +37,9 @@ import reactor.core.publisher.Mono;
 public class MemoryTaskManagerWorker implements TaskManagerWorker {
     private static final boolean INTERRUPT_IF_RUNNING = true;
     private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManagerWorker.class);
-    public static final Duration CHECK_CANCELED_PERIOD = Duration.ofMillis(100);
-    public static final int FIRST = 1;
+    private static final Duration CHECK_CANCELED_PERIOD = Duration.ofMillis(100);
+    private static final int FIRST = 1;
+
     private final ConcurrentHashMap<TaskId, CompletableFuture<Task.Result>> idToFuture = new ConcurrentHashMap<>();
 
     @Override
@@ -47,7 +48,7 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
 
         idToFuture.put(taskWithId.getId(), futureResult);
 
-        Mono<Task.Result> result = Mono.<Task.Result>fromFuture(futureResult)
+        return Mono.fromFuture(futureResult)
             .doOnError(res -> {
                 if (!(res instanceof CancellationException)) {
                     failed(updateDetails,
@@ -55,8 +56,6 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
                 }
             })
             .doOnTerminate(() -> idToFuture.remove(taskWithId.getId()));
-
-        return result;
     }
 
     private Task.Result runWithMdc(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
@@ -78,7 +77,7 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
                 .onFailure(() -> failed(updateDetails, (logger, details) -> logger.error("Task was partially performed. Check logs for more details" + details.getTaskId())));
         } catch (Exception e) {
             failed(updateDetails,
-                (logger, executionDetails) -> logger.error("Error while running task", executionDetails, e));
+                (logger, executionDetails) -> logger.error("Error while running task {}", executionDetails, e));
             return Task.Result.PARTIAL;
         }
     }
@@ -88,8 +87,8 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
         Optional.ofNullable(idToFuture.remove(id))
             .ifPresent(future -> {
                 requestCancellation(updateDetails, future);
-                waitUntilFutureIsCancelled(future)
-                    .subscribe(cancellationSuccessful -> effectivelyCancelled(updateDetails));
+                waitUntilFutureIsCancelled(future).blockFirst();
+                effectivelyCancelled(updateDetails);
             });
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org