You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/05/28 07:39:28 UTC

[james-project] 03/06: JAMES-2777 make the MemoryTaskManager delegate the execution of the tasks to its worker

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1f232d46e0c7ffc56b988afb0bf38e5bf1cbdcb8
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Tue May 21 11:43:16 2019 +0200

    JAMES-2777 make the MemoryTaskManager delegate the execution of the tasks to its worker
---
 server/task/pom.xml                                |   5 +
 .../org/apache/james/task/MemoryTaskManager.java   | 154 ++++++++++-----------
 .../apache/james/task/MemoryTaskManagerTest.java   |  75 ++++++----
 3 files changed, 128 insertions(+), 106 deletions(-)

diff --git a/server/task/pom.xml b/server/task/pom.xml
index 32a4647..3ade1a7 100644
--- a/server/task/pom.xml
+++ b/server/task/pom.xml
@@ -36,6 +36,11 @@
             <artifactId>james-server-util</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index e26e173..1623858 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -19,102 +19,69 @@
 
 package org.apache.james.task;
 
+import java.time.Duration;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
 import java.util.function.Consumer;
 
 import javax.annotation.PreDestroy;
 
-import org.apache.james.util.MDCBuilder;
-import org.apache.james.util.concurrent.NamedThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.WorkQueueProcessor;
+import reactor.core.scheduler.Schedulers;
 
 public class MemoryTaskManager implements TaskManager {
-    private static final boolean INTERRUPT_IF_RUNNING = true;
-    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManager.class);
-
+    public static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500);
+    public static final Duration NOW = Duration.ZERO;
+    private final WorkQueueProcessor<TaskWithId> workQueue;
+    private final TaskManagerWorker worker;
     private final ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails;
-    private final ConcurrentHashMap<TaskId, Future<?>> idToFuture;
-    private final ExecutorService executor;
+    private final ConcurrentHashMap<TaskId, Mono<Task.Result>> tasksResult;
+    private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
+    private final ExecutorService requestTaskExecutor = Executors.newSingleThreadExecutor();
 
     public MemoryTaskManager() {
+        workQueue = WorkQueueProcessor.<TaskWithId>builder()
+            .executor(taskExecutor)
+            .requestTaskExecutor(requestTaskExecutor)
+            .build();
         idToExecutionDetails = new ConcurrentHashMap<>();
-        idToFuture = new ConcurrentHashMap<>();
-        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
-        executor = Executors.newSingleThreadExecutor(threadFactory);
+        tasksResult = new ConcurrentHashMap<>();
+        worker = new MemoryTaskManagerWorker();
+        workQueue
+            .subscribeOn(Schedulers.single())
+            .filter(task -> !listIds(Status.CANCELLED).contains(task.getId()))
+            .subscribe(this::treatTask);
     }
 
