You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/05/28 07:39:30 UTC

[james-project] 05/06: JAMES-2777 add task requested for cancellation status

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3adf8bcb65142cfda99e41f5d80fe123d97189cb
Author: RĂ©mi Kowalski <rk...@linagora.com>
AuthorDate: Wed May 22 14:12:27 2019 +0200

    JAMES-2777 add task requested for cancellation status
---
 .../james/webadmin/routes/TasksRoutesTest.java     | 20 ++++++-
 .../org/apache/james/task/MemoryTaskManager.java   | 34 ++++++++---
 .../apache/james/task/MemoryTaskManagerWorker.java | 56 +++++++++++++++---
 .../apache/james/task/TaskExecutionDetails.java    | 16 ++++-
 .../java/org/apache/james/task/TaskManager.java    |  1 +
 .../apache/james/task/MemoryTaskManagerTest.java   | 68 +++++++++++++++++++++-
 .../james/task/MemoryTaskManagerWorkerTest.java    | 47 ++++++++++++++-
 7 files changed, 218 insertions(+), 24 deletions(-)

diff --git a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
index b65a84f..825a41a 100644
--- a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
@@ -26,6 +26,8 @@ import static org.apache.james.webadmin.WebAdminServer.NO_CONFIGURATION;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.not;
 
 import java.util.UUID;
