You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/05/28 07:39:28 UTC
[james-project] 03/06: JAMES-2777 make the MemoryTaskManager
delegate the execution of the tasks to its worker
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1f232d46e0c7ffc56b988afb0bf38e5bf1cbdcb8
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Tue May 21 11:43:16 2019 +0200
JAMES-2777 make the MemoryTaskManager delegate the execution of the tasks to its worker
---
server/task/pom.xml | 5 +
.../org/apache/james/task/MemoryTaskManager.java | 154 ++++++++++-----------
.../apache/james/task/MemoryTaskManagerTest.java | 75 ++++++----
3 files changed, 128 insertions(+), 106 deletions(-)
diff --git a/server/task/pom.xml b/server/task/pom.xml
index 32a4647..3ade1a7 100644
--- a/server/task/pom.xml
+++ b/server/task/pom.xml
@@ -36,6 +36,11 @@
<artifactId>james-server-util</artifactId>
</dependency>
<dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index e26e173..1623858 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -19,102 +19,69 @@
package org.apache.james.task;
+import java.time.Duration;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import javax.annotation.PreDestroy;
-import org.apache.james.util.MDCBuilder;
-import org.apache.james.util.concurrent.NamedThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.WorkQueueProcessor;
+import reactor.core.scheduler.Schedulers;
public class MemoryTaskManager implements TaskManager {
- private static final boolean INTERRUPT_IF_RUNNING = true;
- private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManager.class);
-
+ public static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500);
+ public static final Duration NOW = Duration.ZERO;
+ private final WorkQueueProcessor<TaskWithId> workQueue;
+ private final TaskManagerWorker worker;
private final ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails;
- private final ConcurrentHashMap<TaskId, Future<?>> idToFuture;
- private final ExecutorService executor;
+ private final ConcurrentHashMap<TaskId, Mono<Task.Result>> tasksResult;
+ private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
+ private final ExecutorService requestTaskExecutor = Executors.newSingleThreadExecutor();
public MemoryTaskManager() {
+ workQueue = WorkQueueProcessor.<TaskWithId>builder()
+ .executor(taskExecutor)
+ .requestTaskExecutor(requestTaskExecutor)
+ .build();
idToExecutionDetails = new ConcurrentHashMap<>();
- idToFuture = new ConcurrentHashMap<>();
- ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
- executor = Executors.newSingleThreadExecutor(threadFactory);
+ tasksResult = new ConcurrentHashMap<>();
+ worker = new MemoryTaskManagerWorker();
+ workQueue
+ .subscribeOn(Schedulers.single())
+ .filter(task -> !listIds(Status.CANCELLED).contains(task.getId()))
+ .subscribe(this::treatTask);
}
- @Override
- public TaskId submit(Task task) {
- return submit(task, id -> { });
+ private void treatTask(TaskWithId task) {
+ Mono<Task.Result> result = worker.executeTask(task, updateDetails(task.getId()));
+ tasksResult.put(task.getId(), result);
+ try {
+ result.block();
+ } catch (CancellationException e) {
+ // Do not throw CancellationException
+ }
}
- @VisibleForTesting
- TaskId submit(Task task, Consumer<TaskId> callback) {
+ public TaskId submit(Task task) {
TaskId taskId = TaskId.generateTaskId();
TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId);
-
idToExecutionDetails.put(taskId, executionDetails);
- idToFuture.put(taskId,
- executor.submit(() -> runWithMdc(executionDetails, task, callback)));
+ workQueue.onNext(new TaskWithId(taskId, task));
return taskId;
}
- private void runWithMdc(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) {
- MDCBuilder.withMdc(
- MDCBuilder.create()
- .addContext(Task.TASK_ID, executionDetails.getTaskId())
- .addContext(Task.TASK_TYPE, executionDetails.getType())
- .addContext(Task.TASK_DETAILS, executionDetails.getAdditionalInformation()),
- () -> run(executionDetails, task, callback));
- }
-
- private void run(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) {
- TaskExecutionDetails started = executionDetails.start();
- idToExecutionDetails.put(started.getTaskId(), started);
- try {
- task.run()
- .onComplete(() -> success(started))
- .onFailure(() -> failed(started,
- logger -> logger.info("Task was partially performed. Check logs for more details")));
- } catch (Exception e) {
- failed(started,
- logger -> logger.error("Error while running task", executionDetails, e));
- } finally {
- idToFuture.remove(executionDetails.getTaskId());
- callback.accept(executionDetails.getTaskId());
- }
- }
-
- private void success(TaskExecutionDetails started) {
- if (!wasCancelled(started.getTaskId())) {
- idToExecutionDetails.put(started.getTaskId(), started.completed());
- LOGGER.info("Task success");
- }
- }
-
- private void failed(TaskExecutionDetails started, Consumer<Logger> logOperation) {
- if (!wasCancelled(started.getTaskId())) {
- idToExecutionDetails.put(started.getTaskId(), started.failed());
- logOperation.accept(LOGGER);
- }
- }
-
- private boolean wasCancelled(TaskId taskId) {
- return idToExecutionDetails.get(taskId).getStatus() == Status.CANCELLED;
- }
-
@Override
public TaskExecutionDetails getExecutionDetails(TaskId id) {
return Optional.ofNullable(idToExecutionDetails.get(id))
@@ -128,32 +95,55 @@ public class MemoryTaskManager implements TaskManager {
@Override
public List<TaskExecutionDetails> list(Status status) {
- return idToExecutionDetails.values()
+ return ImmutableList.copyOf(tasksFiltered(status).values());
+ }
+
+ public Set<TaskId> listIds(Status status) {
+ return tasksFiltered(status).keySet();
+ }
+
+ public Map<TaskId, TaskExecutionDetails> tasksFiltered(Status status) {
+ return idToExecutionDetails.entrySet()
.stream()
- .filter(details -> details.getStatus().equals(status))
- .collect(Guavate.toImmutableList());
+ .filter(details -> details.getValue().getStatus().equals(status))
+ .collect(Guavate.entriesToImmutableMap());
}
@Override
public void cancel(TaskId id) {
- Optional.ofNullable(idToFuture.get(id))
- .ifPresent(future -> {
- TaskExecutionDetails executionDetails = idToExecutionDetails.get(id);
- idToExecutionDetails.put(id, executionDetails.cancel());
- future.cancel(INTERRUPT_IF_RUNNING);
- idToFuture.remove(id);
- });
+ TaskExecutionDetails details = getExecutionDetails(id);
+ if (details.getStatus().equals(Status.IN_PROGRESS) || details.getStatus().equals(Status.WAITING)) {
+ worker.cancelTask(id, updateDetails(id));
+ }
}
@Override
public TaskExecutionDetails await(TaskId id) {
- Optional.ofNullable(idToFuture.get(id))
- .ifPresent(Throwing.consumer(Future::get));
- return getExecutionDetails(id);
+ if (Optional.ofNullable(getExecutionDetails(id)).isPresent()) {
+ return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic())
+ .filter(ignore -> tasksResult.get(id) != null)
+ .map(ignore -> {
+ Optional.ofNullable(tasksResult.get(id))
+ .ifPresent(Throwing.<Mono<Task.Result>>consumer(Mono::block).orDoNothing());
+ return getExecutionDetails(id);
+ })
+ .take(1)
+ .blockFirst();
+ } else {
+ return null;
+ }
}
@PreDestroy
public void stop() {
- executor.shutdownNow();
+ taskExecutor.shutdown();
+ requestTaskExecutor.shutdown();
+ }
+
+ private Consumer<TaskExecutionDetailsUpdater> updateDetails(TaskId taskId) {
+ return updater -> {
+ TaskExecutionDetails newDetails = updater.update(idToExecutionDetails.get(taskId));
+ idToExecutionDetails.replace(taskId, newDetails);
+ };
}
}
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
index cec818b..c80f295 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
@@ -22,6 +22,9 @@ package org.apache.james.task;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Duration.ONE_MINUTE;
+import static org.awaitility.Duration.ONE_SECOND;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -29,6 +32,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.JUnitSoftAssertions;
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
+import org.awaitility.core.ConditionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -39,6 +45,7 @@ import com.github.fge.lambdas.consumers.ConsumerChainer;
public class MemoryTaskManagerTest {
+ public static final int TIME_TO_ENQUEUE = 200;
private MemoryTaskManager memoryTaskManager;
@Rule
@@ -54,6 +61,16 @@ public class MemoryTaskManagerTest {
memoryTaskManager.stop();
}
+ Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
+ ConditionFactory calmlyAwait = Awaitility.with()
+ .pollInterval(slowPacedPollInterval)
+ .and()
+ .with()
+ .pollDelay(slowPacedPollInterval)
+ .await();
+ ConditionFactory awaitAtMostOneMinute = calmlyAwait.atMost(ONE_MINUTE);
+ ConditionFactory awaitAtMostOneSecond = calmlyAwait.atMost(ONE_SECOND);
+
@Test
public void getStatusShouldReturnUnknownWhenUnknownId() {
TaskId unknownId = TaskId.generateTaskId();
@@ -87,6 +104,7 @@ public class MemoryTaskManagerTest {
return Task.Result.COMPLETED;
});
+ sleep(TIME_TO_ENQUEUE);
memoryTaskManager.cancel(id);
task1Latch.countDown();
@@ -94,26 +112,21 @@ public class MemoryTaskManagerTest {
}
@Test
- public void getStatusShouldReturnCancelledWhenCancelled() throws Exception {
- CountDownLatch task1Latch = new CountDownLatch(1);
- CountDownLatch ensureStartedLatch = new CountDownLatch(1);
- CountDownLatch ensureFinishedLatch = new CountDownLatch(1);
+ public void getStatusShouldBeCancelledWhenCancelled() throws Exception {
TaskId id = memoryTaskManager.submit(() -> {
- ensureStartedLatch.countDown();
- await(task1Latch);
+ sleep(500);
return Task.Result.COMPLETED;
- },
- any -> ensureFinishedLatch.countDown());
+ });
- ensureStartedLatch.await();
+ sleep(TIME_TO_ENQUEUE);
memoryTaskManager.cancel(id);
- ensureFinishedLatch.await();
assertThat(memoryTaskManager.getExecutionDetails(id).getStatus())
.isEqualTo(TaskManager.Status.CANCELLED);
}
+
@Test
public void cancelShouldBeIdempotent() {
CountDownLatch task1Latch = new CountDownLatch(1);
@@ -122,7 +135,7 @@ public class MemoryTaskManagerTest {
await(task1Latch);
return Task.Result.COMPLETED;
});
-
+ sleep(TIME_TO_ENQUEUE);
memoryTaskManager.cancel(id);
assertThatCode(() -> memoryTaskManager.cancel(id))
.doesNotThrowAnyException();
@@ -146,13 +159,11 @@ public class MemoryTaskManagerTest {
@Test
public void getStatusShouldReturnCompletedWhenRunSuccessfully() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
TaskId taskId = memoryTaskManager.submit(
- () -> Task.Result.COMPLETED,
- countDownCallback(latch));
+ () -> Task.Result.COMPLETED);
- latch.await();
+ sleep(500);
assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
.isEqualTo(TaskManager.Status.COMPLETED);
@@ -160,13 +171,11 @@ public class MemoryTaskManagerTest {
@Test
public void getStatusShouldReturnFailedWhenRunPartially() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
TaskId taskId = memoryTaskManager.submit(
- () -> Task.Result.PARTIAL,
- countDownCallback(latch));
+ () -> Task.Result.PARTIAL);
- latch.await();
+ sleep(TIME_TO_ENQUEUE);
assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
.isEqualTo(TaskManager.Status.FAILED);
@@ -213,6 +222,7 @@ public class MemoryTaskManagerTest {
.isEqualTo(TaskManager.Status.COMPLETED);
softly.assertThat(entryWithId(list, inProgressId))
.isEqualTo(TaskManager.Status.IN_PROGRESS);
+ latch3.countDown();
}
private TaskManager.Status entryWithId(List<TaskExecutionDetails> list, TaskId taskId) {
@@ -369,24 +379,42 @@ public class MemoryTaskManagerTest {
}
@Test
+ public void awaitShouldAwaitWaitingTask() {
+ CountDownLatch latch = new CountDownLatch(1);
+ memoryTaskManager.submit(
+ () -> {
+ await(latch);
+ return Task.Result.COMPLETED;
+ });
+ latch.countDown();
+ TaskId task2 = memoryTaskManager.submit(
+ () -> Task.Result.COMPLETED);
+
+ assertThat(memoryTaskManager.await(task2).getStatus()).isEqualTo(TaskManager.Status.COMPLETED);
+ }
+
+ @Test
public void submittedTaskShouldExecuteSequentially() {
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
TaskId id1 = memoryTaskManager.submit(() -> {
queue.add(1);
- sleep(500);
+ sleep(200);
queue.add(2);
return Task.Result.COMPLETED;
});
TaskId id2 = memoryTaskManager.submit(() -> {
queue.add(3);
- sleep(500);
+ sleep(200);
queue.add(4);
return Task.Result.COMPLETED;
});
+ sleep(TIME_TO_ENQUEUE);
memoryTaskManager.await(id1);
memoryTaskManager.await(id2);
+ awaitAtMostOneSecond.until(() -> queue.contains(4));
+
assertThat(queue)
.containsExactly(1, 2, 3, 4);
}
@@ -396,9 +424,8 @@ public class MemoryTaskManagerTest {
TaskId taskId = memoryTaskManager.submit(() -> {
throw new RuntimeException();
});
-
+ sleep(TIME_TO_ENQUEUE);
TaskExecutionDetails executionDetails = memoryTaskManager.await(taskId);
-
assertThat(executionDetails.getStatus())
.isEqualTo(TaskManager.Status.FAILED);
}
@@ -408,7 +435,7 @@ public class MemoryTaskManagerTest {
TaskId taskId = memoryTaskManager.submit(() -> {
throw new RuntimeException();
});
-
+ sleep(TIME_TO_ENQUEUE);
memoryTaskManager.await(taskId);
assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus())
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org