You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/05/28 07:39:30 UTC
[james-project] 05/06: JAMES-2777 add task requested for cancellation status
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3adf8bcb65142cfda99e41f5d80fe123d97189cb
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Wed May 22 14:12:27 2019 +0200
JAMES-2777 add task requested for cancellation status
---
.../james/webadmin/routes/TasksRoutesTest.java | 20 ++++++-
.../org/apache/james/task/MemoryTaskManager.java | 34 ++++++++---
.../apache/james/task/MemoryTaskManagerWorker.java | 56 +++++++++++++++---
.../apache/james/task/TaskExecutionDetails.java | 16 ++++-
.../java/org/apache/james/task/TaskManager.java | 1 +
.../apache/james/task/MemoryTaskManagerTest.java | 68 +++++++++++++++++++++-
.../james/task/MemoryTaskManagerWorkerTest.java | 47 ++++++++++++++-
7 files changed, 218 insertions(+), 24 deletions(-)
diff --git a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
index b65a84f..825a41a 100644
--- a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
@@ -26,6 +26,8 @@ import static org.apache.james.webadmin.WebAdminServer.NO_CONFIGURATION;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.not;
import java.util.UUID;
@@ -204,8 +206,14 @@ public class TasksRoutesTest {
@Test
public void deleteShouldCancelMatchingTask() {
+ CountDownLatch inProgressLatch = new CountDownLatch(1);
+
TaskId taskId = taskManager.submit(() -> {
- await();
+ try {
+ inProgressLatch.await();
+ } catch (InterruptedException e) {
+ //ignore
+ }
return Task.Result.COMPLETED;
});
@@ -216,7 +224,17 @@ public class TasksRoutesTest {
.get("/" + taskId.getValue())
.then()
.statusCode(HttpStatus.OK_200)
+ .body("status", isOneOf("canceledRequested", "canceled"));
+
+ inProgressLatch.countDown();
+ when()
+ .get("/" + taskId.getValue())
+ .then()
+ .statusCode(HttpStatus.OK_200)
.body("status", is("canceled"));
+
+
+
}
@Test
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 1623858..566ce32 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import javax.annotation.PreDestroy;
@@ -60,8 +61,24 @@ public class MemoryTaskManager implements TaskManager {
worker = new MemoryTaskManagerWorker();
workQueue
.subscribeOn(Schedulers.single())
- .filter(task -> !listIds(Status.CANCELLED).contains(task.getId()))
- .subscribe(this::treatTask);
+ .filter(isTaskWaiting().or(isTaskRequestedForCancellation()))
+ .subscribe(this::sendTaskToWorker);
+ }
+
+ private void sendTaskToWorker(TaskWithId taskWithId) {
+ if (isTaskWaiting().test(taskWithId)) {
+ treatTask(taskWithId);
+ } else if (isTaskRequestedForCancellation().test(taskWithId)) {
+ updateDetails(taskWithId.getId()).accept(TaskExecutionDetails::cancelEffectively);
+ }
+ }
+
+ private Predicate<TaskWithId> isTaskWaiting() {
+ return task -> listIds(Status.WAITING).contains(task.getId());
+ }
+
+ private Predicate<TaskWithId> isTaskRequestedForCancellation() {
+ return task -> listIds(Status.CANCEL_REQUESTED).contains(task.getId());
}
private void treatTask(TaskWithId task) {
@@ -111,15 +128,18 @@ public class MemoryTaskManager implements TaskManager {
@Override
public void cancel(TaskId id) {
- TaskExecutionDetails details = getExecutionDetails(id);
- if (details.getStatus().equals(Status.IN_PROGRESS) || details.getStatus().equals(Status.WAITING)) {
- worker.cancelTask(id, updateDetails(id));
- }
+ Optional.ofNullable(idToExecutionDetails.get(id)).ifPresent(details -> {
+ if (details.getStatus().equals(Status.WAITING)) {
+ updateDetails(id).accept(currentDetails -> currentDetails.cancelRequested());
+ }
+ worker.cancelTask(id, updateDetails(id));
+ }
+ );
}
@Override
public TaskExecutionDetails await(TaskId id) {
- if (Optional.ofNullable(getExecutionDetails(id)).isPresent()) {
+ if (Optional.ofNullable(idToExecutionDetails.get(id)).isPresent()) {
return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic())
.filter(ignore -> tasksResult.get(id) != null)
.map(ignore -> {
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
index 6feb85a..e69e641 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
@@ -18,21 +18,27 @@
****************************************************************/
package org.apache.james.task;
+import java.time.Duration;
import java.util.Optional;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import org.apache.james.util.FunctionalUtils;
import org.apache.james.util.MDCBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class MemoryTaskManagerWorker implements TaskManagerWorker {
private static final boolean INTERRUPT_IF_RUNNING = true;
private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManagerWorker.class);
+ public static final Duration CHECK_CANCELED_PERIOD = Duration.ofMillis(100);
+ public static final int FIRST = 1;
private final ConcurrentHashMap<TaskId, CompletableFuture<Task.Result>> idToFuture = new ConcurrentHashMap<>();
@Override
@@ -42,8 +48,12 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
idToFuture.put(taskWithId.getId(), futureResult);
Mono<Task.Result> result = Mono.<Task.Result>fromFuture(futureResult)
- .doOnError(res -> failed(updateDetails,
- (logger, details) -> logger.error("Task was partially performed. Check logs for more details")))
+ .doOnError(res -> {
+ if (!(res instanceof CancellationException)) {
+ failed(updateDetails,
+ (logger, details) -> logger.error("Task was partially performed. Check logs for more details"));
+ }
+ })
.doOnTerminate(() -> idToFuture.remove(taskWithId.getId()));
return result;
@@ -77,16 +87,44 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
public void cancelTask(TaskId id, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
Optional.ofNullable(idToFuture.remove(id))
.ifPresent(future -> {
- updateDetails.accept(details -> {
- if (details.getStatus().equals(TaskManager.Status.WAITING) || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)) {
- return details.cancel();
- }
- return details;
- });
- future.cancel(INTERRUPT_IF_RUNNING);
+ requestCancellation(updateDetails, future);
+ waitUntilFutureIsCancelled(future)
+ .subscribe(cancellationSuccessful -> effectivelyCancelled(updateDetails));
});
}
+ private void requestCancellation(Consumer<TaskExecutionDetailsUpdater> updateDetails, CompletableFuture<Task.Result> future) {
+ updateDetails.accept(details -> {
+ if (details.getStatus().equals(TaskManager.Status.WAITING) || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)) {
+ return details.cancelRequested();
+ }
+ return details;
+ });
+ future.cancel(INTERRUPT_IF_RUNNING);
+ }
+
+ private Flux<Boolean> waitUntilFutureIsCancelled(CompletableFuture<Task.Result> future) {
+ return Flux.interval(CHECK_CANCELED_PERIOD)
+ .map(ignore -> future.isCancelled())
+ .filter(FunctionalUtils.identityPredicate())
+ .take(FIRST);
+ }
+
+ private void effectivelyCancelled(Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+ updateDetails.accept(details -> {
+ if (canBeCancelledEffectively(details)) {
+ return details.cancelEffectively();
+ }
+ return details;
+ });
+ }
+
+ private boolean canBeCancelledEffectively(TaskExecutionDetails details) {
+ return details.getStatus().equals(TaskManager.Status.WAITING)
+ || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)
+ || details.getStatus().equals(TaskManager.Status.CANCEL_REQUESTED);
+ }
+
private void success(Consumer<TaskExecutionDetailsUpdater> updateDetails) {
updateDetails.accept(currentDetails -> {
if (!wasCancelled(currentDetails)) {
diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
index a95627c..06b9ecf 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
@@ -140,12 +140,26 @@ public class TaskExecutionDetails {
Optional.of(ZonedDateTime.now()));
}
- public TaskExecutionDetails cancel() {
+ public TaskExecutionDetails cancelRequested() {
Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS
|| status == TaskManager.Status.WAITING);
return new TaskExecutionDetails(
taskId,
task,
+ TaskManager.Status.CANCEL_REQUESTED,
+ submitDate,
+ startedDate,
+ Optional.empty(),
+ Optional.of(ZonedDateTime.now()),
+ Optional.empty());
+ }
+
+ public TaskExecutionDetails cancelEffectively() {
+ Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS
+ || status == TaskManager.Status.WAITING || status == TaskManager.Status.CANCEL_REQUESTED);
+ return new TaskExecutionDetails(
+ taskId,
+ task,
TaskManager.Status.CANCELLED,
submitDate,
startedDate,
diff --git a/server/task/src/main/java/org/apache/james/task/TaskManager.java b/server/task/src/main/java/org/apache/james/task/TaskManager.java
index a3fabc3..de3e16b 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskManager.java
@@ -27,6 +27,7 @@ public interface TaskManager {
WAITING("waiting"),
IN_PROGRESS("inProgress"),
COMPLETED("completed"),
+ CANCEL_REQUESTED("canceledRequested"),
CANCELLED("canceled"),
FAILED("failed");
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
index c857ec1..676071f 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
@@ -108,6 +108,35 @@ public class MemoryTaskManagerTest {
}
@Test
+ public void completedTaskShouldNotBeCancelled() {
+ TaskId id = memoryTaskManager.submit(() -> Task.Result.COMPLETED);
+
+ awaitUntilTaskHasStatus(id, TaskManager.Status.COMPLETED);
+ memoryTaskManager.cancel(id);
+
+ try {
+ awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED);
+ } catch (Exception e) {
+ //Should timeout
+ }
+ assertThat(memoryTaskManager.getExecutionDetails(id).getStatus()).isEqualTo(TaskManager.Status.COMPLETED);
+ }
+ @Test
+ public void failedTaskShouldNotBeCancelled() {
+ TaskId id = memoryTaskManager.submit(() -> Task.Result.PARTIAL);
+
+ awaitUntilTaskHasStatus(id, TaskManager.Status.FAILED);
+ memoryTaskManager.cancel(id);
+
+ try {
+ awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED);
+ } catch (Exception e) {
+ //Should timeout
+ }
+ assertThat(memoryTaskManager.getExecutionDetails(id).getStatus()).isEqualTo(TaskManager.Status.FAILED);
+ }
+
+ @Test
public void getStatusShouldBeCancelledWhenCancelled() {
TaskId id = memoryTaskManager.submit(() -> {
sleep(500);
@@ -118,7 +147,35 @@ public class MemoryTaskManagerTest {
memoryTaskManager.cancel(id);
assertThat(memoryTaskManager.getExecutionDetails(id).getStatus())
+ .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED);
+
+ awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED);
+ assertThat(memoryTaskManager.getExecutionDetails(id).getStatus())
.isEqualTo(TaskManager.Status.CANCELLED);
+
+ }
+
+ @Test
+ public void aWaitingTaskShouldBeCancelled() {
+ TaskId id = memoryTaskManager.submit(() -> {
+ sleep(500);
+ return Task.Result.COMPLETED;
+ });
+
+ TaskId idTaskToCancel = memoryTaskManager.submit(() -> Task.Result.COMPLETED);
+
+ memoryTaskManager.cancel(idTaskToCancel);
+
+ awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS);
+
+
+ assertThat(memoryTaskManager.getExecutionDetails(idTaskToCancel).getStatus())
+ .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED);
+
+ awaitUntilTaskHasStatus(idTaskToCancel, TaskManager.Status.CANCELLED);
+ assertThat(memoryTaskManager.getExecutionDetails(idTaskToCancel).getStatus())
+ .isEqualTo(TaskManager.Status.CANCELLED);
+
}
@Test
@@ -345,17 +402,22 @@ public class MemoryTaskManagerTest {
}
@Test
- public void listShouldBeEmptyWhenNoTasks() throws Exception {
+ public void listShouldBeEmptyWhenNoTasks() {
assertThat(memoryTaskManager.list()).isEmpty();
}
@Test
- public void listCancelledShouldBeEmptyWhenNoTasks() throws Exception {
+ public void listCancelledShouldBeEmptyWhenNoTasks() {
assertThat(memoryTaskManager.list(TaskManager.Status.CANCELLED)).isEmpty();
}
@Test
- public void awaitShouldNotThrowWhenCompletedTask() throws Exception {
+ public void listCancelRequestedShouldBeEmptyWhenNoTasks() {
+ assertThat(memoryTaskManager.list(TaskManager.Status.CANCEL_REQUESTED)).isEmpty();
+ }
+
+ @Test
+ public void awaitShouldNotThrowWhenCompletedTask() {
TaskId taskId = memoryTaskManager.submit(
() -> Task.Result.COMPLETED);
memoryTaskManager.await(taskId);
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
index e4ae643..10c4d6a 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
@@ -19,12 +19,17 @@
package org.apache.james.task;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.ONE_SECOND;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
+import org.awaitility.core.ConditionFactory;
import org.junit.Test;
import reactor.core.publisher.Mono;
@@ -33,6 +38,15 @@ public class MemoryTaskManagerWorkerTest {
private final MemoryTaskManagerWorker worker = new MemoryTaskManagerWorker();
+ private final Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
+ private final ConditionFactory calmlyAwait = Awaitility.with()
+ .pollInterval(slowPacedPollInterval)
+ .and()
+ .with()
+ .pollDelay(slowPacedPollInterval)
+ .await();
+ private final ConditionFactory awaitAtMostOneSecond = calmlyAwait.atMost(ONE_SECOND);
+
private final Task successfulTask = () -> Task.Result.COMPLETED;
private final Task failedTask = () -> Task.Result.PARTIAL;
private final Task throwingTask = () -> {
@@ -76,6 +90,27 @@ public class MemoryTaskManagerWorkerTest {
assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.IN_PROGRESS);
latch.countDown();
}
+ @Test
+ public void theWorkerShouldNotRunATaskRequestedForCancellation() {
+ TaskId id = TaskId.generateTaskId();
+ AtomicInteger counter = new AtomicInteger(0);
+
+ Task task = () -> {
+ counter.incrementAndGet();
+ return Task.Result.COMPLETED;
+ };
+
+ TaskWithId taskWithId = new TaskWithId(id, task);
+ TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id);
+ ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+
+ idToDetails.put(id, executionDetails.cancelRequested());
+
+ worker.executeTask(taskWithId, updateDetails(idToDetails, id)).subscribe();
+
+ assertThat(counter.get()).isEqualTo(0);
+ assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCEL_REQUESTED);
+ }
@Test
public void theWorkerShouldNotRunACancelledTask() {
@@ -91,9 +126,9 @@ public class MemoryTaskManagerWorkerTest {
TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id);
ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
- idToDetails.put(id, executionDetails.cancel());
+ idToDetails.put(id, executionDetails.cancelEffectively());
- worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+ worker.executeTask(taskWithId, updateDetails(idToDetails, id)).subscribe();
assertThat(counter.get()).isEqualTo(0);
assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
@@ -119,9 +154,11 @@ public class MemoryTaskManagerWorkerTest {
worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
worker.cancelTask(id, updateDetails(idToDetails, id));
- assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
+ assertThat(idToDetails.get(id).getStatus()).isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED);
assertThat(counter.get()).isEqualTo(0);
+
+ awaitUntilTaskHasStatus(idToDetails, id, TaskManager.Status.CANCELLED);
assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
}
@@ -153,6 +190,10 @@ public class MemoryTaskManagerWorkerTest {
};
}
+ private void awaitUntilTaskHasStatus(ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails, TaskId id, TaskManager.Status status) {
+ awaitAtMostOneSecond.until(() -> idToExecutionDetails.get(id).getStatus().equals(status));
+ }
+
private void await(CountDownLatch countDownLatch) {
try {
countDownLatch.await();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org