@@ -204,8 +206,14 @@ public class TasksRoutesTest {
 
     @Test
     public void deleteShouldCancelMatchingTask() {
+        CountDownLatch inProgressLatch = new CountDownLatch(1);
+
         TaskId taskId = taskManager.submit(() -> {
-            await();
+            try {
+                inProgressLatch.await();
+            } catch (InterruptedException e) {
+                //ignore
+            }
             return Task.Result.COMPLETED;
         });
 
@@ -216,7 +224,17 @@ public class TasksRoutesTest {
             .get("/" + taskId.getValue())
         .then()
             .statusCode(HttpStatus.OK_200)
+            .body("status", isOneOf("canceledRequested", "canceled"));
+
+        inProgressLatch.countDown();
+        when()
+            .get("/" + taskId.getValue())
+            .then()
+            .statusCode(HttpStatus.OK_200)
             .body("status", is("canceled"));
+
+
+
     }
 
     @Test
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 1623858..566ce32 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 import javax.annotation.PreDestroy;
 
@@ -60,8 +61,24 @@ public class MemoryTaskManager implements TaskManager {
         worker = new MemoryTaskManagerWorker();
         workQueue
             .subscribeOn(Schedulers.single())
-            .filter(task -> !listIds(Status.CANCELLED).contains(task.getId()))
-            .subscribe(this::treatTask);
+            .filter(isTaskWaiting().or(isTaskRequestedForCancellation()))
+            .subscribe(this::sendTaskToWorker);
+    }
+
+    private void sendTaskToWorker(TaskWithId taskWithId) {
+        if (isTaskWaiting().test(taskWithId)) {
+            treatTask(taskWithId);
+        } else if (isTaskRequestedForCancellation().test(taskWithId)) {
+            updateDetails(taskWithId.getId()).accept(TaskExecutionDetails::cancelEffectively);
+        }
+    }
+
+    private Predicate<TaskWithId> isTaskWaiting() {
+        return task -> listIds(Status.WAITING).contains(task.getId());
+    }
+
+    private Predicate<TaskWithId> isTaskRequestedForCancellation() {
+        return task -> listIds(Status.CANCEL_REQUESTED).contains(task.getId());
     }
 
     private void treatTask(TaskWithId task) {
@@ -111,15 +128,18 @@ public class MemoryTaskManager implements TaskManager {
 
     @Override
     public void cancel(TaskId id) {
-        TaskExecutionDetails details = getExecutionDetails(id);
-        if (details.getStatus().equals(Status.IN_PROGRESS) || details.getStatus().equals(Status.WAITING)) {
-            worker.cancelTask(id, updateDetails(id));
-        }
+        Optional.ofNullable(idToExecutionDetails.get(id)).ifPresent(details -> {
+                if (details.getStatus().equals(Status.WAITING)) {
+                    updateDetails(id).accept(currentDetails -> currentDetails.cancelRequested());
+                }
+                worker.cancelTask(id, updateDetails(id));
+            }
+        );
     }
 
     @Override
     public TaskExecutionDetails await(TaskId id) {
-        if (Optional.ofNullable(getExecutionDetails(id)).isPresent()) {
+        if (Optional.ofNullable(idToExecutionDetails.get(id)).isPresent()) {
             return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic())
                 .filter(ignore -> tasksResult.get(id) != null)
                 .map(ignore -> {
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
index 6feb85a..e69e641 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
@@ -18,21 +18,27 @@
  ****************************************************************/
 package org.apache.james.task;
 
+import java.time.Duration;
 import java.util.Optional;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
+import org.apache.james.util.FunctionalUtils;
 import org.apache.james.util.MDCBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class MemoryTaskManagerWorker implements TaskManagerWorker {
     private static final boolean INTERRUPT_IF_RUNNING = true;
     private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManagerWorker.class);
+    public static final Duration CHECK_CANCELED_PERIOD = Duration.ofMillis(100);
+    public static final int FIRST = 1;
     private final ConcurrentHashMap<TaskId, CompletableFuture<Task.Result>> idToFuture = new ConcurrentHashMap<>();
 
     @Override
@@ -42,8 +48,12 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
         idToFuture.put(taskWithId.getId(), futureResult);
 
         Mono<Task.Result> result = Mono.<Task.Result>fromFuture(futureResult)
-            .doOnError(res -> failed(updateDetails,
-                (logger, details) -> logger.error("Task was partially performed. Check logs for more details")))
+            .doOnError(res -> {
+                if (!(res instanceof CancellationException)) {
+                    failed(updateDetails,
+                        (logger, details) -> logger.error("Task was partially performed. Check logs for more details"));
+                }
+            })
             .doOnTerminate(() -> idToFuture.remove(taskWithId.getId()));
 
         return result;
@@ -77,16 +87,44 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
     public void cancelTask(TaskId id, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
         Optional.ofNullable(idToFuture.remove(id))
             .ifPresent(future -> {
-                updateDetails.accept(details -> {
-                    if (details.getStatus().equals(TaskManager.Status.WAITING) || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)) {
-                        return details.cancel();
-                    }
-                    return details;
-                });
-                future.cancel(INTERRUPT_IF_RUNNING);
+                requestCancellation(updateDetails, future);
+                waitUntilFutureIsCancelled(future)
+                    .subscribe(cancellationSuccessful -> effectivelyCancelled(updateDetails));
             });
     }
 
+    private void requestCancellation(Consumer<TaskExecutionDetailsUpdater> updateDetails, CompletableFuture<Task.Result> future) {
+        updateDetails.accept(details -> {
+            if (details.getStatus().equals(TaskManager.Status.WAITING) || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)) {
+                return details.cancelRequested();
+            }
+            return details;
+        });
+        future.cancel(INTERRUPT_IF_RUNNING);
+    }
+
+    private Flux<Boolean> waitUntilFutureIsCancelled(CompletableFuture<Task.Result> future) {
+        return Flux.interval(CHECK_CANCELED_PERIOD)
+            .map(ignore -> future.isCancelled())
+            .filter(FunctionalUtils.identityPredicate())
+            .take(FIRST);
+    }
+
+    private void effectivelyCancelled(Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+        updateDetails.accept(details -> {
+            if (canBeCancelledEffectively(details)) {
+                return details.cancelEffectively();
+            }
+            return details;
+        });
+    }
+
+    private boolean canBeCancelledEffectively(TaskExecutionDetails details) {
+        return details.getStatus().equals(TaskManager.Status.WAITING)
+            || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)
+            || details.getStatus().equals(TaskManager.Status.CANCEL_REQUESTED);
+    }
+
     private void success(Consumer<TaskExecutionDetailsUpdater> updateDetails) {
         updateDetails.accept(currentDetails -> {
             if (!wasCancelled(currentDetails)) {
diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
index a95627c..06b9ecf 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
@@ -140,12 +140,26 @@ public class TaskExecutionDetails {
             Optional.of(ZonedDateTime.now()));
     }
 
-    public TaskExecutionDetails cancel() {
+    public TaskExecutionDetails cancelRequested() {
         Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS
             || status == TaskManager.Status.WAITING);
         return new TaskExecutionDetails(
             taskId,
             task,
+            TaskManager.Status.CANCEL_REQUESTED,
+            submitDate,
+            startedDate,
+            Optional.empty(),
+            Optional.of(ZonedDateTime.now()),
+            Optional.empty());
+    }
+
+    public TaskExecutionDetails cancelEffectively() {
+        Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS
+            || status == TaskManager.Status.WAITING || status == TaskManager.Status.CANCEL_REQUESTED);
+        return new TaskExecutionDetails(
+            taskId,
+            task,
             TaskManager.Status.CANCELLED,
             submitDate,
             startedDate,
diff --git a/server/task/src/main/java/org/apache/james/task/TaskManager.java b/server/task/src/main/java/org/apache/james/task/TaskManager.java
index a3fabc3..de3e16b 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskManager.java
@@ -27,6 +27,7 @@ public interface TaskManager {
         WAITING("waiting"),
         IN_PROGRESS("inProgress"),
         COMPLETED("completed"),
+        CANCEL_REQUESTED("canceledRequested"),
         CANCELLED("canceled"),
         FAILED("failed");
 
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
index c857ec1..676071f 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
@@ -108,6 +108,35 @@ public class MemoryTaskManagerTest {
     }
 
     @Test
+    public void completedTaskShouldNotBeCancelled() {
+        TaskId id = memoryTaskManager.submit(() -> Task.Result.COMPLETED);
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.COMPLETED);
+        memoryTaskManager.cancel(id);
+
+       try {
+           awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED);
+       } catch (Exception e) {
+           //Should timeout
+       }
+        assertThat(memoryTaskManager.getExecutionDetails(id).getStatus()).isEqualTo(TaskManager.Status.COMPLETED);
+    }
+    @Test
+    public void failedTaskShouldNotBeCancelled() {
+        TaskId id = memoryTaskManager.submit(() -> Task.Result.PARTIAL);
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.FAILED);
+        memoryTaskManager.cancel(id);
+
+       try {
+           awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED);
+       } catch (Exception e) {
+           //Should timeout
+       }
+        assertThat(memoryTaskManager.getExecutionDetails(id).getStatus()).isEqualTo(TaskManager.Status.FAILED);
+    }
+
+    @Test
     public void getStatusShouldBeCancelledWhenCancelled() {
         TaskId id = memoryTaskManager.submit(() -> {
             sleep(500);
@@ -118,7 +147,35 @@ public class MemoryTaskManagerTest {
         memoryTaskManager.cancel(id);
 
         assertThat(memoryTaskManager.getExecutionDetails(id).getStatus())
+            .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED);
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED);
+        assertThat(memoryTaskManager.getExecutionDetails(id).getStatus())
             .isEqualTo(TaskManager.Status.CANCELLED);
+
+    }
+
+    @Test
+    public void aWaitingTaskShouldBeCancelled() {
+        TaskId id = memoryTaskManager.submit(() -> {
+            sleep(500);
+            return Task.Result.COMPLETED;
+        });
+
+        TaskId idTaskToCancel = memoryTaskManager.submit(() -> Task.Result.COMPLETED);
+
+        memoryTaskManager.cancel(idTaskToCancel);
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS);
+
+
+        assertThat(memoryTaskManager.getExecutionDetails(idTaskToCancel).getStatus())
+            .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED);
+
+        awaitUntilTaskHasStatus(idTaskToCancel, TaskManager.Status.CANCELLED);
+        assertThat(memoryTaskManager.getExecutionDetails(idTaskToCancel).getStatus())
+            .isEqualTo(TaskManager.Status.CANCELLED);
+
     }
 
     @Test
@@ -345,17 +402,22 @@ public class MemoryTaskManagerTest {
     }
 
     @Test
-    public void listShouldBeEmptyWhenNoTasks() throws Exception {
+    public void listShouldBeEmptyWhenNoTasks() {
         assertThat(memoryTaskManager.list()).isEmpty();
     }
 
     @Test
-    public void listCancelledShouldBeEmptyWhenNoTasks() throws Exception {
+    public void listCancelledShouldBeEmptyWhenNoTasks() {
         assertThat(memoryTaskManager.list(TaskManager.Status.CANCELLED)).isEmpty();
     }
 
     @Test
-    public void awaitShouldNotThrowWhenCompletedTask() throws Exception {
+    public void listCancelRequestedShouldBeEmptyWhenNoTasks() {
+        assertThat(memoryTaskManager.list(TaskManager.Status.CANCEL_REQUESTED)).isEmpty();
+    }
+
+    @Test
+    public void awaitShouldNotThrowWhenCompletedTask() {
         TaskId taskId = memoryTaskManager.submit(
             () -> Task.Result.COMPLETED);
         memoryTaskManager.await(taskId);
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
index e4ae643..10c4d6a 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
@@ -19,12 +19,17 @@
 package org.apache.james.task;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.ONE_SECOND;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
+import org.awaitility.core.ConditionFactory;
 import org.junit.Test;
 
 import reactor.core.publisher.Mono;
@@ -33,6 +38,15 @@ public class MemoryTaskManagerWorkerTest {
 
     private final MemoryTaskManagerWorker worker = new MemoryTaskManagerWorker();
 
+    private final Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
+    private final ConditionFactory calmlyAwait = Awaitility.with()
+        .pollInterval(slowPacedPollInterval)
+        .and()
+        .with()
+        .pollDelay(slowPacedPollInterval)
+        .await();
+    private final ConditionFactory awaitAtMostOneSecond = calmlyAwait.atMost(ONE_SECOND);
+
     private final Task successfulTask = () -> Task.Result.COMPLETED;
     private final Task failedTask = () -> Task.Result.PARTIAL;
     private final Task throwingTask = () -> {
@@ -76,6 +90,27 @@ public class MemoryTaskManagerWorkerTest {
         assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.IN_PROGRESS);
         latch.countDown();
     }
+    @Test
+    public void theWorkerShouldNotRunATaskRequestedForCancellation() {
+        TaskId id = TaskId.generateTaskId();
+        AtomicInteger counter = new AtomicInteger(0);
+
+        Task task = () -> {
+            counter.incrementAndGet();
+            return Task.Result.COMPLETED;
+        };
+
+        TaskWithId taskWithId = new TaskWithId(id, task);
+        TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id);
+        ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+
+        idToDetails.put(id, executionDetails.cancelRequested());
+
+        worker.executeTask(taskWithId, updateDetails(idToDetails, id)).subscribe();
+
+        assertThat(counter.get()).isEqualTo(0);
+        assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCEL_REQUESTED);
+    }
 
     @Test
     public void theWorkerShouldNotRunACancelledTask() {
@@ -91,9 +126,9 @@ public class MemoryTaskManagerWorkerTest {
         TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id);
         ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
 
-        idToDetails.put(id, executionDetails.cancel());
+        idToDetails.put(id, executionDetails.cancelEffectively());
 
-        worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+        worker.executeTask(taskWithId, updateDetails(idToDetails, id)).subscribe();
 
         assertThat(counter.get()).isEqualTo(0);
         assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
@@ -119,9 +154,11 @@ public class MemoryTaskManagerWorkerTest {
         worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
 
         worker.cancelTask(id, updateDetails(idToDetails, id));
-        assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
+        assertThat(idToDetails.get(id).getStatus()).isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED);
 
         assertThat(counter.get()).isEqualTo(0);
+
+        awaitUntilTaskHasStatus(idToDetails, id, TaskManager.Status.CANCELLED);
         assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
     }
 
@@ -153,6 +190,10 @@ public class MemoryTaskManagerWorkerTest {
         };
     }
 
+    private void awaitUntilTaskHasStatus(ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails, TaskId id, TaskManager.Status status) {
+        awaitAtMostOneSecond.until(() -> idToExecutionDetails.get(id).getStatus().equals(status));
+    }
+
     private void await(CountDownLatch countDownLatch) {
         try {
             countDownLatch.await();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org