You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/01/04 08:13:23 UTC

[08/21] james-project git commit: JAMES-2272 Memory implementation for TaskManager

JAMES-2272 Memory implementation for TaskManager


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8c1a0671
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8c1a0671
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8c1a0671

Branch: refs/heads/master
Commit: 8c1a06717a4fc694648a496e1cb0e2981a339ced
Parents: fbce4b0
Author: benwa <bt...@linagora.com>
Authored: Wed Dec 27 11:47:04 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Thu Jan 4 15:03:36 2018 +0700

----------------------------------------------------------------------
 .../apache/james/task/MemoryTaskManager.java    | 156 +++++++
 .../apache/james/task/TaskExecutionDetails.java |  28 +-
 .../james/task/MemoryTaskManagerTest.java       | 434 +++++++++++++++++++
 3 files changed, 600 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/8c1a0671/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
----------------------------------------------------------------------
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
new file mode 100644
index 0000000..ddd37bb
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -0,0 +1,156 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+import javax.annotation.PreDestroy;
+
+import org.apache.james.util.MDCBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
+public class MemoryTaskManager implements TaskManager {
+    private static final boolean INTERRUPT_IF_RUNNING = true;
+    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManager.class);
+
+    private final ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails;
+    private final ConcurrentHashMap<TaskId, Future> idToFuture;
+    private final ExecutorService executor;
+
+    public MemoryTaskManager() {
+        idToExecutionDetails = new ConcurrentHashMap<>();
+        idToFuture = new ConcurrentHashMap<>();
+        executor = Executors.newSingleThreadExecutor();
+    }
+
+    @Override
+    public TaskId submit(Task task) {
+        return submit(task, id -> {});
+    }
+
+    @VisibleForTesting
+    TaskId submit(Task task, Consumer<TaskId> callback) {
+        TaskId taskId = TaskId.generateTaskId();
+        TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId);
+
+        idToExecutionDetails.put(taskId, executionDetails);
+        idToFuture.put(taskId,
+            executor.submit(() -> runWithMdc(executionDetails, task, callback)));
+        return taskId;
+    }
+
+    private void runWithMdc(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) {
+        MDCBuilder.withMdc(
+            MDCBuilder.create()
+                .addContext(Task.TASK_ID, executionDetails.getTaskId())
+                .addContext(Task.TASK_TYPE, executionDetails.getType())
+                .addContext(Task.TASK_DETAILS, executionDetails.getAdditionalInformation()),
+            () -> run(executionDetails, task, callback));
+    }
+
+    private void run(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) {
+        TaskExecutionDetails started = executionDetails.start();
+        idToExecutionDetails.put(started.getTaskId(), started);
+        try {
+            task.run()
+                .onComplete(() -> success(started))
+                .onFailure(() -> failed(started,
+                    logger -> logger.info("Task was partially performed. Check logs for more details")));
+        } catch (Exception e) {
+            failed(started,
+                logger -> logger.error("Error while running task", executionDetails, e));
+        } finally {
+            idToFuture.remove(executionDetails.getTaskId());
+            callback.accept(executionDetails.getTaskId());
+        }
+    }
+
+    private void success(TaskExecutionDetails started) {
+        if (!wasCancelled(started.getTaskId())) {
+            idToExecutionDetails.put(started.getTaskId(), started.completed());
+            LOGGER.info("Task success");
+        }
+    }
+
+    private void failed(TaskExecutionDetails started, Consumer<Logger> logOperation) {
+        if (!wasCancelled(started.getTaskId())) {
+            idToExecutionDetails.put(started.getTaskId(), started.failed());
+            logOperation.accept(LOGGER);
+        }
+    }
+
+    private boolean wasCancelled(TaskId taskId) {
+        return idToExecutionDetails.get(taskId).getStatus() == Status.CANCELLED;
+    }
+
+    @Override
+    public TaskExecutionDetails getExecutionDetails(TaskId id) {
+        return Optional.ofNullable(idToExecutionDetails.get(id))
+            .orElseThrow(TaskNotFoundException::new);
+    }
+
+    @Override
+    public List<TaskExecutionDetails> list() {
+        return ImmutableList.copyOf(idToExecutionDetails.values());
+    }
+
+    @Override
+    public List<TaskExecutionDetails> list(Status status) {
+        return idToExecutionDetails.values()
+            .stream()
+            .filter(details -> details.getStatus().equals(status))
+            .collect(Guavate.toImmutableList());
+    }
+
+    @Override
+    public void cancel(TaskId id) {
+        Optional.ofNullable(idToFuture.get(id))
+            .ifPresent(future -> {
+                TaskExecutionDetails executionDetails = idToExecutionDetails.get(id);
+                idToExecutionDetails.put(id, executionDetails.cancel());
+                future.cancel(INTERRUPT_IF_RUNNING);
+                idToFuture.remove(id);
+            });
+    }
+
+    @Override
+    public TaskExecutionDetails await(TaskId id) {
+        Optional.ofNullable(idToFuture.get(id))
+            .ifPresent(Throwing.consumer(Future::get));
+        return getExecutionDetails(id);
+    }
+
+    @PreDestroy
+    public void stop() {
+        executor.shutdownNow();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/8c1a0671/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
----------------------------------------------------------------------
diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
index 0122fa7..a95627c 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
@@ -33,9 +33,8 @@ public class TaskExecutionDetails {
     public static TaskExecutionDetails from(Task task, TaskId id) {
         return new TaskExecutionDetails(
             id,
-            task.type(),
+            task,
             TaskManager.Status.WAITING,
-            task.details(),
             Optional.of(ZonedDateTime.now()),
             Optional.empty(),
             Optional.empty(),
@@ -44,24 +43,21 @@ public class TaskExecutionDetails {
     }
 
     private final TaskId taskId;
-    private final String type;
+    private final Task task;
     private final TaskManager.Status status;
-    private final Optional<AdditionalInformation> additionalInformation;
     private final Optional<ZonedDateTime> submitDate;
     private final Optional<ZonedDateTime> startedDate;
     private final Optional<ZonedDateTime> completedDate;
     private final Optional<ZonedDateTime> canceledDate;
     private final Optional<ZonedDateTime> failedDate;
 
-    public TaskExecutionDetails(TaskId taskId, String type, TaskManager.Status status,
-                                Optional<AdditionalInformation> additionalInformation,
+    public TaskExecutionDetails(TaskId taskId, Task task, TaskManager.Status status,
                                 Optional<ZonedDateTime> submitDate, Optional<ZonedDateTime> startedDate,
                                 Optional<ZonedDateTime> completedDate, Optional<ZonedDateTime> canceledDate,
                                 Optional<ZonedDateTime> failedDate) {
         this.taskId = taskId;
-        this.type = type;
+        this.task = task;
         this.status = status;
-        this.additionalInformation = additionalInformation;
         this.submitDate = submitDate;
         this.startedDate = startedDate;
         this.completedDate = completedDate;
@@ -74,7 +70,7 @@ public class TaskExecutionDetails {
     }
 
     public String getType() {
-        return type;
+        return task.type();
     }
 
     public TaskManager.Status getStatus() {
@@ -82,7 +78,7 @@ public class TaskExecutionDetails {
     }
 
     public Optional<AdditionalInformation> getAdditionalInformation() {
-        return additionalInformation;
+        return task.details();
     }
 
     public Optional<ZonedDateTime> getSubmitDate() {
@@ -109,9 +105,8 @@ public class TaskExecutionDetails {
         Preconditions.checkState(status == TaskManager.Status.WAITING);
         return new TaskExecutionDetails(
             taskId,
-            type,
+            task,
             TaskManager.Status.IN_PROGRESS,
-            additionalInformation,
             submitDate,
             Optional.of(ZonedDateTime.now()),
             Optional.empty(),
@@ -123,9 +118,8 @@ public class TaskExecutionDetails {
         Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS);
         return new TaskExecutionDetails(
             taskId,
-            type,
+            task,
             TaskManager.Status.COMPLETED,
-            additionalInformation,
             submitDate,
             startedDate,
             Optional.of(ZonedDateTime.now()),
@@ -137,9 +131,8 @@ public class TaskExecutionDetails {
         Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS);
         return new TaskExecutionDetails(
             taskId,
-            type,
+            task,
             TaskManager.Status.FAILED,
-            additionalInformation,
             submitDate,
             startedDate,
             Optional.empty(),
@@ -152,9 +145,8 @@ public class TaskExecutionDetails {
             || status == TaskManager.Status.WAITING);
         return new TaskExecutionDetails(
             taskId,
-            type,
+            task,
             TaskManager.Status.CANCELLED,
-            additionalInformation,
             submitDate,
             startedDate,
             Optional.empty(),

http://git-wip-us.apache.org/repos/asf/james-project/blob/8c1a0671/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
----------------------------------------------------------------------
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
new file mode 100644
index 0000000..0b7005a
--- /dev/null
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
@@ -0,0 +1,434 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.JUnitSoftAssertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.github.fge.lambdas.Throwing;
+import com.github.fge.lambdas.consumers.ConsumerChainer;
+import com.google.common.base.Throwables;
+
+public class MemoryTaskManagerTest {
+
+    private MemoryTaskManager memoryTaskManager;
+
+    @Rule
+    public JUnitSoftAssertions softly = new JUnitSoftAssertions();
+
+    @Before
+    public void setUp() {
+        memoryTaskManager = new MemoryTaskManager();
+    }
+
+    @After
+    public void tearDown() {
+        memoryTaskManager.stop();
+    }
+
+    @Test
+    public void getStatusShouldReturnUnknownWhenUnknownId() {
+        TaskId unknownId = TaskId.generateTaskId();
+        assertThatThrownBy(() -> memoryTaskManager.getExecutionDetails(unknownId))
+            .isInstanceOf(TaskNotFoundException.class);
+    }
+
+    @Test
+    public void getStatusShouldReturnWaitingWhenNotYetProcessed() {
+        CountDownLatch task1Latch = new CountDownLatch(1);
+
+        memoryTaskManager.submit(() -> {
+            await(task1Latch);
+            return Task.Result.COMPLETED;
+        });
+
+        TaskId taskId = memoryTaskManager.submit(() -> Task.Result.COMPLETED);
+
+        assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
+            .isEqualTo(TaskManager.Status.WAITING);
+    }
+
+    @Test
+    public void taskCodeAfterCancelIsNotRun() {
+        CountDownLatch task1Latch = new CountDownLatch(1);
+        AtomicInteger count = new AtomicInteger(0);
+
+        TaskId id = memoryTaskManager.submit(() -> {
+            await(task1Latch);
+            count.incrementAndGet();
+            return Task.Result.COMPLETED;
+        });
+
+        memoryTaskManager.cancel(id);
+        task1Latch.countDown();
+
+        assertThat(count.get()).isEqualTo(0);
+    }
+
+    @Test
+    public void getStatusShouldReturnCancelledWhenCancelled() throws Exception {
+        CountDownLatch task1Latch = new CountDownLatch(1);
+        CountDownLatch ensureStartedLatch = new CountDownLatch(1);
+        CountDownLatch ensureFinishedLatch = new CountDownLatch(1);
+
+        TaskId id = memoryTaskManager.submit(() -> {
+            ensureStartedLatch.countDown();
+            await(task1Latch);
+            return Task.Result.COMPLETED;
+        },
+            any -> ensureFinishedLatch.countDown());
+
+        ensureStartedLatch.await();
+        memoryTaskManager.cancel(id);
+        ensureFinishedLatch.await();
+
+        assertThat(memoryTaskManager.getExecutionDetails(id).getStatus())
+            .isEqualTo(TaskManager.Status.CANCELLED);
+    }
+
+    @Test
+    public void cancelShouldBeIdempotent() {
+        CountDownLatch task1Latch = new CountDownLatch(1);
+
+        TaskId id = memoryTaskManager.submit(() -> {
+            await(task1Latch);
+            return Task.Result.COMPLETED;
+        });
+
+        memoryTaskManager.cancel(id);
+        assertThatCode(() -> memoryTaskManager.cancel(id))
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    public void getStatusShouldReturnInProgressWhenProcessingIsInProgress() throws Exception {
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+
+        TaskId taskId = memoryTaskManager.submit(() -> {
+            latch2.countDown();
+            await(latch1);
+            return Task.Result.COMPLETED;
+        });
+        latch2.await();
+
+        assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
+            .isEqualTo(TaskManager.Status.IN_PROGRESS);
+    }
+
+    @Test
+    public void getStatusShouldReturnCompletedWhenRunSuccessfully() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        TaskId taskId = memoryTaskManager.submit(
+            () -> Task.Result.COMPLETED,
+            countDownCallback(latch));
+
+        latch.await();
+
+        assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
+            .isEqualTo(TaskManager.Status.COMPLETED);
+    }
+
+    @Test
+    public void getStatusShouldReturnFailedWhenRunPartially() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        TaskId taskId = memoryTaskManager.submit(
+            () -> Task.Result.PARTIAL,
+            countDownCallback(latch));
+
+        latch.await();
+
+        assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
+            .isEqualTo(TaskManager.Status.FAILED);
+    }
+
+    private ConsumerChainer<TaskId> countDownCallback(CountDownLatch latch) {
+        return Throwing.consumer(id -> latch.countDown());
+    }
+
+    @Test
+    public void listShouldReturnTaskSatus() throws Exception {
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        CountDownLatch latch3 = new CountDownLatch(1);
+
+        TaskId failedId = memoryTaskManager.submit(
+            () -> Task.Result.PARTIAL);
+        TaskId successfulId = memoryTaskManager.submit(
+            () -> Task.Result.COMPLETED);
+        TaskId inProgressId = memoryTaskManager.submit(
+            () -> {
+                await(latch1);
+                latch2.countDown();
+                await(latch3);
+                return Task.Result.COMPLETED;
+            });
+        TaskId waitingId = memoryTaskManager.submit(
+            () -> {
+                await(latch3);
+                latch2.countDown();
+                return Task.Result.COMPLETED;
+            });
+
+        latch1.countDown();
+        latch2.await();
+
+        List<TaskExecutionDetails> list = memoryTaskManager.list();
+        softly.assertThat(list).hasSize(4);
+        softly.assertThat(entryWithId(list, failedId))
+            .isEqualTo(TaskManager.Status.FAILED);
+        softly.assertThat(entryWithId(list, waitingId))
+            .isEqualTo(TaskManager.Status.WAITING);
+        softly.assertThat(entryWithId(list, successfulId))
+            .isEqualTo(TaskManager.Status.COMPLETED);
+        softly.assertThat(entryWithId(list, inProgressId))
+            .isEqualTo(TaskManager.Status.IN_PROGRESS);
+    }
+
+    private TaskManager.Status entryWithId(List<TaskExecutionDetails> list, TaskId taskId) {
+        return list.stream()
+            .filter(e -> e.getTaskId().equals(taskId))
+            .findFirst().get()
+            .getStatus();
+    }
+
+    @Test
+    public void listShouldAllowToSeeWaitingTasks() throws Exception {
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        CountDownLatch latch3 = new CountDownLatch(1);
+
+        memoryTaskManager.submit(
+            () -> Task.Result.PARTIAL);
+        memoryTaskManager.submit(
+            () -> Task.Result.COMPLETED);
+        memoryTaskManager.submit(
+            () -> {
+                await(latch1);
+                latch2.countDown();
+                await(latch3);
+                return Task.Result.COMPLETED;
+            });
+        TaskId waitingId = memoryTaskManager.submit(
+            () -> {
+                await(latch3);
+                latch2.countDown();
+                return Task.Result.COMPLETED;
+            });
+
+        latch1.countDown();
+        latch2.await();
+
+        assertThat(memoryTaskManager.list(TaskManager.Status.WAITING))
+            .extracting(TaskExecutionDetails::getTaskId)
+            .containsOnly(waitingId);
+    }
+
+    @Test
+    public void listShouldAllowToSeeInProgressTasks() throws Exception {
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        CountDownLatch latch3 = new CountDownLatch(1);
+
+        memoryTaskManager.submit(
+            () -> Task.Result.PARTIAL);
+        TaskId successfulId = memoryTaskManager.submit(
+            () -> Task.Result.COMPLETED);
+        memoryTaskManager.submit(
+            () -> {
+                await(latch1);
+                latch2.countDown();
+                await(latch3);
+                return Task.Result.COMPLETED;
+            });
+        memoryTaskManager.submit(
+            () -> {
+                await(latch3);
+                latch2.countDown();
+                return Task.Result.COMPLETED;
+            });
+
+        latch1.countDown();
+        latch2.await();
+
+        assertThat(memoryTaskManager.list(TaskManager.Status.COMPLETED))
+            .extracting(TaskExecutionDetails::getTaskId)
+            .containsOnly(successfulId);
+    }
+
+    @Test
+    public void listShouldAllowToSeeFailedTasks() throws Exception {
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        CountDownLatch latch3 = new CountDownLatch(1);
+
+        TaskId failedId = memoryTaskManager.submit(
+            () -> Task.Result.PARTIAL);
+        memoryTaskManager.submit(
+            () -> Task.Result.COMPLETED);
+        memoryTaskManager.submit(
+            () -> {
+                await(latch1);
+                latch2.countDown();
+                await(latch3);
+                return Task.Result.COMPLETED;
+            });
+        memoryTaskManager.submit(
+            () -> {
+                await(latch3);
+                latch2.countDown();
+                return Task.Result.COMPLETED;
+            });
+
+        latch1.countDown();
+        latch2.await();
+
+        assertThat(memoryTaskManager.list(TaskManager.Status.FAILED))
+            .extracting(TaskExecutionDetails::getTaskId)
+            .containsOnly(failedId);
+    }
+
+    @Test
+    public void listShouldAllowToSeeSuccessfulTasks() throws Exception {
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        CountDownLatch latch3 = new CountDownLatch(1);
+
+        memoryTaskManager.submit(
+            () -> Task.Result.PARTIAL);
+        memoryTaskManager.submit(
+            () -> Task.Result.COMPLETED);
+        TaskId inProgressId = memoryTaskManager.submit(
+            () -> {
+                await(latch1);
+                latch2.countDown();
+                await(latch3);
+                return Task.Result.COMPLETED;
+            });
+        memoryTaskManager.submit(
+            () -> {
+                await(latch3);
+                latch2.countDown();
+                return Task.Result.COMPLETED;
+            });
+
+        latch1.countDown();
+        latch2.await();
+
+        assertThat(memoryTaskManager.list(TaskManager.Status.IN_PROGRESS))
+            .extracting(TaskExecutionDetails::getTaskId)
+            .containsOnly(inProgressId);
+    }
+
+    @Test
+    public void listShouldBeEmptyWhenNoTasks() throws Exception {
+        assertThat(memoryTaskManager.list()).isEmpty();
+    }
+
+    @Test
+    public void listCancelledShouldBeEmptyWhenNoTasks() throws Exception {
+        assertThat(memoryTaskManager.list(TaskManager.Status.CANCELLED)).isEmpty();
+    }
+
+    @Test
+    public void awaitShouldNotThrowWhenCompletedTask() throws Exception {
+        TaskId taskId = memoryTaskManager.submit(
+            () -> Task.Result.COMPLETED);
+        memoryTaskManager.await(taskId);
+        memoryTaskManager.await(taskId);
+    }
+
+    @Test
+    public void submittedTaskShouldExecuteSequentially() {
+        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
+
+        TaskId id1 = memoryTaskManager.submit(() -> {
+            queue.add(1);
+            sleep(500);
+            queue.add(2);
+            return Task.Result.COMPLETED;
+        });
+        TaskId id2 = memoryTaskManager.submit(() -> {
+            queue.add(3);
+            sleep(500);
+            queue.add(4);
+            return Task.Result.COMPLETED;
+        });
+        memoryTaskManager.await(id1);
+        memoryTaskManager.await(id2);
+
+        assertThat(queue)
+            .containsExactly(1, 2, 3, 4);
+    }
+
+    @Test
+    public void awaitShouldReturnFailedWhenExceptionThrown() {
+        TaskId taskId = memoryTaskManager.submit(() -> {
+            throw new RuntimeException();
+        });
+
+        TaskExecutionDetails executionDetails = memoryTaskManager.await(taskId);
+
+        assertThat(executionDetails.getStatus())
+            .isEqualTo(TaskManager.Status.FAILED);
+    }
+
+    @Test
+    public void getStatusShouldReturnFailedWhenExceptionThrown() {
+        TaskId taskId = memoryTaskManager.submit(() -> {
+            throw new RuntimeException();
+        });
+
+        memoryTaskManager.await(taskId);
+
+        assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
+            .isEqualTo(TaskManager.Status.FAILED);
+    }
+
+    public void sleep(int durationInMs) {
+        try {
+            Thread.sleep(durationInMs);
+        } catch (InterruptedException e) {
+            Throwables.propagate(e);
+        }
+    }
+
+    public void await(CountDownLatch countDownLatch) {
+        try {
+            countDownLatch.await();
+        } catch (InterruptedException e) {
+            Throwables.propagate(e);
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org