You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/05/28 07:39:27 UTC
[james-project] 02/06: JAMES-2777 create a MemoryTaskManagerWorker
responsible to execute the tasks
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c420360973e5cb25134a268fa228465a593cd93b
Author: RĂ©mi Kowalski <rk...@linagora.com>
AuthorDate: Tue May 21 11:42:06 2019 +0200
JAMES-2777 create a MemoryTaskManagerWorker responsible to execute the tasks
---
.../apache/james/task/MemoryTaskManagerWorker.java | 114 ++++++++++++++
.../james/task/TaskExecutionDetailsUpdater.java | 24 +++
.../org/apache/james/task/TaskManagerWorker.java | 31 ++++
.../james/task/MemoryTaskManagerWorkerTest.java | 163 +++++++++++++++++++++
4 files changed, 332 insertions(+)
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
new file mode 100644
index 0000000..6feb85a
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
@@ -0,0 +1,114 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.task;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.james.util.MDCBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+public class MemoryTaskManagerWorker implements TaskManagerWorker {
+ private static final boolean INTERRUPT_IF_RUNNING = true;
+ private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManagerWorker.class);
+ private final ConcurrentHashMap<TaskId, CompletableFuture<Task.Result>> idToFuture = new ConcurrentHashMap<>();
+
+ @Override
+ public Mono<Task.Result> executeTask(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+ CompletableFuture<Task.Result> futureResult = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, updateDetails));
+
+ idToFuture.put(taskWithId.getId(), futureResult);
+
+ Mono<Task.Result> result = Mono.<Task.Result>fromFuture(futureResult)
+ .doOnError(res -> failed(updateDetails,
+ (logger, details) -> logger.error("Task was partially performed. Check logs for more details")))
+ .doOnTerminate(() -> idToFuture.remove(taskWithId.getId()));
+
+ return result;
+ }
+
+ private Task.Result runWithMdc(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+ return MDCBuilder.withMdc(
+ MDCBuilder.create()
+ .addContext(Task.TASK_ID, taskWithId.getId())
+ .addContext(Task.TASK_TYPE, taskWithId.getTask().type())
+ .addContext(Task.TASK_DETAILS, taskWithId.getTask().details()),
+ () -> run(taskWithId, updateDetails));
+ }
+
+
+ private Task.Result run(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+ updateDetails.accept(TaskExecutionDetails::start);
+ try {
+ return taskWithId.getTask()
+ .run()
+ .onComplete(() -> success(updateDetails))
+ .onFailure(() -> failed(updateDetails, (logger, details) -> logger.error("Task was partially performed. Check logs for more details" + details.getTaskId())));
+ } catch (Exception e) {
+ failed(updateDetails,
+ (logger, executionDetails) -> logger.error("Error while running task", executionDetails, e));
+ return Task.Result.PARTIAL;
+ }
+ }
+
+ @Override
+ public void cancelTask(TaskId id, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+ Optional.ofNullable(idToFuture.remove(id))
+ .ifPresent(future -> {
+ updateDetails.accept(details -> {
+ if (details.getStatus().equals(TaskManager.Status.WAITING) || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)) {
+ return details.cancel();
+ }
+ return details;
+ });
+ future.cancel(INTERRUPT_IF_RUNNING);
+ });
+ }
+
+ private void success(Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+ updateDetails.accept(currentDetails -> {
+ if (!wasCancelled(currentDetails)) {
+ LOGGER.info("Task success");
+ return currentDetails.completed();
+ }
+ return currentDetails;
+ });
+ }
+
+ private void failed(Consumer<TaskExecutionDetailsUpdater> updateDetails, BiConsumer<Logger, TaskExecutionDetails> logOperation) {
+ updateDetails.accept(currentDetails -> {
+ if (!wasCancelled(currentDetails)) {
+ logOperation.accept(LOGGER, currentDetails);
+ return currentDetails.failed();
+ }
+ return currentDetails;
+ });
+ }
+
+ private boolean wasCancelled(TaskExecutionDetails details) {
+ return details.getStatus() == TaskManager.Status.CANCELLED;
+ }
+
+}
diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetailsUpdater.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetailsUpdater.java
new file mode 100644
index 0000000..fe008f6
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetailsUpdater.java
@@ -0,0 +1,24 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.task;
+
+@FunctionalInterface
+public interface TaskExecutionDetailsUpdater {
+ TaskExecutionDetails update(TaskExecutionDetails currentDetails);
+}
diff --git a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
new file mode 100644
index 0000000..b30a850
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
@@ -0,0 +1,31 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.task;
+
+import java.util.function.Consumer;
+
+import reactor.core.publisher.Mono;
+
+public interface TaskManagerWorker {
+
+ Mono<Task.Result> executeTask(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails);
+
+ void cancelTask(TaskId id, Consumer<TaskExecutionDetailsUpdater> updateDetails);
+
+}
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
new file mode 100644
index 0000000..e4ae643
--- /dev/null
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
@@ -0,0 +1,163 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.task;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import reactor.core.publisher.Mono;
+
+public class MemoryTaskManagerWorkerTest {
+
+ private final MemoryTaskManagerWorker worker = new MemoryTaskManagerWorker();
+
+ private final Task successfulTask = () -> Task.Result.COMPLETED;
+ private final Task failedTask = () -> Task.Result.PARTIAL;
+ private final Task throwingTask = () -> {
+ throw new RuntimeException("Throwing Task");
+ };
+
+ @Test
+ public void aSuccessfullTaskShouldCompleteSuccessfully() {
+ assertThatTaskSucceeded(successfulTask);
+ }
+
+ @Test
+ public void aFailedTaskShouldCompleteWithFailedStatus() {
+ assertThatTaskFailed(failedTask);
+ }
+
+ @Test
+ public void aThrowingTaskShouldCompleteWithFailedStatus() {
+ assertThatTaskFailed(throwingTask);
+ }
+
+ @Test
+ public void theWorkerShouldReportThatATaskIsInProgress() {
+ TaskId id = TaskId.generateTaskId();
+ CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch taskLaunched = new CountDownLatch(1);
+
+ Task inProgressTask = () -> {
+ taskLaunched.countDown();
+ await(latch);
+ return Task.Result.COMPLETED;
+ };
+
+ TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
+ TaskExecutionDetails executionDetails = TaskExecutionDetails.from(inProgressTask, id);
+ ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+ idToDetails.put(id, executionDetails);
+
+ worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+ await(taskLaunched);
+ assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.IN_PROGRESS);
+ latch.countDown();
+ }
+
+ @Test
+ public void theWorkerShouldNotRunACancelledTask() {
+ TaskId id = TaskId.generateTaskId();
+ AtomicInteger counter = new AtomicInteger(0);
+
+ Task task = () -> {
+ counter.incrementAndGet();
+ return Task.Result.COMPLETED;
+ };
+
+ TaskWithId taskWithId = new TaskWithId(id, task);
+ TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id);
+ ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+
+ idToDetails.put(id, executionDetails.cancel());
+
+ worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+
+ assertThat(counter.get()).isEqualTo(0);
+ assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
+ }
+
+ @Test
+ public void theWorkerShouldCancelAnInProgressTask() {
+ TaskId id = TaskId.generateTaskId();
+ AtomicInteger counter = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(1);
+
+ Task inProgressTask = () -> {
+ await(latch);
+ counter.incrementAndGet();
+ return Task.Result.COMPLETED;
+ };
+
+ TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
+ TaskExecutionDetails executionDetails = TaskExecutionDetails.from(inProgressTask, id);
+ ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+ idToDetails.put(id, executionDetails);
+
+ worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+
+ worker.cancelTask(id, updateDetails(idToDetails, id));
+ assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
+
+ assertThat(counter.get()).isEqualTo(0);
+ assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
+ }
+
+ private void assertThatTaskSucceeded(Task task) {
+ assertTaskExecutionResultAndStatus(task, Task.Result.COMPLETED, TaskManager.Status.COMPLETED);
+ }
+
+ private void assertThatTaskFailed(Task task) {
+ assertTaskExecutionResultAndStatus(task, Task.Result.PARTIAL, TaskManager.Status.FAILED);
+ }
+
+ private void assertTaskExecutionResultAndStatus(Task task, Task.Result expectedResult, TaskManager.Status expectedStatus) {
+ TaskId id = TaskId.generateTaskId();
+ TaskWithId taskWithId = new TaskWithId(id, task);
+ TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id);
+ ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+ idToDetails.put(id, executionDetails);
+
+ Mono<Task.Result> result = worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+
+ assertThat(result.block()).isEqualTo(expectedResult);
+ assertThat(idToDetails.get(id).getStatus()).isEqualTo(expectedStatus);
+ }
+
+ private Consumer<TaskExecutionDetailsUpdater> updateDetails(ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails, TaskId taskId) {
+ return updater -> {
+ TaskExecutionDetails newDetails = updater.update(idToExecutionDetails.get(taskId));
+ idToExecutionDetails.put(taskId, newDetails);
+ };
+ }
+
+ private void await(CountDownLatch countDownLatch) {
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org