You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/01/04 08:13:23 UTC
[08/21] james-project git commit: JAMES-2272 Memory implementation
for TaskManager
JAMES-2272 Memory implementation for TaskManager
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8c1a0671
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8c1a0671
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8c1a0671
Branch: refs/heads/master
Commit: 8c1a06717a4fc694648a496e1cb0e2981a339ced
Parents: fbce4b0
Author: benwa <bt...@linagora.com>
Authored: Wed Dec 27 11:47:04 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Thu Jan 4 15:03:36 2018 +0700
----------------------------------------------------------------------
.../apache/james/task/MemoryTaskManager.java | 156 +++++++
.../apache/james/task/TaskExecutionDetails.java | 28 +-
.../james/task/MemoryTaskManagerTest.java | 434 +++++++++++++++++++
3 files changed, 600 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/8c1a0671/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
----------------------------------------------------------------------
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
new file mode 100644
index 0000000..ddd37bb
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -0,0 +1,156 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+import javax.annotation.PreDestroy;
+
+import org.apache.james.util.MDCBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
+public class MemoryTaskManager implements TaskManager {
+ private static final boolean INTERRUPT_IF_RUNNING = true;
+ private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManager.class);
+
+ private final ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails;
+ private final ConcurrentHashMap<TaskId, Future> idToFuture;
+ private final ExecutorService executor;
+
+ public MemoryTaskManager() {
+ idToExecutionDetails = new ConcurrentHashMap<>();
+ idToFuture = new ConcurrentHashMap<>();
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @Override
+ public TaskId submit(Task task) {
+ return submit(task, id -> {});
+ }
+
+ @VisibleForTesting
+ TaskId submit(Task task, Consumer<TaskId> callback) {
+ TaskId taskId = TaskId.generateTaskId();
+ TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId);
+
+ idToExecutionDetails.put(taskId, executionDetails);
+ idToFuture.put(taskId,
+ executor.submit(() -> runWithMdc(executionDetails, task, callback)));
+ return taskId;
+ }
+
+ private void runWithMdc(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) {
+ MDCBuilder.withMdc(
+ MDCBuilder.create()
+ .addContext(Task.TASK_ID, executionDetails.getTaskId())
+ .addContext(Task.TASK_TYPE, executionDetails.getType())
+ .addContext(Task.TASK_DETAILS, executionDetails.getAdditionalInformation()),
+ () -> run(executionDetails, task, callback));
+ }
+
+ private void run(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) {
+ TaskExecutionDetails started = executionDetails.start();
+ idToExecutionDetails.put(started.getTaskId(), started);
+ try {
+ task.run()
+ .onComplete(() -> success(started))
+ .onFailure(() -> failed(started,
+ logger -> logger.info("Task was partially performed. Check logs for more details")));
+ } catch (Exception e) {
+ failed(started,
+ logger -> logger.error("Error while running task", executionDetails, e));
+ } finally {
+ idToFuture.remove(executionDetails.getTaskId());
+ callback.accept(executionDetails.getTaskId());
+ }
+ }
+
+ private void success(TaskExecutionDetails started) {
+ if (!wasCancelled(started.getTaskId())) {
+ idToExecutionDetails.put(started.getTaskId(), started.completed());
+ LOGGER.info("Task success");
+ }
+ }
+
+ private void failed(TaskExecutionDetails started, Consumer<Logger> logOperation) {
+ if (!wasCancelled(started.getTaskId())) {
+ idToExecutionDetails.put(started.getTaskId(), started.failed());
+ logOperation.accept(LOGGER);
+ }
+ }
+
+ private boolean wasCancelled(TaskId taskId) {
+ return idToExecutionDetails.get(taskId).getStatus() == Status.CANCELLED;
+ }
+
+ @Override
+ public TaskExecutionDetails getExecutionDetails(TaskId id) {
+ return Optional.ofNullable(idToExecutionDetails.get(id))
+ .orElseThrow(TaskNotFoundException::new);
+ }
+
+ @Override
+ public List<TaskExecutionDetails> list() {
+ return ImmutableList.copyOf(idToExecutionDetails.values());
+ }
+
+ @Override
+ public List<TaskExecutionDetails> list(Status status) {
+ return idToExecutionDetails.values()
+ .stream()
+ .filter(details -> details.getStatus().equals(status))
+ .collect(Guavate.toImmutableList());
+ }
+
+ @Override
+ public void cancel(TaskId id) {
+ Optional.ofNullable(idToFuture.get(id))
+ .ifPresent(future -> {
+ TaskExecutionDetails executionDetails = idToExecutionDetails.get(id);
+ idToExecutionDetails.put(id, executionDetails.cancel());
+ future.cancel(INTERRUPT_IF_RUNNING);
+ idToFuture.remove(id);
+ });
+ }
+
+ @Override
+ public TaskExecutionDetails await(TaskId id) {
+ Optional.ofNullable(idToFuture.get(id))
+ .ifPresent(Throwing.consumer(Future::get));
+ return getExecutionDetails(id);
+ }
+
+ @PreDestroy
+ public void stop() {
+ executor.shutdownNow();
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/8c1a0671/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
----------------------------------------------------------------------
diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
index 0122fa7..a95627c 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
@@ -33,9 +33,8 @@ public class TaskExecutionDetails {
public static TaskExecutionDetails from(Task task, TaskId id) {
return new TaskExecutionDetails(
id,
- task.type(),
+ task,
TaskManager.Status.WAITING,
- task.details(),
Optional.of(ZonedDateTime.now()),
Optional.empty(),
Optional.empty(),
@@ -44,24 +43,21 @@ public class TaskExecutionDetails {
}
private final TaskId taskId;
- private final String type;
+ private final Task task;
private final TaskManager.Status status;
- private final Optional<AdditionalInformation> additionalInformation;
private final Optional<ZonedDateTime> submitDate;
private final Optional<ZonedDateTime> startedDate;
private final Optional<ZonedDateTime> completedDate;
private final Optional<ZonedDateTime> canceledDate;
private final Optional<ZonedDateTime> failedDate;
- public TaskExecutionDetails(TaskId taskId, String type, TaskManager.Status status,
- Optional<AdditionalInformation> additionalInformation,
+ public TaskExecutionDetails(TaskId taskId, Task task, TaskManager.Status status,
Optional<ZonedDateTime> submitDate, Optional<ZonedDateTime> startedDate,
Optional<ZonedDateTime> completedDate, Optional<ZonedDateTime> canceledDate,
Optional<ZonedDateTime> failedDate) {
this.taskId = taskId;
- this.type = type;
+ this.task = task;
this.status = status;
- this.additionalInformation = additionalInformation;
this.submitDate = submitDate;
this.startedDate = startedDate;
this.completedDate = completedDate;
@@ -74,7 +70,7 @@ public class TaskExecutionDetails {
}
public String getType() {
- return type;
+ return task.type();
}
public TaskManager.Status getStatus() {
@@ -82,7 +78,7 @@ public class TaskExecutionDetails {
}
public Optional<AdditionalInformation> getAdditionalInformation() {
- return additionalInformation;
+ return task.details();
}
public Optional<ZonedDateTime> getSubmitDate() {
@@ -109,9 +105,8 @@ public class TaskExecutionDetails {
Preconditions.checkState(status == TaskManager.Status.WAITING);
return new TaskExecutionDetails(
taskId,
- type,
+ task,
TaskManager.Status.IN_PROGRESS,
- additionalInformation,
submitDate,
Optional.of(ZonedDateTime.now()),
Optional.empty(),
@@ -123,9 +118,8 @@ public class TaskExecutionDetails {
Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS);
return new TaskExecutionDetails(
taskId,
- type,
+ task,
TaskManager.Status.COMPLETED,
- additionalInformation,
submitDate,
startedDate,
Optional.of(ZonedDateTime.now()),
@@ -137,9 +131,8 @@ public class TaskExecutionDetails {
Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS);
return new TaskExecutionDetails(
taskId,
- type,
+ task,
TaskManager.Status.FAILED,
- additionalInformation,
submitDate,
startedDate,
Optional.empty(),
@@ -152,9 +145,8 @@ public class TaskExecutionDetails {
|| status == TaskManager.Status.WAITING);
return new TaskExecutionDetails(
taskId,
- type,
+ task,
TaskManager.Status.CANCELLED,
- additionalInformation,
submitDate,
startedDate,
Optional.empty(),
http://git-wip-us.apache.org/repos/asf/james-project/blob/8c1a0671/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
----------------------------------------------------------------------
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
new file mode 100644
index 0000000..0b7005a
--- /dev/null
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
@@ -0,0 +1,434 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.JUnitSoftAssertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.github.fge.lambdas.Throwing;
+import com.github.fge.lambdas.consumers.ConsumerChainer;
+import com.google.common.base.Throwables;
+
+public class MemoryTaskManagerTest {
+
+ private MemoryTaskManager memoryTaskManager;
+
+ @Rule
+ public JUnitSoftAssertions softly = new JUnitSoftAssertions();
+
+ @Before
+ public void setUp() {
+ memoryTaskManager = new MemoryTaskManager();
+ }
+
+ @After
+ public void tearDown() {
+ memoryTaskManager.stop();
+ }
+
+ @Test
+ public void getStatusShouldReturnUnknownWhenUnknownId() {
+ TaskId unknownId = TaskId.generateTaskId();
+ assertThatThrownBy(() -> memoryTaskManager.getExecutionDetails(unknownId))
+ .isInstanceOf(TaskNotFoundException.class);
+ }
+
+ @Test
+ public void getStatusShouldReturnWaitingWhenNotYetProcessed() {
+ CountDownLatch task1Latch = new CountDownLatch(1);
+
+ memoryTaskManager.submit(() -> {
+ await(task1Latch);
+ return Task.Result.COMPLETED;
+ });
+
+ TaskId taskId = memoryTaskManager.submit(() -> Task.Result.COMPLETED);
+
+ assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
+ .isEqualTo(TaskManager.Status.WAITING);
+ }
+
+ @Test
+ public void taskCodeAfterCancelIsNotRun() {
+ CountDownLatch task1Latch = new CountDownLatch(1);
+ AtomicInteger count = new AtomicInteger(0);
+
+ TaskId id = memoryTaskManager.submit(() -> {
+ await(task1Latch);
+ count.incrementAndGet();
+ return Task.Result.COMPLETED;
+ });
+
+ memoryTaskManager.cancel(id);
+ task1Latch.countDown();
+
+ assertThat(count.get()).isEqualTo(0);
+ }
+
+ @Test
+ public void getStatusShouldReturnCancelledWhenCancelled() throws Exception {
+ CountDownLatch task1Latch = new CountDownLatch(1);
+ CountDownLatch ensureStartedLatch = new CountDownLatch(1);
+ CountDownLatch ensureFinishedLatch = new CountDownLatch(1);
+
+ TaskId id = memoryTaskManager.submit(() -> {
+ ensureStartedLatch.countDown();
+ await(task1Latch);
+ return Task.Result.COMPLETED;
+ },
+ any -> ensureFinishedLatch.countDown());
+
+ ensureStartedLatch.await();
+ memoryTaskManager.cancel(id);
+ ensureFinishedLatch.await();
+
+ assertThat(memoryTaskManager.getExecutionDetails(id).getStatus())
+ .isEqualTo(TaskManager.Status.CANCELLED);
+ }
+
+ @Test
+ public void cancelShouldBeIdempotent() {
+ CountDownLatch task1Latch = new CountDownLatch(1);
+
+ TaskId id = memoryTaskManager.submit(() -> {
+ await(task1Latch);
+ return Task.Result.COMPLETED;
+ });
+
+ memoryTaskManager.cancel(id);
+ assertThatCode(() -> memoryTaskManager.cancel(id))
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ public void getStatusShouldReturnInProgressWhenProcessingIsInProgress() throws Exception {
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+
+ TaskId taskId = memoryTaskManager.submit(() -> {
+ latch2.countDown();
+ await(latch1);
+ return Task.Result.COMPLETED;
+ });
+ latch2.await();
+
+ assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
+ .isEqualTo(TaskManager.Status.IN_PROGRESS);
+ }
+
+ @Test
+ public void getStatusShouldReturnCompletedWhenRunSuccessfully() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ TaskId taskId = memoryTaskManager.submit(
+ () -> Task.Result.COMPLETED,
+ countDownCallback(latch));
+
+ latch.await();
+
+ assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
+ .isEqualTo(TaskManager.Status.COMPLETED);
+ }
+
+ @Test
+ public void getStatusShouldReturnFailedWhenRunPartially() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ TaskId taskId = memoryTaskManager.submit(
+ () -> Task.Result.PARTIAL,
+ countDownCallback(latch));
+
+ latch.await();
+
+ assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
+ .isEqualTo(TaskManager.Status.FAILED);
+ }
+
+ private ConsumerChainer<TaskId> countDownCallback(CountDownLatch latch) {
+ return Throwing.consumer(id -> latch.countDown());
+ }
+
+ @Test
+ public void listShouldReturnTaskSatus() throws Exception {
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ CountDownLatch latch3 = new CountDownLatch(1);
+
+ TaskId failedId = memoryTaskManager.submit(
+ () -> Task.Result.PARTIAL);
+ TaskId successfulId = memoryTaskManager.submit(
+ () -> Task.Result.COMPLETED);
+ TaskId inProgressId = memoryTaskManager.submit(
+ () -> {
+ await(latch1);
+ latch2.countDown();
+ await(latch3);
+ return Task.Result.COMPLETED;
+ });
+ TaskId waitingId = memoryTaskManager.submit(
+ () -> {
+ await(latch3);
+ latch2.countDown();
+ return Task.Result.COMPLETED;
+ });
+
+ latch1.countDown();
+ latch2.await();
+
+ List<TaskExecutionDetails> list = memoryTaskManager.list();
+ softly.assertThat(list).hasSize(4);
+ softly.assertThat(entryWithId(list, failedId))
+ .isEqualTo(TaskManager.Status.FAILED);
+ softly.assertThat(entryWithId(list, waitingId))
+ .isEqualTo(TaskManager.Status.WAITING);
+ softly.assertThat(entryWithId(list, successfulId))
+ .isEqualTo(TaskManager.Status.COMPLETED);
+ softly.assertThat(entryWithId(list, inProgressId))
+ .isEqualTo(TaskManager.Status.IN_PROGRESS);
+ }
+
+ private TaskManager.Status entryWithId(List<TaskExecutionDetails> list, TaskId taskId) {
+ return list.stream()
+ .filter(e -> e.getTaskId().equals(taskId))
+ .findFirst().get()
+ .getStatus();
+ }
+
+ @Test
+ public void listShouldAllowToSeeWaitingTasks() throws Exception {
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ CountDownLatch latch3 = new CountDownLatch(1);
+
+ memoryTaskManager.submit(
+ () -> Task.Result.PARTIAL);
+ memoryTaskManager.submit(
+ () -> Task.Result.COMPLETED);
+ memoryTaskManager.submit(
+ () -> {
+ await(latch1);
+ latch2.countDown();
+ await(latch3);
+ return Task.Result.COMPLETED;
+ });
+ TaskId waitingId = memoryTaskManager.submit(
+ () -> {
+ await(latch3);
+ latch2.countDown();
+ return Task.Result.COMPLETED;
+ });
+
+ latch1.countDown();
+ latch2.await();
+
+ assertThat(memoryTaskManager.list(TaskManager.Status.WAITING))
+ .extracting(TaskExecutionDetails::getTaskId)
+ .containsOnly(waitingId);
+ }
+
+ @Test
+ public void listShouldAllowToSeeInProgressTasks() throws Exception {
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ CountDownLatch latch3 = new CountDownLatch(1);
+
+ memoryTaskManager.submit(
+ () -> Task.Result.PARTIAL);
+ TaskId successfulId = memoryTaskManager.submit(
+ () -> Task.Result.COMPLETED);
+ memoryTaskManager.submit(
+ () -> {
+ await(latch1);
+ latch2.countDown();
+ await(latch3);
+ return Task.Result.COMPLETED;
+ });
+ memoryTaskManager.submit(
+ () -> {
+ await(latch3);
+ latch2.countDown();
+ return Task.Result.COMPLETED;
+ });
+
+ latch1.countDown();
+ latch2.await();
+
+ assertThat(memoryTaskManager.list(TaskManager.Status.COMPLETED))
+ .extracting(TaskExecutionDetails::getTaskId)
+ .containsOnly(successfulId);
+ }
+
+ @Test
+ public void listShouldAllowToSeeFailedTasks() throws Exception {
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ CountDownLatch latch3 = new CountDownLatch(1);
+
+ TaskId failedId = memoryTaskManager.submit(
+ () -> Task.Result.PARTIAL);
+ memoryTaskManager.submit(
+ () -> Task.Result.COMPLETED);
+ memoryTaskManager.submit(
+ () -> {
+ await(latch1);
+ latch2.countDown();
+ await(latch3);
+ return Task.Result.COMPLETED;
+ });
+ memoryTaskManager.submit(
+ () -> {
+ await(latch3);
+ latch2.countDown();
+ return Task.Result.COMPLETED;
+ });
+
+ latch1.countDown();
+ latch2.await();
+
+ assertThat(memoryTaskManager.list(TaskManager.Status.FAILED))
+ .extracting(TaskExecutionDetails::getTaskId)
+ .containsOnly(failedId);
+ }
+
+ @Test
+ public void listShouldAllowToSeeSuccessfulTasks() throws Exception {
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ CountDownLatch latch3 = new CountDownLatch(1);
+
+ memoryTaskManager.submit(
+ () -> Task.Result.PARTIAL);
+ memoryTaskManager.submit(
+ () -> Task.Result.COMPLETED);
+ TaskId inProgressId = memoryTaskManager.submit(
+ () -> {
+ await(latch1);
+ latch2.countDown();
+ await(latch3);
+ return Task.Result.COMPLETED;
+ });
+ memoryTaskManager.submit(
+ () -> {
+ await(latch3);
+ latch2.countDown();
+ return Task.Result.COMPLETED;
+ });
+
+ latch1.countDown();
+ latch2.await();
+
+ assertThat(memoryTaskManager.list(TaskManager.Status.IN_PROGRESS))
+ .extracting(TaskExecutionDetails::getTaskId)
+ .containsOnly(inProgressId);
+ }
+
+ @Test
+ public void listShouldBeEmptyWhenNoTasks() throws Exception {
+ assertThat(memoryTaskManager.list()).isEmpty();
+ }
+
+ @Test
+ public void listCancelledShouldBeEmptyWhenNoTasks() throws Exception {
+ assertThat(memoryTaskManager.list(TaskManager.Status.CANCELLED)).isEmpty();
+ }
+
+ @Test
+ public void awaitShouldNotThrowWhenCompletedTask() throws Exception {
+ TaskId taskId = memoryTaskManager.submit(
+ () -> Task.Result.COMPLETED);
+ memoryTaskManager.await(taskId);
+ memoryTaskManager.await(taskId);
+ }
+
+ @Test
+ public void submittedTaskShouldExecuteSequentially() {
+ ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
+
+ TaskId id1 = memoryTaskManager.submit(() -> {
+ queue.add(1);
+ sleep(500);
+ queue.add(2);
+ return Task.Result.COMPLETED;
+ });
+ TaskId id2 = memoryTaskManager.submit(() -> {
+ queue.add(3);
+ sleep(500);
+ queue.add(4);
+ return Task.Result.COMPLETED;
+ });
+ memoryTaskManager.await(id1);
+ memoryTaskManager.await(id2);
+
+ assertThat(queue)
+ .containsExactly(1, 2, 3, 4);
+ }
+
+ @Test
+ public void awaitShouldReturnFailedWhenExceptionThrown() {
+ TaskId taskId = memoryTaskManager.submit(() -> {
+ throw new RuntimeException();
+ });
+
+ TaskExecutionDetails executionDetails = memoryTaskManager.await(taskId);
+
+ assertThat(executionDetails.getStatus())
+ .isEqualTo(TaskManager.Status.FAILED);
+ }
+
+ @Test
+ public void getStatusShouldReturnFailedWhenExceptionThrown() {
+ TaskId taskId = memoryTaskManager.submit(() -> {
+ throw new RuntimeException();
+ });
+
+ memoryTaskManager.await(taskId);
+
+ assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
+ .isEqualTo(TaskManager.Status.FAILED);
+ }
+
+ public void sleep(int durationInMs) {
+ try {
+ Thread.sleep(durationInMs);
+ } catch (InterruptedException e) {
+ Throwables.propagate(e);
+ }
+ }
+
+ public void await(CountDownLatch countDownLatch) {
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ Throwables.propagate(e);
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org