You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/06/05 03:01:29 UTC
[james-project] 03/06: JAMES-2334 Awaiting a cancelled task should be supported
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 69744b46a1cbaab6d990243ed6c2d8e583a43aeb
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri May 31 14:33:15 2019 +0700
JAMES-2334 Awaiting a cancelled task should be supported
---
.../java/org/apache/james/task/MemoryTaskManager.java | 12 +++++++++---
.../org/apache/james/task/MemoryTaskManagerWorker.java | 15 +++++++--------
2 files changed, 16 insertions(+), 11 deletions(-)
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 566ce32..7143b1e 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -33,9 +33,9 @@ import java.util.function.Predicate;
import javax.annotation.PreDestroy;
-import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.WorkQueueProcessor;
@@ -130,7 +130,7 @@ public class MemoryTaskManager implements TaskManager {
public void cancel(TaskId id) {
Optional.ofNullable(idToExecutionDetails.get(id)).ifPresent(details -> {
if (details.getStatus().equals(Status.WAITING)) {
- updateDetails(id).accept(currentDetails -> currentDetails.cancelRequested());
+ updateDetails(id).accept(TaskExecutionDetails::cancelRequested);
}
worker.cancelTask(id, updateDetails(id));
}
@@ -144,7 +144,13 @@ public class MemoryTaskManager implements TaskManager {
.filter(ignore -> tasksResult.get(id) != null)
.map(ignore -> {
Optional.ofNullable(tasksResult.get(id))
- .ifPresent(Throwing.<Mono<Task.Result>>consumer(Mono::block).orDoNothing());
+ .ifPresent(mono -> {
+ try {
+ mono.block();
+ } catch (CancellationException e) {
+ // ignore
+ }
+ });
return getExecutionDetails(id);
})
.take(1)
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
index 5d6226d..9890095 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
@@ -37,8 +37,9 @@ import reactor.core.publisher.Mono;
public class MemoryTaskManagerWorker implements TaskManagerWorker {
private static final boolean INTERRUPT_IF_RUNNING = true;
private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManagerWorker.class);
- public static final Duration CHECK_CANCELED_PERIOD = Duration.ofMillis(100);
- public static final int FIRST = 1;
+ private static final Duration CHECK_CANCELED_PERIOD = Duration.ofMillis(100);
+ private static final int FIRST = 1;
+
private final ConcurrentHashMap<TaskId, CompletableFuture<Task.Result>> idToFuture = new ConcurrentHashMap<>();
@Override
@@ -47,7 +48,7 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
idToFuture.put(taskWithId.getId(), futureResult);
- Mono<Task.Result> result = Mono.<Task.Result>fromFuture(futureResult)
+ return Mono.fromFuture(futureResult)
.doOnError(res -> {
if (!(res instanceof CancellationException)) {
failed(updateDetails,
@@ -55,8 +56,6 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
}
})
.doOnTerminate(() -> idToFuture.remove(taskWithId.getId()));
-
- return result;
}
private Task.Result runWithMdc(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
@@ -78,7 +77,7 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
.onFailure(() -> failed(updateDetails, (logger, details) -> logger.error("Task was partially performed. Check logs for more details" + details.getTaskId())));
} catch (Exception e) {
failed(updateDetails,
- (logger, executionDetails) -> logger.error("Error while running task", executionDetails, e));
+ (logger, executionDetails) -> logger.error("Error while running task {}", executionDetails, e));
return Task.Result.PARTIAL;
}
}
@@ -88,8 +87,8 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
Optional.ofNullable(idToFuture.remove(id))
.ifPresent(future -> {
requestCancellation(updateDetails, future);
- waitUntilFutureIsCancelled(future)
- .subscribe(cancellationSuccessful -> effectivelyCancelled(updateDetails));
+ waitUntilFutureIsCancelled(future).blockFirst();
+ effectivelyCancelled(updateDetails);
});
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org