You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/05/28 07:39:27 UTC

[james-project] 02/06: JAMES-2777 create a MemoryTaskManagerWorker responsible to execute the tasks

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c420360973e5cb25134a268fa228465a593cd93b
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Tue May 21 11:42:06 2019 +0200

    JAMES-2777 create a MemoryTaskManagerWorker responsible to execute the tasks
---
 .../apache/james/task/MemoryTaskManagerWorker.java | 114 ++++++++++++++
 .../james/task/TaskExecutionDetailsUpdater.java    |  24 +++
 .../org/apache/james/task/TaskManagerWorker.java   |  31 ++++
 .../james/task/MemoryTaskManagerWorkerTest.java    | 163 +++++++++++++++++++++
 4 files changed, 332 insertions(+)

diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
new file mode 100644
index 0000000..6feb85a
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
@@ -0,0 +1,114 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.task;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.james.util.MDCBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+public class MemoryTaskManagerWorker implements TaskManagerWorker {
+    private static final boolean INTERRUPT_IF_RUNNING = true;
+    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManagerWorker.class);
+    private final ConcurrentHashMap<TaskId, CompletableFuture<Task.Result>> idToFuture = new ConcurrentHashMap<>();
+
+    @Override
+    public Mono<Task.Result> executeTask(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+        CompletableFuture<Task.Result> futureResult = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, updateDetails));
+
+        idToFuture.put(taskWithId.getId(), futureResult);
+
+        Mono<Task.Result> result = Mono.<Task.Result>fromFuture(futureResult)
+            .doOnError(res -> failed(updateDetails,
+                (logger, details) -> logger.error("Task was partially performed. Check logs for more details")))
+            .doOnTerminate(() -> idToFuture.remove(taskWithId.getId()));
+
+        return result;
+    }
+
+    private Task.Result runWithMdc(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+        return MDCBuilder.withMdc(
+            MDCBuilder.create()
+                .addContext(Task.TASK_ID, taskWithId.getId())
+                .addContext(Task.TASK_TYPE, taskWithId.getTask().type())
+                .addContext(Task.TASK_DETAILS, taskWithId.getTask().details()),
+            () -> run(taskWithId, updateDetails));
+    }
+
+
+    private Task.Result run(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+        updateDetails.accept(TaskExecutionDetails::start);
+        try {
+            return taskWithId.getTask()
+                .run()
+                .onComplete(() -> success(updateDetails))
+                .onFailure(() -> failed(updateDetails, (logger, details) -> logger.error("Task was partially performed. Check logs for more details" + details.getTaskId())));
+        } catch (Exception e) {
+            failed(updateDetails,
+                (logger, executionDetails) -> logger.error("Error while running task", executionDetails, e));
+            return Task.Result.PARTIAL;
+        }
+    }
+
+    @Override
+    public void cancelTask(TaskId id, Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+        Optional.ofNullable(idToFuture.remove(id))
+            .ifPresent(future -> {
+                updateDetails.accept(details -> {
+                    if (details.getStatus().equals(TaskManager.Status.WAITING) || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)) {
+                        return details.cancel();
+                    }
+                    return details;
+                });
+                future.cancel(INTERRUPT_IF_RUNNING);
+            });
+    }
+
+    private void success(Consumer<TaskExecutionDetailsUpdater> updateDetails) {
+        updateDetails.accept(currentDetails -> {
+            if (!wasCancelled(currentDetails)) {
+                LOGGER.info("Task success");
+                return currentDetails.completed();
+            }
+            return currentDetails;
+        });
+    }
+
+    private void failed(Consumer<TaskExecutionDetailsUpdater> updateDetails, BiConsumer<Logger, TaskExecutionDetails> logOperation) {
+        updateDetails.accept(currentDetails -> {
+            if (!wasCancelled(currentDetails)) {
+                logOperation.accept(LOGGER, currentDetails);
+                return currentDetails.failed();
+            }
+            return currentDetails;
+        });
+    }
+
+    private boolean wasCancelled(TaskExecutionDetails details) {
+        return details.getStatus() == TaskManager.Status.CANCELLED;
+    }
+
+}
diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetailsUpdater.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetailsUpdater.java
new file mode 100644
index 0000000..fe008f6
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetailsUpdater.java
@@ -0,0 +1,24 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.task;
+
+@FunctionalInterface
+public interface TaskExecutionDetailsUpdater { // Callback that computes a task's next TaskExecutionDetails from its current ones.
+    TaskExecutionDetails update(TaskExecutionDetails currentDetails); // Returns the details that should replace currentDetails; may be currentDetails itself when nothing changes.
+}
diff --git a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
new file mode 100644
index 0000000..b30a850
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
@@ -0,0 +1,31 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.task;
+
+import java.util.function.Consumer;
+
+import reactor.core.publisher.Mono;
+
+public interface TaskManagerWorker { // Contract for running a TaskWithId and for cancelling a task by its TaskId.
+    // Runs taskWithId and exposes its Task.Result as a Mono; MemoryTaskManagerWorker reports status changes by passing updater functions to updateDetails.
+    Mono<Task.Result> executeTask(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails);
+    // Cancels the task identified by id; MemoryTaskManagerWorker reports the cancellation through updateDetails and does nothing for an unknown id.
+    void cancelTask(TaskId id, Consumer<TaskExecutionDetailsUpdater> updateDetails);
+
+}
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
new file mode 100644
index 0000000..e4ae643
--- /dev/null
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
@@ -0,0 +1,163 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.task;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import reactor.core.publisher.Mono;
+
+public class MemoryTaskManagerWorkerTest {
+    // Each test keeps the task's TaskExecutionDetails in a local map and hands the worker an updater callback over that map.
+    private final MemoryTaskManagerWorker worker = new MemoryTaskManagerWorker();
+
+    private final Task successfulTask = () -> Task.Result.COMPLETED;
+    private final Task failedTask = () -> Task.Result.PARTIAL;
+    private final Task throwingTask = () -> {
+        throw new RuntimeException("Throwing Task");
+    };
+
+    @Test
+    public void aSuccessfullTaskShouldCompleteSuccessfully() {
+        assertThatTaskSucceeded(successfulTask);
+    }
+
+    @Test
+    public void aFailedTaskShouldCompleteWithFailedStatus() {
+        assertThatTaskFailed(failedTask);
+    }
+
+    @Test
+    public void aThrowingTaskShouldCompleteWithFailedStatus() {
+        assertThatTaskFailed(throwingTask);
+    }
+
+    @Test
+    public void theWorkerShouldReportThatATaskIsInProgress() {
+        TaskId id = TaskId.generateTaskId();
+        CountDownLatch latch = new CountDownLatch(1);
+        CountDownLatch taskLaunched = new CountDownLatch(1);
+
+        Task inProgressTask = () -> {
+            taskLaunched.countDown();
+            await(latch);
+            return Task.Result.COMPLETED;
+        };
+
+        TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
+        TaskExecutionDetails executionDetails = TaskExecutionDetails.from(inProgressTask, id);
+        ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+        idToDetails.put(id, executionDetails);
+
+        worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+        await(taskLaunched);
+        assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.IN_PROGRESS);
+        latch.countDown();
+    }
+
+    @Test
+    public void theWorkerShouldNotRunACancelledTask() {
+        TaskId id = TaskId.generateTaskId();
+        AtomicInteger counter = new AtomicInteger(0);
+
+        Task task = () -> {
+            counter.incrementAndGet();
+            return Task.Result.COMPLETED;
+        };
+
+        TaskWithId taskWithId = new TaskWithId(id, task);
+        TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id);
+        ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+        idToDetails.put(id, executionDetails.cancel());
+        worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+        // NOTE(review): executeTask starts the task eagerly on another thread, so these assertions appear to race with it; confirm how a pre-cancelled task is meant to be skipped.
+        assertThat(counter.get()).isEqualTo(0);
+        assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
+    }
+
+    @Test
+    public void theWorkerShouldCancelAnInProgressTask() {
+        TaskId id = TaskId.generateTaskId();
+        AtomicInteger counter = new AtomicInteger(0);
+        CountDownLatch latch = new CountDownLatch(1);
+
+        Task inProgressTask = () -> {
+            await(latch);
+            counter.incrementAndGet();
+            return Task.Result.COMPLETED;
+        };
+
+        TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
+        TaskExecutionDetails executionDetails = TaskExecutionDetails.from(inProgressTask, id);
+        ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+        idToDetails.put(id, executionDetails);
+
+        worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+
+        worker.cancelTask(id, updateDetails(idToDetails, id));
+        assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED);
+        assertThat(counter.get()).isEqualTo(0);
+        // Release the parked task: CompletableFuture#cancel does not interrupt it, so it would otherwise block its pool thread forever.
+        latch.countDown();
+    }
+
+    private void assertThatTaskSucceeded(Task task) {
+        assertTaskExecutionResultAndStatus(task, Task.Result.COMPLETED, TaskManager.Status.COMPLETED);
+    }
+
+    private void assertThatTaskFailed(Task task) {
+        assertTaskExecutionResultAndStatus(task, Task.Result.PARTIAL, TaskManager.Status.FAILED);
+    }
+
+    /** Executes {@code task} on the worker, waits for its result, then checks both the result and the final recorded status. */
+    private void assertTaskExecutionResultAndStatus(Task task, Task.Result expectedResult, TaskManager.Status expectedStatus) {
+        TaskId id = TaskId.generateTaskId();
+        TaskWithId taskWithId = new TaskWithId(id, task);
+        TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id);
+        ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>();
+        idToDetails.put(id, executionDetails);
+        Mono<Task.Result> result = worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache();
+        assertThat(result.block()).isEqualTo(expectedResult);
+        assertThat(idToDetails.get(id).getStatus()).isEqualTo(expectedStatus);
+    }
+
+    /** Builds the callback given to the worker: applies each updater to the stored details of {@code taskId} and stores the outcome back. */
+    private Consumer<TaskExecutionDetailsUpdater> updateDetails(ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails, TaskId taskId) {
+        return updater -> {
+            TaskExecutionDetails newDetails = updater.update(idToExecutionDetails.get(taskId));
+            idToExecutionDetails.put(taskId, newDetails);
+        };
+    }
+
+    /** Blocks until the latch opens; on interruption restores the thread's interrupt flag and rethrows as a RuntimeException. */
+    private void await(CountDownLatch countDownLatch) {
+        try {
+            countDownLatch.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org