-    @Override
-    public TaskId submit(Task task) {
-        return submit(task, id -> { });
+    private void treatTask(TaskWithId task) {
+        Mono<Task.Result> result = worker.executeTask(task, updateDetails(task.getId()));
+        tasksResult.put(task.getId(), result);
+        try {
+            result.block();
+        } catch (CancellationException e) {
+            // Do not throw CancellationException
+        }
     }
 
-    @VisibleForTesting
-    TaskId submit(Task task, Consumer<TaskId> callback) {
+    public TaskId submit(Task task) {
         TaskId taskId = TaskId.generateTaskId();
         TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId);
-
         idToExecutionDetails.put(taskId, executionDetails);
-        idToFuture.put(taskId,
-            executor.submit(() -> runWithMdc(executionDetails, task, callback)));
+        workQueue.onNext(new TaskWithId(taskId, task));
         return taskId;
     }
 
-    private void runWithMdc(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) {
-        MDCBuilder.withMdc(
-            MDCBuilder.create()
-                .addContext(Task.TASK_ID, executionDetails.getTaskId())
-                .addContext(Task.TASK_TYPE, executionDetails.getType())
-                .addContext(Task.TASK_DETAILS, executionDetails.getAdditionalInformation()),
-            () -> run(executionDetails, task, callback));
-    }
-
-    private void run(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) {
-        TaskExecutionDetails started = executionDetails.start();
-        idToExecutionDetails.put(started.getTaskId(), started);
-        try {
-            task.run()
-                .onComplete(() -> success(started))
-                .onFailure(() -> failed(started,
-                    logger -> logger.info("Task was partially performed. Check logs for more details")));
-        } catch (Exception e) {
-            failed(started,
-                logger -> logger.error("Error while running task", executionDetails, e));
-        } finally {
-            idToFuture.remove(executionDetails.getTaskId());
-            callback.accept(executionDetails.getTaskId());
-        }
-    }
-
-    private void success(TaskExecutionDetails started) {
-        if (!wasCancelled(started.getTaskId())) {
-            idToExecutionDetails.put(started.getTaskId(), started.completed());
-            LOGGER.info("Task success");
-        }
-    }
-
-    private void failed(TaskExecutionDetails started, Consumer<Logger> logOperation) {
-        if (!wasCancelled(started.getTaskId())) {
-            idToExecutionDetails.put(started.getTaskId(), started.failed());
-            logOperation.accept(LOGGER);
-        }
-    }
-
-    private boolean wasCancelled(TaskId taskId) {
-        return idToExecutionDetails.get(taskId).getStatus() == Status.CANCELLED;
-    }
-
     @Override
     public TaskExecutionDetails getExecutionDetails(TaskId id) {
         return Optional.ofNullable(idToExecutionDetails.get(id))
@@ -128,32 +95,55 @@ public class MemoryTaskManager implements TaskManager {
 
     @Override
     public List<TaskExecutionDetails> list(Status status) {
-        return idToExecutionDetails.values()
+        return ImmutableList.copyOf(tasksFiltered(status).values());
+    }
+
+    public Set<TaskId> listIds(Status status) {
+        return tasksFiltered(status).keySet();
+    }
+
+    public Map<TaskId, TaskExecutionDetails> tasksFiltered(Status status) {
+        return idToExecutionDetails.entrySet()
             .stream()
-            .filter(details -> details.getStatus().equals(status))
-            .collect(Guavate.toImmutableList());
+            .filter(details -> details.getValue().getStatus().equals(status))
+            .collect(Guavate.entriesToImmutableMap());
     }
 
     @Override
     public void cancel(TaskId id) {
-        Optional.ofNullable(idToFuture.get(id))
-            .ifPresent(future -> {
-                TaskExecutionDetails executionDetails = idToExecutionDetails.get(id);
-                idToExecutionDetails.put(id, executionDetails.cancel());
-                future.cancel(INTERRUPT_IF_RUNNING);
-                idToFuture.remove(id);
-            });
+        TaskExecutionDetails details = getExecutionDetails(id);
+        if (details.getStatus().equals(Status.IN_PROGRESS) || details.getStatus().equals(Status.WAITING)) {
+            worker.cancelTask(id, updateDetails(id));
+        }
     }
 
     @Override
     public TaskExecutionDetails await(TaskId id) {
-        Optional.ofNullable(idToFuture.get(id))
-            .ifPresent(Throwing.consumer(Future::get));
-        return getExecutionDetails(id);
+        if (Optional.ofNullable(getExecutionDetails(id)).isPresent()) {
+            return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic())
+                .filter(ignore -> tasksResult.get(id) != null)
+                .map(ignore -> {
+                    Optional.ofNullable(tasksResult.get(id))
+                        .ifPresent(Throwing.<Mono<Task.Result>>consumer(Mono::block).orDoNothing());
+                    return getExecutionDetails(id);
+                })
+                .take(1)
+                .blockFirst();
+        } else {
+            return null;
+        }
     }
 
     @PreDestroy
     public void stop() {
-        executor.shutdownNow();
+        taskExecutor.shutdown();
+        requestTaskExecutor.shutdown();
+    }
+
+    private Consumer<TaskExecutionDetailsUpdater> updateDetails(TaskId taskId) {
+        return updater -> {
+            TaskExecutionDetails newDetails = updater.update(idToExecutionDetails.get(taskId));
+            idToExecutionDetails.replace(taskId, newDetails);
+        };
     }
 }
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
index cec818b..c80f295 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
@@ -22,6 +22,9 @@ package org.apache.james.task;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.ONE_MINUTE;
+import static org.awaitility.Duration.ONE_SECOND;
 
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -29,6 +32,9 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.assertj.core.api.JUnitSoftAssertions;
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
+import org.awaitility.core.ConditionFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -39,6 +45,7 @@ import com.github.fge.lambdas.consumers.ConsumerChainer;
 
 public class MemoryTaskManagerTest {
 
+    public static final int TIME_TO_ENQUEUE = 200;
     private MemoryTaskManager memoryTaskManager;
 
     @Rule
@@ -54,6 +61,16 @@ public class MemoryTaskManagerTest {
         memoryTaskManager.stop();
     }
 
+    Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
+    ConditionFactory calmlyAwait = Awaitility.with()
+        .pollInterval(slowPacedPollInterval)
+        .and()
+        .with()
+        .pollDelay(slowPacedPollInterval)
+        .await();
+    ConditionFactory awaitAtMostOneMinute = calmlyAwait.atMost(ONE_MINUTE);
+    ConditionFactory awaitAtMostOneSecond = calmlyAwait.atMost(ONE_SECOND);
+
     @Test
     public void getStatusShouldReturnUnknownWhenUnknownId() {
         TaskId unknownId = TaskId.generateTaskId();
@@ -87,6 +104,7 @@ public class MemoryTaskManagerTest {
             return Task.Result.COMPLETED;
         });
 
+        sleep(TIME_TO_ENQUEUE);
         memoryTaskManager.cancel(id);
         task1Latch.countDown();
 
@@ -94,26 +112,21 @@ public class MemoryTaskManagerTest {
     }
 
     @Test
-    public void getStatusShouldReturnCancelledWhenCancelled() throws Exception {
-        CountDownLatch task1Latch = new CountDownLatch(1);
-        CountDownLatch ensureStartedLatch = new CountDownLatch(1);
-        CountDownLatch ensureFinishedLatch = new CountDownLatch(1);
+    public void getStatusShouldBeCancelledWhenCancelled() throws Exception {
 
         TaskId id = memoryTaskManager.submit(() -> {
-            ensureStartedLatch.countDown();
-            await(task1Latch);
+            sleep(500);
             return Task.Result.COMPLETED;
-        },
-            any -> ensureFinishedLatch.countDown());
+        });
 
-        ensureStartedLatch.await();
+        sleep(TIME_TO_ENQUEUE);
         memoryTaskManager.cancel(id);
-        ensureFinishedLatch.await();
 
         assertThat(memoryTaskManager.getExecutionDetails(id).getStatus())
             .isEqualTo(TaskManager.Status.CANCELLED);
     }
 
+
     @Test
     public void cancelShouldBeIdempotent() {
         CountDownLatch task1Latch = new CountDownLatch(1);
@@ -122,7 +135,7 @@ public class MemoryTaskManagerTest {
             await(task1Latch);
             return Task.Result.COMPLETED;
         });
-
+        sleep(TIME_TO_ENQUEUE);
         memoryTaskManager.cancel(id);
         assertThatCode(() -> memoryTaskManager.cancel(id))
             .doesNotThrowAnyException();
@@ -146,13 +159,11 @@ public class MemoryTaskManagerTest {
 
     @Test
     public void getStatusShouldReturnCompletedWhenRunSuccessfully() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
 
         TaskId taskId = memoryTaskManager.submit(
-            () -> Task.Result.COMPLETED,
-            countDownCallback(latch));
+            () -> Task.Result.COMPLETED);
 
-        latch.await();
+        sleep(500);
 
         assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
             .isEqualTo(TaskManager.Status.COMPLETED);
@@ -160,13 +171,11 @@ public class MemoryTaskManagerTest {
 
     @Test
     public void getStatusShouldReturnFailedWhenRunPartially() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
 
         TaskId taskId = memoryTaskManager.submit(
-            () -> Task.Result.PARTIAL,
-            countDownCallback(latch));
+            () -> Task.Result.PARTIAL);
 
-        latch.await();
+        sleep(TIME_TO_ENQUEUE);
 
         assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
             .isEqualTo(TaskManager.Status.FAILED);
@@ -213,6 +222,7 @@ public class MemoryTaskManagerTest {
             .isEqualTo(TaskManager.Status.COMPLETED);
         softly.assertThat(entryWithId(list, inProgressId))
             .isEqualTo(TaskManager.Status.IN_PROGRESS);
+        latch3.countDown();
     }
 
     private TaskManager.Status entryWithId(List<TaskExecutionDetails> list, TaskId taskId) {
@@ -369,24 +379,42 @@ public class MemoryTaskManagerTest {
     }
 
     @Test
+    public void awaitShouldAwaitWaitingTask() {
+        CountDownLatch latch = new CountDownLatch(1);
+        memoryTaskManager.submit(
+            () -> {
+                await(latch);
+                return Task.Result.COMPLETED;
+            });
+        latch.countDown();
+        TaskId task2 = memoryTaskManager.submit(
+            () -> Task.Result.COMPLETED);
+
+        assertThat(memoryTaskManager.await(task2).getStatus()).isEqualTo(TaskManager.Status.COMPLETED);
+    }
+
+    @Test
     public void submittedTaskShouldExecuteSequentially() {
         ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
 
         TaskId id1 = memoryTaskManager.submit(() -> {
             queue.add(1);
-            sleep(500);
+            sleep(200);
             queue.add(2);
             return Task.Result.COMPLETED;
         });
         TaskId id2 = memoryTaskManager.submit(() -> {
             queue.add(3);
-            sleep(500);
+            sleep(200);
             queue.add(4);
             return Task.Result.COMPLETED;
         });
+        sleep(TIME_TO_ENQUEUE);
         memoryTaskManager.await(id1);
         memoryTaskManager.await(id2);
 
+        awaitAtMostOneSecond.until(() -> queue.contains(4));
+
         assertThat(queue)
             .containsExactly(1, 2, 3, 4);
     }
@@ -396,9 +424,8 @@ public class MemoryTaskManagerTest {
         TaskId taskId = memoryTaskManager.submit(() -> {
             throw new RuntimeException();
         });
-
+        sleep(TIME_TO_ENQUEUE);
         TaskExecutionDetails executionDetails = memoryTaskManager.await(taskId);
-
         assertThat(executionDetails.getStatus())
             .isEqualTo(TaskManager.Status.FAILED);
     }
@@ -408,7 +435,7 @@ public class MemoryTaskManagerTest {
         TaskId taskId = memoryTaskManager.submit(() -> {
             throw new RuntimeException();
         });
-
+        sleep(TIME_TO_ENQUEUE);
         memoryTaskManager.await(taskId);
 
         assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org