You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/05/28 07:39:25 UTC

[james-project] branch master updated (8df6ddc -> aa60bfb)

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 8df6ddc  JAMES-2766 Upgrading to ElasticSearch 6.7
     new c57b922  JAMES-2777 add a class representing a task with its id
     new c420360  JAMES-2777 create a MemoryTaskManagerWorker responsible to execute the tasks
     new 1f232d4  JAMES-2777 make the MemoryTaskManager deleguate the execution of the tasks to its worker
     new bcdbaa7  refactor MemoryTaskManagerTest to avoid too many Thread.sleep
     new 3adf8bc  JAMES-2777 add task requested for cancellation status
     new aa60bfb  JAMES-2777 use junit 5 in server.task project

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../james/webadmin/routes/TasksRoutesTest.java     |  20 +-
 server/task/pom.xml                                |  19 +-
 .../org/apache/james/task/MemoryTaskManager.java   | 166 ++++++++--------
 .../apache/james/task/MemoryTaskManagerWorker.java | 152 ++++++++++++++
 .../apache/james/task/TaskExecutionDetails.java    |  16 +-
 ...ption.java => TaskExecutionDetailsUpdater.java} |   5 +-
 .../java/org/apache/james/task/TaskManager.java    |   1 +
 .../org/apache/james/task/TaskManagerWorker.java}  |  11 +-
 .../java/org/apache/james/task/TaskWithId.java     |  34 ++--
 .../apache/james/task/MemoryTaskManagerTest.java   | 221 ++++++++++++++-------
 .../james/task/MemoryTaskManagerWorkerTest.java    | 204 +++++++++++++++++++
 .../java/org/apache/james/task/TaskIdTest.java     |   2 +-
 .../test/java/org/apache/james/task/TaskTest.java  |   2 +-
 .../org/apache/james/task/TaskWithIdTest.java}     |  38 ++--
 14 files changed, 689 insertions(+), 202 deletions(-)
 create mode 100644 server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
 copy server/task/src/main/java/org/apache/james/task/{TaskNotFoundException.java => TaskExecutionDetailsUpdater.java} (89%)
 copy server/{data/data-jmap/src/main/java/org/apache/james/jmap/api/vacation/VacationRepository.java => task/src/main/java/org/apache/james/task/TaskManagerWorker.java} (80%)
 copy backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexName.java => server/task/src/main/java/org/apache/james/task/TaskWithId.java (70%)
 create mode 100644 server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
 copy server/{blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/swift/RegionTest.java => task/src/test/java/org/apache/james/task/TaskWithIdTest.java} (64%)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/06: JAMES-2777 create a MemoryTaskManagerWorker responsible to execute the tasks

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c420360973e5cb25134a268fa228465a593cd93b
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Tue May 21 11:42:06 2019 +0200

    JAMES-2777 create a MemoryTaskManagerWorker responsible to execute the tasks
---
 .../apache/james/task/MemoryTaskManagerWorker.java | 114 ++++++++++++++
 .../james/task/TaskExecutionDetailsUpdater.java    |  24 +++
 .../org/apache/james/task/TaskManagerWorker.java   |  31 ++++
 .../james/task/MemoryTaskManagerWorkerTest.java    | 163 +++++++++++++++++++++
 4 files changed, 332 insertions(+)

diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
new file mode 100644
index 0000000..6feb85a
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
@@ -0,0 +1,114 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.task;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.james.util.MDCBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+public class MemoryTaskManagerWorker implements TaskManagerWorker {
+    private static final boolean INTERRUPT_IF_RUNNING = true;
+    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManagerWorker.class);
+    private final ConcurrentHashMap<TaskId, CompletableFuture<Task.Result>> idToFuture = new ConcurrentHashMap<>();
+
+    @Override // Starts the task asynchronously and returns a Mono over the resulting future.
+    public Mono<Task.Result> executeTask(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+        CompletableFuture<Task.Result> futureResult = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, updateDetails)); // submitted for execution right here, before the Mono exists
+        // The future is kept by task id so that cancelTask can find it. NOTE(review): it is only removed by the Mono's doOnTerminate or by cancelTask -- presumably it stays registered if the returned Mono is never subscribed; confirm callers subscribe.
+        idToFuture.put(taskWithId.getId(), futureResult);
+
+        Mono<Task.Result> result = Mono.<Task.Result>fromFuture(futureResult)
+            .doOnError(res -> failed(updateDetails,
+                (logger, details) -> logger.error("Task was partially performed. Check logs for more details")))
+            .doOnTerminate(() -> idToFuture.remove(taskWithId.getId()));
+
+        return result;
+    }
+
+    private Task.Result runWithMdc(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+        return MDCBuilder.withMdc(
+            MDCBuilder.create()
+                .addContext(Task.TASK_ID, taskWithId.getId())
+                .addContext(Task.TASK_TYPE, taskWithId.getTask().type())
+                .addContext(Task.TASK_DETAILS, taskWithId.getTask().details()),
+            () -> run(taskWithId, updateDetails));
+    }
+
+
+    private Task.Result run(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails) { // runs the task, recording its start and its outcome through updateDetails
+        updateDetails.accept(TaskExecutionDetails::start);
+        try {
+            return taskWithId.getTask()
+                .run()
+                .onComplete(() -> success(updateDetails))
+                .onFailure(() -> failed(updateDetails, (logger, details) -> logger.error("Task {} was partially performed. Check logs for more details", details.getTaskId())));
+        } catch (Exception e) { // a throwing task is reported as PARTIAL instead of propagating to the caller
+            failed(updateDetails, // the task id fills the placeholder; the trailing exception is still logged with its stack trace
+                (logger, executionDetails) -> logger.error("Error while running task {}", executionDetails.getTaskId(), e));
+            return Task.Result.PARTIAL;
+        }
+    }
+
+    @Override // Cancels the future registered under the given id, if any. NOTE(review): CompletableFuture.cancel documents that its mayInterruptIfRunning argument has no effect, so INTERRUPT_IF_RUNNING presumably does not stop a task that is already running -- confirm.
+    public void cancelTask(TaskId id, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+        Optional.ofNullable(idToFuture.remove(id))
+            .ifPresent(future -> {
+                updateDetails.accept(details -> {
+                    if (details.getStatus().equals(TaskManager.Status.WAITING) || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)) {
+                        return details.cancel();
+                    }
+                    return details;
+                });
+                future.cancel(INTERRUPT_IF_RUNNING);
+            });
+    }
+
+    private void success(Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+        updateDetails.accept(currentDetails -> {
+            if (!wasCancelled(currentDetails)) {
+                LOGGER.info("Task success");
+                return currentDetails.completed();
+            }
+            return currentDetails;
+        });
+    }
+
+    private void failed(Consumer<TaskExecutionDetailsUpdater> updateDetails, BiConsumer<Logger, TaskExecutionDetails> logOperation) {
+        updateDetails.accept(currentDetails -> {
+            if (!wasCancelled(currentDetails)) {
+                logOperation.accept(LOGGER, currentDetails);
+                return currentDetails.failed();
+            }
+            return currentDetails;
+        });
+    }
+
+    private boolean wasCancelled(TaskExecutionDetails details) {
+        return details.getStatus() == TaskManager.Status.CANCELLED;
+    }
+
+}
diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetailsUpdater.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetailsUpdater.java
new file mode 100644
index 0000000..fe008f6
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetailsUpdater.java
@@ -0,0 +1,24 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.task;
+
+@FunctionalInterface
+public interface TaskExecutionDetailsUpdater {
+    TaskExecutionDetails update(TaskExecutionDetails currentDetails);
+}
diff --git a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
new file mode 100644
index 0000000..b30a850
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
@@ -0,0 +1,31 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.task;
+
+import java.util.function.Consumer;
+
+import reactor.core.publisher.Mono;
+
+public interface TaskManagerWorker {
+
+    Mono<Task.Result> executeTask(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails);
+
+    void cancelTask(TaskId id, Consumer<TaskExecutionDetailsUpdater> updateDetails);
+
+}
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
new file mode 100644
index 0000000..e4ae643
--- /dev/null
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
@@ -0,0 +1,163 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.task;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import reactor.core.publisher.Mono;
+
+public class MemoryTaskManagerWorkerTest {
+
+    private final MemoryTaskManagerWorker worker = new MemoryTaskManagerWorker();
+
+    private final Task successfulTask = () -> Task.Result.COMPLETED;
+    private final Task failedTask = () -> Task.Result.PARTIAL;
+    private final Task throwingTask = () -> {
+        throw new RuntimeException("Throwing Task");
+    };
+
+    @Test
+    public void aSuccessfullTaskShouldCompleteSuccessfully() {
+        assertThatTaskSucceeded(successfulTask);
+    }
+
+    @Test
+    public void aFailedTaskShouldCompleteWithFailedStatus() {
+        assertThatTaskFailed(failedTask);
+    }
+
+    @Test
+    public void aThrowingTaskShouldCompleteWithFailedStatus() {
+        assertThatTaskFailed(throwingTask);
+    }
+
+    @Test
+    public void theWorkerShouldReportThatATaskIsInProgress() {
+        TaskId id = TaskId.generateTaskId();
+        CountDownLatch latch = new CountDownLatch(1);
+        CountDownLatch taskLaunched = new CountDownLatch(1);
+
+        Task inProgressTask = () -> {
+            taskLaunched.countDown();
+            await(latch);
+            return Task.Result.COMPLETED;
+        };
+
+        TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
+        TaskExecutionDetails executionDetails = TaskExecutionDetails.from(inProgressTask, id);
+        ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+        idToDetails.put(id, executionDetails);
+
+        worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+        await(taskLaunched);
+        assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.IN_PROGRESS);
+        latch.countDown();
+    }
+
+    @Test
+    public void theWorkerShouldNotRunACancelledTask() {
+        TaskId id = TaskId.generateTaskId();
+        AtomicInteger counter = new AtomicInteger(0);
+
+        Task task = () -> {
+            counter.incrementAndGet();
+            return Task.Result.COMPLETED;
+        };
+
+        TaskWithId taskWithId = new TaskWithId(id, task);
+        TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id);
+        ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+        // Store the details as already cancelled before handing the task to the worker.
+        idToDetails.put(id, executionDetails.cancel());
+
+        worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+        // NOTE(review): asserted right after submission, without waiting for the asynchronous execution -- presumably this could pass before the task had a chance to run; confirm.
+        assertThat(counter.get()).isEqualTo(0);
+        assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
+    }
+
+    @Test
+    public void theWorkerShouldCancelAnInProgressTask() {
+        TaskId id = TaskId.generateTaskId();
+        AtomicInteger counter = new AtomicInteger(0);
+        CountDownLatch latch = new CountDownLatch(1); // NOTE(review): never counted down in this test, so the task presumably stays blocked unless cancellation interrupts it -- confirm
+        // The task waits on the latch before touching the counter, so the counter stays at 0 while the latch is held.
+        Task inProgressTask = () -> {
+            await(latch);
+            counter.incrementAndGet();
+            return Task.Result.COMPLETED;
+        };
+
+        TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
+        TaskExecutionDetails executionDetails = TaskExecutionDetails.from(inProgressTask, id);
+        ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+        idToDetails.put(id, executionDetails);
+
+        worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+
+        worker.cancelTask(id, updateDetails(idToDetails, id));
+        assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
+        // NOTE(review): the second status assertion below repeats the one just above.
+        assertThat(counter.get()).isEqualTo(0);
+        assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
+    }
+
+    private void assertThatTaskSucceeded(Task task) {
+        assertTaskExecutionResultAndStatus(task, Task.Result.COMPLETED, TaskManager.Status.COMPLETED);
+    }
+
+    private void assertThatTaskFailed(Task task) {
+        assertTaskExecutionResultAndStatus(task, Task.Result.PARTIAL, TaskManager.Status.FAILED);
+    }
+
+    private void assertTaskExecutionResultAndStatus(Task task, Task.Result expectedResult, TaskManager.Status expectedStatus) {
+        TaskId id = TaskId.generateTaskId();
+        TaskWithId taskWithId = new TaskWithId(id, task);
+        TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id);
+        ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+        idToDetails.put(id, executionDetails);
+
+        Mono<Task.Result> result = worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+
+        assertThat(result.block()).isEqualTo(expectedResult);
+        assertThat(idToDetails.get(id).getStatus()).isEqualTo(expectedStatus);
+    }
+
+    private Consumer<TaskExecutionDetailsUpdater> updateDetails(ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails, TaskId taskId) {
+        return updater -> {
+            TaskExecutionDetails newDetails = updater.update(idToExecutionDetails.get(taskId));
+            idToExecutionDetails.put(taskId, newDetails);
+        };
+    }
+
+    private void await(CountDownLatch countDownLatch) { // blocks until the latch reaches zero, turning an interruption into an unchecked exception
+        try {
+            countDownLatch.await();
+        } catch (InterruptedException e) { // NOTE(review): the thread's interrupt flag is not restored before rethrowing -- confirm this is intended
+            throw new RuntimeException(e);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 05/06: JAMES-2777 add task requested for cancellation status

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3adf8bcb65142cfda99e41f5d80fe123d97189cb
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Wed May 22 14:12:27 2019 +0200

    JAMES-2777 add task requested for cancellation status
---
 .../james/webadmin/routes/TasksRoutesTest.java     | 20 ++++++-
 .../org/apache/james/task/MemoryTaskManager.java   | 34 ++++++++---
 .../apache/james/task/MemoryTaskManagerWorker.java | 56 +++++++++++++++---
 .../apache/james/task/TaskExecutionDetails.java    | 16 ++++-
 .../java/org/apache/james/task/TaskManager.java    |  1 +
 .../apache/james/task/MemoryTaskManagerTest.java   | 68 +++++++++++++++++++++-
 .../james/task/MemoryTaskManagerWorkerTest.java    | 47 ++++++++++++++-
 7 files changed, 218 insertions(+), 24 deletions(-)

diff --git a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
index b65a84f..825a41a 100644
--- a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
@@ -26,6 +26,8 @@ import static org.apache.james.webadmin.WebAdminServer.NO_CONFIGURATION;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.not;
 
 import java.util.UUID;
@@ -204,8 +206,14 @@ public class TasksRoutesTest {
 
     @Test
     public void deleteShouldCancelMatchingTask() {
+        CountDownLatch inProgressLatch = new CountDownLatch(1);
+
         TaskId taskId = taskManager.submit(() -> {
-            await();
+            try {
+                inProgressLatch.await();
+            } catch (InterruptedException e) {
+                //ignore
+            }
             return Task.Result.COMPLETED;
         });
 
@@ -216,7 +224,17 @@ public class TasksRoutesTest {
             .get("/" + taskId.getValue())
         .then()
             .statusCode(HttpStatus.OK_200)
+            .body("status", isOneOf("canceledRequested", "canceled"));
+
+        inProgressLatch.countDown();
+        when()
+            .get("/" + taskId.getValue())
+            .then()
+            .statusCode(HttpStatus.OK_200)
             .body("status", is("canceled"));
+
+
+
     }
 
     @Test
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 1623858..566ce32 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 import javax.annotation.PreDestroy;
 
@@ -60,8 +61,24 @@ public class MemoryTaskManager implements TaskManager {
         worker = new MemoryTaskManagerWorker();
         workQueue
             .subscribeOn(Schedulers.single())
-            .filter(task -> !listIds(Status.CANCELLED).contains(task.getId()))
-            .subscribe(this::treatTask);
+            .filter(isTaskWaiting().or(isTaskRequestedForCancellation()))
+            .subscribe(this::sendTaskToWorker);
+    }
+
+    private void sendTaskToWorker(TaskWithId taskWithId) {
+        if (isTaskWaiting().test(taskWithId)) {
+            treatTask(taskWithId);
+        } else if (isTaskRequestedForCancellation().test(taskWithId)) {
+            updateDetails(taskWithId.getId()).accept(TaskExecutionDetails::cancelEffectively);
+        }
+    }
+
+    private Predicate<TaskWithId> isTaskWaiting() {
+        return task -> listIds(Status.WAITING).contains(task.getId());
+    }
+
+    private Predicate<TaskWithId> isTaskRequestedForCancellation() {
+        return task -> listIds(Status.CANCEL_REQUESTED).contains(task.getId());
     }
 
     private void treatTask(TaskWithId task) {
@@ -111,15 +128,18 @@ public class MemoryTaskManager implements TaskManager {
 
     @Override
     public void cancel(TaskId id) {
-        TaskExecutionDetails details = getExecutionDetails(id);
-        if (details.getStatus().equals(Status.IN_PROGRESS) || details.getStatus().equals(Status.WAITING)) {
-            worker.cancelTask(id, updateDetails(id));
-        }
+        Optional.ofNullable(idToExecutionDetails.get(id)).ifPresent(details -> {
+                if (details.getStatus().equals(Status.WAITING)) {
+                    updateDetails(id).accept(currentDetails -> currentDetails.cancelRequested());
+                }
+                worker.cancelTask(id, updateDetails(id));
+            }
+        );
     }
 
     @Override
     public TaskExecutionDetails await(TaskId id) {
-        if (Optional.ofNullable(getExecutionDetails(id)).isPresent()) {
+        if (Optional.ofNullable(idToExecutionDetails.get(id)).isPresent()) {
             return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic())
                 .filter(ignore -> tasksResult.get(id) != null)
                 .map(ignore -> {
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
index 6feb85a..e69e641 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
@@ -18,21 +18,27 @@
  ****************************************************************/
 package org.apache.james.task;
 
+import java.time.Duration;
 import java.util.Optional;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
+import org.apache.james.util.FunctionalUtils;
 import org.apache.james.util.MDCBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class MemoryTaskManagerWorker implements TaskManagerWorker {
     private static final boolean INTERRUPT_IF_RUNNING = true;
     private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManagerWorker.class);
+    public static final Duration CHECK_CANCELED_PERIOD = Duration.ofMillis(100);
+    public static final int FIRST = 1;
     private final ConcurrentHashMap<TaskId, CompletableFuture<Task.Result>> idToFuture = new ConcurrentHashMap<>();
 
     @Override
@@ -42,8 +48,12 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
         idToFuture.put(taskWithId.getId(), futureResult);
 
         Mono<Task.Result> result = Mono.<Task.Result>fromFuture(futureResult)
-            .doOnError(res -> failed(updateDetails,
-                (logger, details) -> logger.error("Task was partially performed. Check logs for more details")))
+            .doOnError(res -> {
+                if (!(res instanceof CancellationException)) {
+                    failed(updateDetails,
+                        (logger, details) -> logger.error("Task was partially performed. Check logs for more details"));
+                }
+            })
             .doOnTerminate(() -> idToFuture.remove(taskWithId.getId()));
 
         return result;
@@ -77,16 +87,44 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker {
     public void cancelTask(TaskId id, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
         Optional.ofNullable(idToFuture.remove(id))
             .ifPresent(future -> {
-                updateDetails.accept(details -> {
-                    if (details.getStatus().equals(TaskManager.Status.WAITING) || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)) {
-                        return details.cancel();
-                    }
-                    return details;
-                });
-                future.cancel(INTERRUPT_IF_RUNNING);
+                requestCancellation(updateDetails, future);
+                waitUntilFutureIsCancelled(future)
+                    .subscribe(cancellationSuccessful -> effectivelyCancelled(updateDetails));
             });
     }
 
+    private void requestCancellation(Consumer<TaskExecutionDetailsUpdater> updateDetails, CompletableFuture<Task.Result> future) {
+        updateDetails.accept(details -> {
+            if (details.getStatus().equals(TaskManager.Status.WAITING) || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)) {
+                return details.cancelRequested();
+            }
+            return details;
+        });
+        future.cancel(INTERRUPT_IF_RUNNING);
+    }
+
+    private Flux<Boolean> waitUntilFutureIsCancelled(CompletableFuture<Task.Result> future) { // emits true at most once, when the future is observed as cancelled
+        return Flux.interval(CHECK_CANCELED_PERIOD)
+            .takeUntil(ignore -> future.isDone()) // a future that settled without being cancelled never will be: stop polling instead of ticking forever
+            .map(ignore -> future.isCancelled())
+            .filter(FunctionalUtils.identityPredicate()).take(FIRST);
+    }
+
+    private void effectivelyCancelled(Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+        updateDetails.accept(details -> {
+            if (canBeCancelledEffectively(details)) {
+                return details.cancelEffectively();
+            }
+            return details;
+        });
+    }
+
+    private boolean canBeCancelledEffectively(TaskExecutionDetails details) {
+        return details.getStatus().equals(TaskManager.Status.WAITING)
+            || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)
+            || details.getStatus().equals(TaskManager.Status.CANCEL_REQUESTED);
+    }
+
     private void success(Consumer<TaskExecutionDetailsUpdater> updateDetails) {
         updateDetails.accept(currentDetails -> {
             if (!wasCancelled(currentDetails)) {
diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
index a95627c..06b9ecf 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
@@ -140,12 +140,26 @@ public class TaskExecutionDetails {
             Optional.of(ZonedDateTime.now()));
     }
 
-    public TaskExecutionDetails cancel() {
+    public TaskExecutionDetails cancelRequested() {
         Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS
             || status == TaskManager.Status.WAITING);
         return new TaskExecutionDetails(
             taskId,
             task,
+            TaskManager.Status.CANCEL_REQUESTED,
+            submitDate,
+            startedDate,
+            Optional.empty(),
+            Optional.of(ZonedDateTime.now()),
+            Optional.empty());
+    }
+
+    public TaskExecutionDetails cancelEffectively() {
+        Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS
+            || status == TaskManager.Status.WAITING || status == TaskManager.Status.CANCEL_REQUESTED);
+        return new TaskExecutionDetails(
+            taskId,
+            task,
             TaskManager.Status.CANCELLED,
             submitDate,
             startedDate,
diff --git a/server/task/src/main/java/org/apache/james/task/TaskManager.java b/server/task/src/main/java/org/apache/james/task/TaskManager.java
index a3fabc3..de3e16b 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskManager.java
@@ -27,6 +27,7 @@ public interface TaskManager {
         WAITING("waiting"),
         IN_PROGRESS("inProgress"),
         COMPLETED("completed"),
+        CANCEL_REQUESTED("canceledRequested"),
         CANCELLED("canceled"),
         FAILED("failed");
 
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
index c857ec1..676071f 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
@@ -108,6 +108,35 @@ public class MemoryTaskManagerTest {
     }
 
     @Test
+    public void completedTaskShouldNotBeCancelled() {
+        TaskId id = memoryTaskManager.submit(() -> Task.Result.COMPLETED);
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.COMPLETED);
+        memoryTaskManager.cancel(id);
+
+       try {
+           awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED);
+       } catch (Exception e) {
+           //Should timeout
+       }
+        assertThat(memoryTaskManager.getExecutionDetails(id).getStatus()).isEqualTo(TaskManager.Status.COMPLETED);
+    }
+    @Test
+    public void failedTaskShouldNotBeCancelled() {
+        TaskId id = memoryTaskManager.submit(() -> Task.Result.PARTIAL);
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.FAILED);
+        memoryTaskManager.cancel(id);
+
+       try {
+           awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED);
+       } catch (Exception e) {
+           //Should timeout
+       }
+        assertThat(memoryTaskManager.getExecutionDetails(id).getStatus()).isEqualTo(TaskManager.Status.FAILED);
+    }
+
+    @Test
     public void getStatusShouldBeCancelledWhenCancelled() {
         TaskId id = memoryTaskManager.submit(() -> {
             sleep(500);
@@ -118,7 +147,35 @@ public class MemoryTaskManagerTest {
         memoryTaskManager.cancel(id);
 
         assertThat(memoryTaskManager.getExecutionDetails(id).getStatus())
+            .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED);
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED);
+        assertThat(memoryTaskManager.getExecutionDetails(id).getStatus())
             .isEqualTo(TaskManager.Status.CANCELLED);
+
+    }
+
+    @Test
+    public void aWaitingTaskShouldBeCancelled() {
+        TaskId id = memoryTaskManager.submit(() -> {
+            sleep(500);
+            return Task.Result.COMPLETED;
+        });
+
+        TaskId idTaskToCancel = memoryTaskManager.submit(() -> Task.Result.COMPLETED);
+
+        memoryTaskManager.cancel(idTaskToCancel);
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS);
+
+
+        assertThat(memoryTaskManager.getExecutionDetails(idTaskToCancel).getStatus())
+            .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED);
+
+        awaitUntilTaskHasStatus(idTaskToCancel, TaskManager.Status.CANCELLED);
+        assertThat(memoryTaskManager.getExecutionDetails(idTaskToCancel).getStatus())
+            .isEqualTo(TaskManager.Status.CANCELLED);
+
     }
 
     @Test
@@ -345,17 +402,22 @@ public class MemoryTaskManagerTest {
     }
 
     @Test
-    public void listShouldBeEmptyWhenNoTasks() throws Exception {
+    public void listShouldBeEmptyWhenNoTasks() {
         assertThat(memoryTaskManager.list()).isEmpty();
     }
 
     @Test
-    public void listCancelledShouldBeEmptyWhenNoTasks() throws Exception {
+    public void listCancelledShouldBeEmptyWhenNoTasks() {
         assertThat(memoryTaskManager.list(TaskManager.Status.CANCELLED)).isEmpty();
     }
 
     @Test
-    public void awaitShouldNotThrowWhenCompletedTask() throws Exception {
+    public void listCancelRequestedShouldBeEmptyWhenNoTasks() {
+        assertThat(memoryTaskManager.list(TaskManager.Status.CANCEL_REQUESTED)).isEmpty();
+    }
+
+    @Test
+    public void awaitShouldNotThrowWhenCompletedTask() {
         TaskId taskId = memoryTaskManager.submit(
             () -> Task.Result.COMPLETED);
         memoryTaskManager.await(taskId);
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
index e4ae643..10c4d6a 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
@@ -19,12 +19,17 @@
 package org.apache.james.task;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.ONE_SECOND;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
+import org.awaitility.core.ConditionFactory;
 import org.junit.Test;
 
 import reactor.core.publisher.Mono;
@@ -33,6 +38,15 @@ public class MemoryTaskManagerWorkerTest {
 
     private final MemoryTaskManagerWorker worker = new MemoryTaskManagerWorker();
 
+    private final Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
+    private final ConditionFactory calmlyAwait = Awaitility.with()
+        .pollInterval(slowPacedPollInterval)
+        .and()
+        .with()
+        .pollDelay(slowPacedPollInterval)
+        .await();
+    private final ConditionFactory awaitAtMostOneSecond = calmlyAwait.atMost(ONE_SECOND);
+
     private final Task successfulTask = () -> Task.Result.COMPLETED;
     private final Task failedTask = () -> Task.Result.PARTIAL;
     private final Task throwingTask = () -> {
@@ -76,6 +90,27 @@ public class MemoryTaskManagerWorkerTest {
         assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.IN_PROGRESS);
         latch.countDown();
     }
+    @Test
+    public void theWorkerShouldNotRunATaskRequestedForCancellation() {
+        TaskId id = TaskId.generateTaskId();
+        AtomicInteger counter = new AtomicInteger(0);
+
+        Task task = () -> {
+            counter.incrementAndGet();
+            return Task.Result.COMPLETED;
+        };
+
+        TaskWithId taskWithId = new TaskWithId(id, task);
+        TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id);
+        ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+
+        idToDetails.put(id, executionDetails.cancelRequested());
+
+        worker.executeTask(taskWithId, updateDetails(idToDetails, id)).subscribe();
+
+        assertThat(counter.get()).isEqualTo(0);
+        assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCEL_REQUESTED);
+    }
 
     @Test
     public void theWorkerShouldNotRunACancelledTask() {
@@ -91,9 +126,9 @@ public class MemoryTaskManagerWorkerTest {
         TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id);
         ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
 
-        idToDetails.put(id, executionDetails.cancel());
+        idToDetails.put(id, executionDetails.cancelEffectively());
 
-        worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+        worker.executeTask(taskWithId, updateDetails(idToDetails, id)).subscribe();
 
         assertThat(counter.get()).isEqualTo(0);
         assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
@@ -119,9 +154,11 @@ public class MemoryTaskManagerWorkerTest {
         worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
 
         worker.cancelTask(id, updateDetails(idToDetails, id));
-        assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
+        assertThat(idToDetails.get(id).getStatus()).isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED);
 
         assertThat(counter.get()).isEqualTo(0);
+
+        awaitUntilTaskHasStatus(idToDetails, id, TaskManager.Status.CANCELLED);
         assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
     }
 
@@ -153,6 +190,10 @@ public class MemoryTaskManagerWorkerTest {
         };
     }
 
+    private void awaitUntilTaskHasStatus(ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails, TaskId id, TaskManager.Status status) {
+        awaitAtMostOneSecond.until(() -> idToExecutionDetails.get(id).getStatus().equals(status));
+    }
+
     private void await(CountDownLatch countDownLatch) {
         try {
             countDownLatch.await();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/06: JAMES-2777 add a class representing a task with its id

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c57b922d848e392a956de4cf4c16668ab14dffaa
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Tue May 21 11:38:52 2019 +0200

    JAMES-2777 add a class representing a task with its id
---
 .../java/org/apache/james/task/TaskWithId.java     | 53 ++++++++++++++++++++++
 .../java/org/apache/james/task/TaskWithIdTest.java | 47 +++++++++++++++++++
 2 files changed, 100 insertions(+)

diff --git a/server/task/src/main/java/org/apache/james/task/TaskWithId.java b/server/task/src/main/java/org/apache/james/task/TaskWithId.java
new file mode 100644
index 0000000..724c71c
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/TaskWithId.java
@@ -0,0 +1,53 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.task;
+
+import java.util.Objects;
+
+public class TaskWithId {
+    private final TaskId id;
+    private final Task task;
+
+    public TaskWithId(TaskId id, Task task) {
+        this.id = id;
+        this.task = task;
+    }
+
+    public TaskId getId() {
+        return id;
+    }
+
+    public Task getTask() {
+        return task;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof TaskWithId) {
+            TaskWithId taskWithId = (TaskWithId) o;
+            return Objects.equals(this.id, taskWithId.id);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
+}
diff --git a/server/task/src/test/java/org/apache/james/task/TaskWithIdTest.java b/server/task/src/test/java/org/apache/james/task/TaskWithIdTest.java
new file mode 100644
index 0000000..e66c436
--- /dev/null
+++ b/server/task/src/test/java/org/apache/james/task/TaskWithIdTest.java
@@ -0,0 +1,47 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class TaskWithIdTest {
+
+    @Test
+    public void twoTasksWithSameIdShouldBeEqual() {
+        TaskId id = TaskId.generateTaskId();
+        Task task1 = () -> Task.Result.COMPLETED;
+        Task task2 = () -> Task.Result.COMPLETED;
+        TaskWithId taskWithId1 = new TaskWithId(id, task1);
+        TaskWithId taskWithId2 = new TaskWithId(id, task2);
+        assertThat(taskWithId1).isEqualTo(taskWithId2);
+    }
+
+    @Test
+    public void sameTaskWithDifferentIdShouldNotBeEqual() {
+        TaskId id1 = TaskId.generateTaskId();
+        TaskId id2 = TaskId.generateTaskId();
+        Task task = () -> Task.Result.COMPLETED;
+        TaskWithId taskWithId1 = new TaskWithId(id1, task);
+        TaskWithId taskWithId2 = new TaskWithId(id2, task);
+        assertThat(taskWithId1).isNotEqualTo(taskWithId2);
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 06/06: JAMES-2777 use junit 5 in server.task project

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit aa60bfbce8732b46de6ac7cc1d00dc0f2b354a17
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Wed May 22 16:47:01 2019 +0200

    JAMES-2777 use junit 5 in server.task project
---
 server/task/pom.xml                                    | 14 ++++++++++++--
 .../org/apache/james/task/MemoryTaskManagerTest.java   | 18 ++++++++----------
 .../apache/james/task/MemoryTaskManagerWorkerTest.java |  2 +-
 .../test/java/org/apache/james/task/TaskIdTest.java    |  2 +-
 .../src/test/java/org/apache/james/task/TaskTest.java  |  2 +-
 .../java/org/apache/james/task/TaskWithIdTest.java     |  2 +-
 6 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/server/task/pom.xml b/server/task/pom.xml
index 3ade1a7..00bc8ad 100644
--- a/server/task/pom.xml
+++ b/server/task/pom.xml
@@ -41,8 +41,18 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.platform</groupId>
+            <artifactId>junit-platform-launcher</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.vintage</groupId>
+            <artifactId>junit-vintage-engine</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
index 676071f..7b87080 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
@@ -30,28 +30,24 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.assertj.core.api.JUnitSoftAssertions;
+import org.assertj.core.api.SoftAssertions;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.awaitility.core.ConditionFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 public class MemoryTaskManagerTest {
 
     private MemoryTaskManager memoryTaskManager;
 
-    @Rule
-    public JUnitSoftAssertions softly = new JUnitSoftAssertions();
-
-    @Before
+    @BeforeEach
     public void setUp() {
         memoryTaskManager = new MemoryTaskManager();
     }
 
-    @After
+    @AfterEach
     public void tearDown() {
         memoryTaskManager.stop();
     }
@@ -231,6 +227,8 @@ public class MemoryTaskManagerTest {
 
     @Test
     public void listShouldReturnTaskStatus() throws Exception {
+        SoftAssertions softly = new SoftAssertions();
+
         CountDownLatch latch1 = new CountDownLatch(1);
         CountDownLatch latch2 = new CountDownLatch(1);
 
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
index 10c4d6a..48d22fd 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
@@ -30,7 +30,7 @@ import java.util.function.Consumer;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.awaitility.core.ConditionFactory;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import reactor.core.publisher.Mono;
 
diff --git a/server/task/src/test/java/org/apache/james/task/TaskIdTest.java b/server/task/src/test/java/org/apache/james/task/TaskIdTest.java
index 24c3a03..0d22279 100644
--- a/server/task/src/test/java/org/apache/james/task/TaskIdTest.java
+++ b/server/task/src/test/java/org/apache/james/task/TaskIdTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.james.task;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import nl.jqno.equalsverifier.EqualsVerifier;
 
diff --git a/server/task/src/test/java/org/apache/james/task/TaskTest.java b/server/task/src/test/java/org/apache/james/task/TaskTest.java
index cd45aed..a293680 100644
--- a/server/task/src/test/java/org/apache/james/task/TaskTest.java
+++ b/server/task/src/test/java/org/apache/james/task/TaskTest.java
@@ -24,7 +24,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TaskTest {
 
diff --git a/server/task/src/test/java/org/apache/james/task/TaskWithIdTest.java b/server/task/src/test/java/org/apache/james/task/TaskWithIdTest.java
index e66c436..a0a1ca0 100644
--- a/server/task/src/test/java/org/apache/james/task/TaskWithIdTest.java
+++ b/server/task/src/test/java/org/apache/james/task/TaskWithIdTest.java
@@ -21,7 +21,7 @@ package org.apache.james.task;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TaskWithIdTest {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/06: JAMES-2777 make the MemoryTaskManager delegate the execution of the tasks to its worker

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1f232d46e0c7ffc56b988afb0bf38e5bf1cbdcb8
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Tue May 21 11:43:16 2019 +0200

    JAMES-2777 make the MemoryTaskManager delegate the execution of the tasks to its worker
---
 server/task/pom.xml                                |   5 +
 .../org/apache/james/task/MemoryTaskManager.java   | 154 ++++++++++-----------
 .../apache/james/task/MemoryTaskManagerTest.java   |  75 ++++++----
 3 files changed, 128 insertions(+), 106 deletions(-)

diff --git a/server/task/pom.xml b/server/task/pom.xml
index 32a4647..3ade1a7 100644
--- a/server/task/pom.xml
+++ b/server/task/pom.xml
@@ -36,6 +36,11 @@
             <artifactId>james-server-util</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index e26e173..1623858 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -19,102 +19,69 @@
 
 package org.apache.james.task;
 
+import java.time.Duration;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
 import java.util.function.Consumer;
 
 import javax.annotation.PreDestroy;
 
-import org.apache.james.util.MDCBuilder;
-import org.apache.james.util.concurrent.NamedThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.WorkQueueProcessor;
+import reactor.core.scheduler.Schedulers;
 
 public class MemoryTaskManager implements TaskManager {
-    private static final boolean INTERRUPT_IF_RUNNING = true;
-    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManager.class);
-
+    public static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500);
+    public static final Duration NOW = Duration.ZERO;
+    private final WorkQueueProcessor<TaskWithId> workQueue;
+    private final TaskManagerWorker worker;
     private final ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails;
-    private final ConcurrentHashMap<TaskId, Future<?>> idToFuture;
-    private final ExecutorService executor;
+    private final ConcurrentHashMap<TaskId, Mono<Task.Result>> tasksResult;
+    private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
+    private final ExecutorService requestTaskExecutor = Executors.newSingleThreadExecutor();
 
     public MemoryTaskManager() {
+        workQueue = WorkQueueProcessor.<TaskWithId>builder()
+            .executor(taskExecutor)
+            .requestTaskExecutor(requestTaskExecutor)
+            .build();
         idToExecutionDetails = new ConcurrentHashMap<>();
-        idToFuture = new ConcurrentHashMap<>();
-        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
-        executor = Executors.newSingleThreadExecutor(threadFactory);
+        tasksResult = new ConcurrentHashMap<>();
+        worker = new MemoryTaskManagerWorker();
+        workQueue
+            .subscribeOn(Schedulers.single())
+            .filter(task -> !listIds(Status.CANCELLED).contains(task.getId()))
+            .subscribe(this::treatTask);
     }
 
-    @Override
-    public TaskId submit(Task task) {
-        return submit(task, id -> { });
+    private void treatTask(TaskWithId task) {
+        Mono<Task.Result> result = worker.executeTask(task, updateDetails(task.getId()));
+        tasksResult.put(task.getId(), result);
+        try {
+            result.block();
+        } catch (CancellationException e) {
+            // Do not throw CancellationException
+        }
     }
 
-    @VisibleForTesting
-    TaskId submit(Task task, Consumer<TaskId> callback) {
+    public TaskId submit(Task task) {
         TaskId taskId = TaskId.generateTaskId();
         TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId);
-
         idToExecutionDetails.put(taskId, executionDetails);
-        idToFuture.put(taskId,
-            executor.submit(() -> runWithMdc(executionDetails, task, callback)));
+        workQueue.onNext(new TaskWithId(taskId, task));
         return taskId;
     }
 
-    private void runWithMdc(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) {
-        MDCBuilder.withMdc(
-            MDCBuilder.create()
-                .addContext(Task.TASK_ID, executionDetails.getTaskId())
-                .addContext(Task.TASK_TYPE, executionDetails.getType())
-                .addContext(Task.TASK_DETAILS, executionDetails.getAdditionalInformation()),
-            () -> run(executionDetails, task, callback));
-    }
-
-    private void run(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) {
-        TaskExecutionDetails started = executionDetails.start();
-        idToExecutionDetails.put(started.getTaskId(), started);
-        try {
-            task.run()
-                .onComplete(() -> success(started))
-                .onFailure(() -> failed(started,
-                    logger -> logger.info("Task was partially performed. Check logs for more details")));
-        } catch (Exception e) {
-            failed(started,
-                logger -> logger.error("Error while running task", executionDetails, e));
-        } finally {
-            idToFuture.remove(executionDetails.getTaskId());
-            callback.accept(executionDetails.getTaskId());
-        }
-    }
-
-    private void success(TaskExecutionDetails started) {
-        if (!wasCancelled(started.getTaskId())) {
-            idToExecutionDetails.put(started.getTaskId(), started.completed());
-            LOGGER.info("Task success");
-        }
-    }
-
-    private void failed(TaskExecutionDetails started, Consumer<Logger> logOperation) {
-        if (!wasCancelled(started.getTaskId())) {
-            idToExecutionDetails.put(started.getTaskId(), started.failed());
-            logOperation.accept(LOGGER);
-        }
-    }
-
-    private boolean wasCancelled(TaskId taskId) {
-        return idToExecutionDetails.get(taskId).getStatus() == Status.CANCELLED;
-    }
-
     @Override
     public TaskExecutionDetails getExecutionDetails(TaskId id) {
         return Optional.ofNullable(idToExecutionDetails.get(id))
@@ -128,32 +95,55 @@ public class MemoryTaskManager implements TaskManager {
 
     @Override
     public List<TaskExecutionDetails> list(Status status) {
-        return idToExecutionDetails.values()
+        return ImmutableList.copyOf(tasksFiltered(status).values());
+    }
+
+    public Set<TaskId> listIds(Status status) {
+        return tasksFiltered(status).keySet();
+    }
+
+    public Map<TaskId, TaskExecutionDetails> tasksFiltered(Status status) {
+        return idToExecutionDetails.entrySet()
             .stream()
-            .filter(details -> details.getStatus().equals(status))
-            .collect(Guavate.toImmutableList());
+            .filter(details -> details.getValue().getStatus().equals(status))
+            .collect(Guavate.entriesToImmutableMap());
     }
 
     @Override
     public void cancel(TaskId id) {
-        Optional.ofNullable(idToFuture.get(id))
-            .ifPresent(future -> {
-                TaskExecutionDetails executionDetails = idToExecutionDetails.get(id);
-                idToExecutionDetails.put(id, executionDetails.cancel());
-                future.cancel(INTERRUPT_IF_RUNNING);
-                idToFuture.remove(id);
-            });
+        TaskExecutionDetails details = getExecutionDetails(id);
+        if (details.getStatus().equals(Status.IN_PROGRESS) || details.getStatus().equals(Status.WAITING)) {
+            worker.cancelTask(id, updateDetails(id));
+        }
     }
 
     @Override
     public TaskExecutionDetails await(TaskId id) {
-        Optional.ofNullable(idToFuture.get(id))
-            .ifPresent(Throwing.consumer(Future::get));
-        return getExecutionDetails(id);
+        if (Optional.ofNullable(getExecutionDetails(id)).isPresent()) {
+            return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic())
+                .filter(ignore -> tasksResult.get(id) != null)
+                .map(ignore -> {
+                    Optional.ofNullable(tasksResult.get(id))
+                        .ifPresent(Throwing.<Mono<Task.Result>>consumer(Mono::block).orDoNothing());
+                    return getExecutionDetails(id);
+                })
+                .take(1)
+                .blockFirst();
+        } else {
+            return null;
+        }
     }
 
     @PreDestroy
     public void stop() {
-        executor.shutdownNow();
+        taskExecutor.shutdown();
+        requestTaskExecutor.shutdown();
+    }
+
+    private Consumer<TaskExecutionDetailsUpdater> updateDetails(TaskId taskId) {
+        return updater -> {
+            TaskExecutionDetails newDetails = updater.update(idToExecutionDetails.get(taskId));
+            idToExecutionDetails.replace(taskId, newDetails);
+        };
     }
 }
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
index cec818b..c80f295 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
@@ -22,6 +22,9 @@ package org.apache.james.task;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.ONE_MINUTE;
+import static org.awaitility.Duration.ONE_SECOND;
 
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -29,6 +32,9 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.assertj.core.api.JUnitSoftAssertions;
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
+import org.awaitility.core.ConditionFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -39,6 +45,7 @@ import com.github.fge.lambdas.consumers.ConsumerChainer;
 
 public class MemoryTaskManagerTest {
 
+    public static final int TIME_TO_ENQUEUE = 200;
     private MemoryTaskManager memoryTaskManager;
 
     @Rule
@@ -54,6 +61,16 @@ public class MemoryTaskManagerTest {
         memoryTaskManager.stop();
     }
 
+    Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
+    ConditionFactory calmlyAwait = Awaitility.with()
+        .pollInterval(slowPacedPollInterval)
+        .and()
+        .with()
+        .pollDelay(slowPacedPollInterval)
+        .await();
+    ConditionFactory awaitAtMostOneMinute = calmlyAwait.atMost(ONE_MINUTE);
+    ConditionFactory awaitAtMostOneSecond = calmlyAwait.atMost(ONE_SECOND);
+
     @Test
     public void getStatusShouldReturnUnknownWhenUnknownId() {
         TaskId unknownId = TaskId.generateTaskId();
@@ -87,6 +104,7 @@ public class MemoryTaskManagerTest {
             return Task.Result.COMPLETED;
         });
 
+        sleep(TIME_TO_ENQUEUE);
         memoryTaskManager.cancel(id);
         task1Latch.countDown();
 
@@ -94,26 +112,21 @@ public class MemoryTaskManagerTest {
     }
 
     @Test
-    public void getStatusShouldReturnCancelledWhenCancelled() throws Exception {
-        CountDownLatch task1Latch = new CountDownLatch(1);
-        CountDownLatch ensureStartedLatch = new CountDownLatch(1);
-        CountDownLatch ensureFinishedLatch = new CountDownLatch(1);
+    public void getStatusShouldBeCancelledWhenCancelled() throws Exception {
 
         TaskId id = memoryTaskManager.submit(() -> {
-            ensureStartedLatch.countDown();
-            await(task1Latch);
+            sleep(500);
             return Task.Result.COMPLETED;
-        },
-            any -> ensureFinishedLatch.countDown());
+        });
 
-        ensureStartedLatch.await();
+        sleep(TIME_TO_ENQUEUE);
         memoryTaskManager.cancel(id);
-        ensureFinishedLatch.await();
 
         assertThat(memoryTaskManager.getExecutionDetails(id).getStatus())
             .isEqualTo(TaskManager.Status.CANCELLED);
     }
 
+
     @Test
     public void cancelShouldBeIdempotent() {
         CountDownLatch task1Latch = new CountDownLatch(1);
@@ -122,7 +135,7 @@ public class MemoryTaskManagerTest {
             await(task1Latch);
             return Task.Result.COMPLETED;
         });
-
+        sleep(TIME_TO_ENQUEUE);
         memoryTaskManager.cancel(id);
         assertThatCode(() -> memoryTaskManager.cancel(id))
             .doesNotThrowAnyException();
@@ -146,13 +159,11 @@ public class MemoryTaskManagerTest {
 
     @Test
     public void getStatusShouldReturnCompletedWhenRunSuccessfully() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
 
         TaskId taskId = memoryTaskManager.submit(
-            () -> Task.Result.COMPLETED,
-            countDownCallback(latch));
+            () -> Task.Result.COMPLETED);
 
-        latch.await();
+        sleep(500);
 
         assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
             .isEqualTo(TaskManager.Status.COMPLETED);
@@ -160,13 +171,11 @@ public class MemoryTaskManagerTest {
 
     @Test
     public void getStatusShouldReturnFailedWhenRunPartially() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
 
         TaskId taskId = memoryTaskManager.submit(
-            () -> Task.Result.PARTIAL,
-            countDownCallback(latch));
+            () -> Task.Result.PARTIAL);
 
-        latch.await();
+        sleep(TIME_TO_ENQUEUE);
 
         assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
             .isEqualTo(TaskManager.Status.FAILED);
@@ -213,6 +222,7 @@ public class MemoryTaskManagerTest {
             .isEqualTo(TaskManager.Status.COMPLETED);
         softly.assertThat(entryWithId(list, inProgressId))
             .isEqualTo(TaskManager.Status.IN_PROGRESS);
+        latch3.countDown();
     }
 
     private TaskManager.Status entryWithId(List<TaskExecutionDetails> list, TaskId taskId) {
@@ -369,24 +379,42 @@ public class MemoryTaskManagerTest {
     }
 
     @Test
+    public void awaitShouldAwaitWaitingTask() {
+        CountDownLatch latch = new CountDownLatch(1);
+        memoryTaskManager.submit(
+            () -> {
+                await(latch);
+                return Task.Result.COMPLETED;
+            });
+        latch.countDown();
+        TaskId task2 = memoryTaskManager.submit(
+            () -> Task.Result.COMPLETED);
+
+        assertThat(memoryTaskManager.await(task2).getStatus()).isEqualTo(TaskManager.Status.COMPLETED);
+    }
+
+    @Test
     public void submittedTaskShouldExecuteSequentially() {
         ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
 
         TaskId id1 = memoryTaskManager.submit(() -> {
             queue.add(1);
-            sleep(500);
+            sleep(200);
             queue.add(2);
             return Task.Result.COMPLETED;
         });
         TaskId id2 = memoryTaskManager.submit(() -> {
             queue.add(3);
-            sleep(500);
+            sleep(200);
             queue.add(4);
             return Task.Result.COMPLETED;
         });
+        sleep(TIME_TO_ENQUEUE);
         memoryTaskManager.await(id1);
         memoryTaskManager.await(id2);
 
+        awaitAtMostOneSecond.until(() -> queue.contains(4));
+
         assertThat(queue)
             .containsExactly(1, 2, 3, 4);
     }
@@ -396,9 +424,8 @@ public class MemoryTaskManagerTest {
         TaskId taskId = memoryTaskManager.submit(() -> {
             throw new RuntimeException();
         });
-
+        sleep(TIME_TO_ENQUEUE);
         TaskExecutionDetails executionDetails = memoryTaskManager.await(taskId);
-
         assertThat(executionDetails.getStatus())
             .isEqualTo(TaskManager.Status.FAILED);
     }
@@ -408,7 +435,7 @@ public class MemoryTaskManagerTest {
         TaskId taskId = memoryTaskManager.submit(() -> {
             throw new RuntimeException();
         });
-
+        sleep(TIME_TO_ENQUEUE);
         memoryTaskManager.await(taskId);
 
         assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 04/06: refactor MemoryTaskManagerTest to avoid too many Thread.sleep

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bcdbaa7f3e96c470a496c94a7f0e677b471c176e
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Mon May 20 15:42:50 2019 +0200

    refactor MemoryTaskManagerTest to avoid too many Thread.sleep
---
 .../apache/james/task/MemoryTaskManagerTest.java   | 102 +++++++++------------
 1 file changed, 45 insertions(+), 57 deletions(-)

diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
index c80f295..c857ec1 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
@@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
-import static org.awaitility.Duration.ONE_MINUTE;
 import static org.awaitility.Duration.ONE_SECOND;
 
 import java.util.List;
@@ -40,12 +39,8 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-import com.github.fge.lambdas.Throwing;
-import com.github.fge.lambdas.consumers.ConsumerChainer;
-
 public class MemoryTaskManagerTest {
 
-    public static final int TIME_TO_ENQUEUE = 200;
     private MemoryTaskManager memoryTaskManager;
 
     @Rule
@@ -61,15 +56,14 @@ public class MemoryTaskManagerTest {
         memoryTaskManager.stop();
     }
 
-    Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
-    ConditionFactory calmlyAwait = Awaitility.with()
+    private final Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
+    private final ConditionFactory calmlyAwait = Awaitility.with()
         .pollInterval(slowPacedPollInterval)
         .and()
         .with()
         .pollDelay(slowPacedPollInterval)
         .await();
-    ConditionFactory awaitAtMostOneMinute = calmlyAwait.atMost(ONE_MINUTE);
-    ConditionFactory awaitAtMostOneSecond = calmlyAwait.atMost(ONE_SECOND);
+    private final ConditionFactory awaitAtMostOneSecond = calmlyAwait.atMost(ONE_SECOND);
 
     @Test
     public void getStatusShouldReturnUnknownWhenUnknownId() {
@@ -95,16 +89,18 @@ public class MemoryTaskManagerTest {
 
     @Test
     public void taskCodeAfterCancelIsNotRun() {
+        CountDownLatch waitForTaskToBeLaunched = new CountDownLatch(1);
         CountDownLatch task1Latch = new CountDownLatch(1);
         AtomicInteger count = new AtomicInteger(0);
 
         TaskId id = memoryTaskManager.submit(() -> {
+            waitForTaskToBeLaunched.countDown();
             await(task1Latch);
             count.incrementAndGet();
             return Task.Result.COMPLETED;
         });
 
-        sleep(TIME_TO_ENQUEUE);
+        await(waitForTaskToBeLaunched);
         memoryTaskManager.cancel(id);
         task1Latch.countDown();
 
@@ -112,21 +108,19 @@ public class MemoryTaskManagerTest {
     }
 
     @Test
-    public void getStatusShouldBeCancelledWhenCancelled() throws Exception {
-
+    public void getStatusShouldBeCancelledWhenCancelled() {
         TaskId id = memoryTaskManager.submit(() -> {
             sleep(500);
             return Task.Result.COMPLETED;
         });
 
-        sleep(TIME_TO_ENQUEUE);
+        awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS);
         memoryTaskManager.cancel(id);
 
         assertThat(memoryTaskManager.getExecutionDetails(id).getStatus())
             .isEqualTo(TaskManager.Status.CANCELLED);
     }
 
-
     @Test
     public void cancelShouldBeIdempotent() {
         CountDownLatch task1Latch = new CountDownLatch(1);
@@ -135,26 +129,24 @@ public class MemoryTaskManagerTest {
             await(task1Latch);
             return Task.Result.COMPLETED;
         });
-        sleep(TIME_TO_ENQUEUE);
+        awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS);
         memoryTaskManager.cancel(id);
         assertThatCode(() -> memoryTaskManager.cancel(id))
             .doesNotThrowAnyException();
     }
 
     @Test
-    public void getStatusShouldReturnInProgressWhenProcessingIsInProgress() throws Exception {
+    public void getStatusShouldReturnInProgressWhenProcessingIsInProgress() {
         CountDownLatch latch1 = new CountDownLatch(1);
-        CountDownLatch latch2 = new CountDownLatch(1);
 
         TaskId taskId = memoryTaskManager.submit(() -> {
-            latch2.countDown();
             await(latch1);
             return Task.Result.COMPLETED;
         });
-        latch2.await();
-
+        awaitUntilTaskHasStatus(taskId, TaskManager.Status.IN_PROGRESS);
         assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
             .isEqualTo(TaskManager.Status.IN_PROGRESS);
+        latch1.countDown();
     }
 
     @Test
@@ -163,8 +155,7 @@ public class MemoryTaskManagerTest {
         TaskId taskId = memoryTaskManager.submit(
             () -> Task.Result.COMPLETED);
 
-        sleep(500);
-
+        awaitUntilTaskHasStatus(taskId, TaskManager.Status.COMPLETED);
         assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
             .isEqualTo(TaskManager.Status.COMPLETED);
     }
@@ -175,21 +166,16 @@ public class MemoryTaskManagerTest {
         TaskId taskId = memoryTaskManager.submit(
             () -> Task.Result.PARTIAL);
 
-        sleep(TIME_TO_ENQUEUE);
+        awaitUntilTaskHasStatus(taskId, TaskManager.Status.FAILED);
 
         assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
             .isEqualTo(TaskManager.Status.FAILED);
     }
 
-    private ConsumerChainer<TaskId> countDownCallback(CountDownLatch latch) {
-        return Throwing.consumer(id -> latch.countDown());
-    }
-
     @Test
-    public void listShouldReturnTaskSatus() throws Exception {
+    public void listShouldReturnTaskStatus() throws Exception {
         CountDownLatch latch1 = new CountDownLatch(1);
         CountDownLatch latch2 = new CountDownLatch(1);
-        CountDownLatch latch3 = new CountDownLatch(1);
 
         TaskId failedId = memoryTaskManager.submit(
             () -> Task.Result.PARTIAL);
@@ -197,20 +183,14 @@ public class MemoryTaskManagerTest {
             () -> Task.Result.COMPLETED);
         TaskId inProgressId = memoryTaskManager.submit(
             () -> {
-                await(latch1);
-                latch2.countDown();
-                await(latch3);
+                latch1.countDown();
+                await(latch2);
                 return Task.Result.COMPLETED;
             });
         TaskId waitingId = memoryTaskManager.submit(
-            () -> {
-                await(latch3);
-                latch2.countDown();
-                return Task.Result.COMPLETED;
-            });
+            () -> Task.Result.COMPLETED);
 
-        latch1.countDown();
-        latch2.await();
+        latch1.await();
 
         List<TaskExecutionDetails> list = memoryTaskManager.list();
         softly.assertThat(list).hasSize(4);
@@ -222,7 +202,7 @@ public class MemoryTaskManagerTest {
             .isEqualTo(TaskManager.Status.COMPLETED);
         softly.assertThat(entryWithId(list, inProgressId))
             .isEqualTo(TaskManager.Status.IN_PROGRESS);
-        latch3.countDown();
+        latch2.countDown();
     }
 
     private TaskManager.Status entryWithId(List<TaskExecutionDetails> list, TaskId taskId) {
@@ -262,6 +242,7 @@ public class MemoryTaskManagerTest {
         assertThat(memoryTaskManager.list(TaskManager.Status.WAITING))
             .extracting(TaskExecutionDetails::getTaskId)
             .containsOnly(waitingId);
+        latch3.countDown();
     }
 
     @Test
@@ -294,6 +275,7 @@ public class MemoryTaskManagerTest {
         assertThat(memoryTaskManager.list(TaskManager.Status.COMPLETED))
             .extracting(TaskExecutionDetails::getTaskId)
             .containsOnly(successfulId);
+        latch3.countDown();
     }
 
     @Test
@@ -326,6 +308,7 @@ public class MemoryTaskManagerTest {
         assertThat(memoryTaskManager.list(TaskManager.Status.FAILED))
             .extracting(TaskExecutionDetails::getTaskId)
             .containsOnly(failedId);
+        latch3.countDown();
     }
 
     @Test
@@ -358,6 +341,7 @@ public class MemoryTaskManagerTest {
         assertThat(memoryTaskManager.list(TaskManager.Status.IN_PROGRESS))
             .extracting(TaskExecutionDetails::getTaskId)
             .containsOnly(inProgressId);
+        latch3.countDown();
     }
 
     @Test
@@ -397,26 +381,29 @@ public class MemoryTaskManagerTest {
     public void submittedTaskShouldExecuteSequentially() {
         ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
 
-        TaskId id1 = memoryTaskManager.submit(() -> {
+        memoryTaskManager.submit(() -> {
             queue.add(1);
-            sleep(200);
+            sleep(50);
             queue.add(2);
             return Task.Result.COMPLETED;
         });
-        TaskId id2 = memoryTaskManager.submit(() -> {
+        memoryTaskManager.submit(() -> {
             queue.add(3);
-            sleep(200);
+            sleep(50);
             queue.add(4);
             return Task.Result.COMPLETED;
         });
-        sleep(TIME_TO_ENQUEUE);
-        memoryTaskManager.await(id1);
-        memoryTaskManager.await(id2);
+        memoryTaskManager.submit(() -> {
+            queue.add(5);
+            sleep(50);
+            queue.add(6);
+            return Task.Result.COMPLETED;
+        });
 
-        awaitAtMostOneSecond.until(() -> queue.contains(4));
+        awaitAtMostOneSecond.until(() -> queue.contains(6));
 
         assertThat(queue)
-            .containsExactly(1, 2, 3, 4);
+            .containsExactly(1, 2, 3, 4, 5, 6);
     }
 
     @Test
@@ -424,9 +411,8 @@ public class MemoryTaskManagerTest {
         TaskId taskId = memoryTaskManager.submit(() -> {
             throw new RuntimeException();
         });
-        sleep(TIME_TO_ENQUEUE);
-        TaskExecutionDetails executionDetails = memoryTaskManager.await(taskId);
-        assertThat(executionDetails.getStatus())
+        awaitUntilTaskHasStatus(taskId, TaskManager.Status.FAILED);
+        assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
             .isEqualTo(TaskManager.Status.FAILED);
     }
 
@@ -435,14 +421,12 @@ public class MemoryTaskManagerTest {
         TaskId taskId = memoryTaskManager.submit(() -> {
             throw new RuntimeException();
         });
-        sleep(TIME_TO_ENQUEUE);
-        memoryTaskManager.await(taskId);
-
+        awaitUntilTaskHasStatus(taskId, TaskManager.Status.FAILED);
         assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
             .isEqualTo(TaskManager.Status.FAILED);
     }
 
-    public void sleep(int durationInMs) {
+    private void sleep(int durationInMs) {
         try {
             Thread.sleep(durationInMs);
         } catch (InterruptedException e) {
@@ -450,11 +434,15 @@ public class MemoryTaskManagerTest {
         }
     }
 
-    public void await(CountDownLatch countDownLatch) {
+    private void await(CountDownLatch countDownLatch) {
         try {
             countDownLatch.await();
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
     }
+
+    private void awaitUntilTaskHasStatus(TaskId id, TaskManager.Status status) {
+        awaitAtMostOneSecond.until(() -> memoryTaskManager.getExecutionDetails(id).getStatus().equals(status));
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org