You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/07/01 12:37:49 UTC
[james-project] 01/03: JAMES-2272 split WorkQueue as a standalone
component
This is an automated email from the ASF dual-hosted git repository.
matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 94c184dba021469a38d734f33b8f1beb656b2bde
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Thu Jun 27 09:55:49 2019 +0200
JAMES-2272 split WorkQueue as a standalone component
---
.../org/apache/james/task/MemoryTaskManager.java | 66 +++++-------
.../apache/james/task/TaskExecutionDetails.java | 2 +-
.../main/java/org/apache/james/task/WorkQueue.java | 117 +++++++++++++++++++++
3 files changed, 145 insertions(+), 40 deletions(-)
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 7143b1e..0f05331 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -19,73 +19,60 @@
package org.apache.james.task;
+import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
-import java.util.function.Predicate;
import javax.annotation.PreDestroy;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.WorkQueueProcessor;
import reactor.core.scheduler.Schedulers;
public class MemoryTaskManager implements TaskManager {
public static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500);
public static final Duration NOW = Duration.ZERO;
- private final WorkQueueProcessor<TaskWithId> workQueue;
+ private final WorkQueue workQueue;
private final TaskManagerWorker worker;
private final ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails;
private final ConcurrentHashMap<TaskId, Mono<Task.Result>> tasksResult;
- private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
- private final ExecutorService requestTaskExecutor = Executors.newSingleThreadExecutor();
public MemoryTaskManager() {
- workQueue = WorkQueueProcessor.<TaskWithId>builder()
- .executor(taskExecutor)
- .requestTaskExecutor(requestTaskExecutor)
- .build();
+
idToExecutionDetails = new ConcurrentHashMap<>();
tasksResult = new ConcurrentHashMap<>();
worker = new MemoryTaskManagerWorker();
- workQueue
- .subscribeOn(Schedulers.single())
- .filter(isTaskWaiting().or(isTaskRequestedForCancellation()))
- .subscribe(this::sendTaskToWorker);
+ workQueue = WorkQueue.builder()
+ .worker(this::treatTask)
+ .listener(this::listenToWorkQueueEvents);
}
- private void sendTaskToWorker(TaskWithId taskWithId) {
- if (isTaskWaiting().test(taskWithId)) {
- treatTask(taskWithId);
- } else if (isTaskRequestedForCancellation().test(taskWithId)) {
- updateDetails(taskWithId.getId()).accept(TaskExecutionDetails::cancelEffectively);
+ private void listenToWorkQueueEvents(WorkQueue.Event event) {
+ switch (event.status) {
+ case CANCELLED:
+ updateDetails(event.id).accept(TaskExecutionDetails::cancelEffectively);
+ break;
+ case STARTED:
+ break;
}
}
- private Predicate<TaskWithId> isTaskWaiting() {
- return task -> listIds(Status.WAITING).contains(task.getId());
- }
-
- private Predicate<TaskWithId> isTaskRequestedForCancellation() {
- return task -> listIds(Status.CANCEL_REQUESTED).contains(task.getId());
- }
-
private void treatTask(TaskWithId task) {
Mono<Task.Result> result = worker.executeTask(task, updateDetails(task.getId()));
tasksResult.put(task.getId(), result);
try {
- result.block();
+ BiConsumer<Throwable, Object> ignoreException = (t, o) -> { };
+ result
+ .onErrorContinue(InterruptedException.class, ignoreException)
+ .block();
} catch (CancellationException e) {
// Do not throw CancellationException
}
@@ -95,7 +82,7 @@ public class MemoryTaskManager implements TaskManager {
TaskId taskId = TaskId.generateTaskId();
TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId);
idToExecutionDetails.put(taskId, executionDetails);
- workQueue.onNext(new TaskWithId(taskId, task));
+ workQueue.submit(new TaskWithId(taskId, task));
return taskId;
}
@@ -115,10 +102,6 @@ public class MemoryTaskManager implements TaskManager {
return ImmutableList.copyOf(tasksFiltered(status).values());
}
- public Set<TaskId> listIds(Status status) {
- return tasksFiltered(status).keySet();
- }
-
public Map<TaskId, TaskExecutionDetails> tasksFiltered(Status status) {
return idToExecutionDetails.entrySet()
.stream()
@@ -132,6 +115,7 @@ public class MemoryTaskManager implements TaskManager {
if (details.getStatus().equals(Status.WAITING)) {
updateDetails(id).accept(TaskExecutionDetails::cancelRequested);
}
+ workQueue.cancel(id);
worker.cancelTask(id, updateDetails(id));
}
);
@@ -162,13 +146,17 @@ public class MemoryTaskManager implements TaskManager {
@PreDestroy
public void stop() {
- taskExecutor.shutdown();
- requestTaskExecutor.shutdown();
+ try {
+ workQueue.close();
+ } catch (IOException ignored) {
+ //avoid noise when closing the workqueue
+ }
}
private Consumer<TaskExecutionDetailsUpdater> updateDetails(TaskId taskId) {
return updater -> {
- TaskExecutionDetails newDetails = updater.update(idToExecutionDetails.get(taskId));
+ TaskExecutionDetails currentDetails = idToExecutionDetails.get(taskId);
+ TaskExecutionDetails newDetails = updater.update(currentDetails);
idToExecutionDetails.replace(taskId, newDetails);
};
}
diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
index 06b9ecf..b34ac60 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
@@ -102,7 +102,7 @@ public class TaskExecutionDetails {
}
public TaskExecutionDetails start() {
- Preconditions.checkState(status == TaskManager.Status.WAITING);
+ Preconditions.checkState(status == TaskManager.Status.WAITING, "expected WAITING actual status is " + status);
return new TaskExecutionDetails(
taskId,
task,
diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
new file mode 100644
index 0000000..3484d2a
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
@@ -0,0 +1,117 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Sets;
+import reactor.core.Disposable;
+import reactor.core.publisher.WorkQueueProcessor;
+import reactor.core.scheduler.Schedulers;
+
+public class WorkQueue implements Closeable { // Hands each submitted TaskWithId to a worker callback unless its id was passed to cancel() first; a listener is notified either way.
+ // Kind of notification sent to the listener for each item taken from the queue.
+ public enum Status {
+ STARTED, // sent just before the item is handed to the worker
+ CANCELLED // sent instead of running the item when its id had been cancelled
+ }
+ // Notification pairing a task id with a Status; both fields are final.
+ public static class Event {
+ public final TaskId id;
+ public final Status status;
+ // Private constructor: only the enclosing WorkQueue creates events.
+ private Event(TaskId id, Status status) {
+ this.id = id;
+ this.status = status;
+ }
+ }
+ // Entry point of the staged builder: worker(...) first, then listener(...), which constructs the WorkQueue.
+ public static RequireWorker builder() {
+ return worker -> listener -> new WorkQueue(worker, listener);
+ }
+ // First builder stage: receives the callback invoked with each non-cancelled item.
+ public interface RequireWorker {
+ RequireListener worker(Consumer<TaskWithId> worker);
+ }
+ // Final builder stage: receives the Event listener and returns the WorkQueue.
+ public interface RequireListener {
+ WorkQueue listener(Consumer<Event> worker); // NOTE(review): parameter is named "worker" but it receives the event listener
+ }
+ // Processor that receives submitted items; it is built with the two single-thread executors declared below.
+ private final WorkQueueProcessor<TaskWithId> workQueue;
+ private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
+ private final ExecutorService requestTaskExecutor = Executors.newSingleThreadExecutor();
+ private final Set<TaskId> cancelledTasks; // ids passed to cancel() and not yet consumed by the dispatcher
+ private final Consumer<Event> listener;
+ private final Disposable subscription; // kept so that close() can dispose it
+ // Builds the processor and immediately subscribes the dispatching consumer to it.
+ private WorkQueue(Consumer<TaskWithId> worker, Consumer<Event> listener) {
+ this.listener = listener;
+ cancelledTasks = Sets.newConcurrentHashSet();
+ workQueue = WorkQueueProcessor.<TaskWithId>builder()
+ .executor(taskExecutor)
+ .requestTaskExecutor(requestTaskExecutor)
+ .build();
+ subscription = workQueue
+ .subscribeOn(Schedulers.single())
+ .subscribe(dispatchNonCancelledTaskToWorker(worker));
+ }
+ // Wraps the worker: an item is either run (STARTED) or dropped (CANCELLED); the listener is notified in both cases.
+ private Consumer<TaskWithId> dispatchNonCancelledTaskToWorker(Consumer<TaskWithId> delegate) {
+ return taskWithId -> {
+ if (!cancelledTasks.remove(taskWithId.getId())) { // remove() is true only when the id had been cancelled, and it clears that mark
+ listener.accept(new Event(taskWithId.getId(), Status.STARTED));
+ delegate.accept(taskWithId);
+ } else {
+ listener.accept(new Event(taskWithId.getId(), Status.CANCELLED));
+ }
+ };
+ }
+ // Pushes an item into the processor.
+ public void submit(TaskWithId taskWithId) {
+ workQueue.onNext(taskWithId);
+ }
+ // Marks an id so that the dispatcher skips the matching item when it is dequeued.
+ public void cancel(TaskId taskId) {
+ cancelledTasks.add(taskId); // NOTE(review): the id stays in the set if no item with this id is dispatched afterwards
+ }
+ // Disposes the subscription and the processor, ignoring any failure, then calls shutdownNow() on both executors.
+ @Override
+ public void close() throws IOException {
+ try {
+ subscription.dispose();
+ } catch (Throwable ignore) {
+ //avoid failing during close
+ }
+ try {
+ workQueue.dispose();
+ } catch (Throwable ignore) {
+ //avoid failing during close
+ }
+ taskExecutor.shutdownNow();
+ requestTaskExecutor.shutdownNow();
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org