You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/07/01 12:37:49 UTC

[james-project] 01/03: JAMES-2272 split WorkQueue as a standalone component

This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 94c184dba021469a38d734f33b8f1beb656b2bde
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Thu Jun 27 09:55:49 2019 +0200

    JAMES-2272 split WorkQueue as a standalone component
---
 .../org/apache/james/task/MemoryTaskManager.java   |  66 +++++-------
 .../apache/james/task/TaskExecutionDetails.java    |   2 +-
 .../main/java/org/apache/james/task/WorkQueue.java | 117 +++++++++++++++++++++
 3 files changed, 145 insertions(+), 40 deletions(-)

diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 7143b1e..0f05331 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -19,73 +19,60 @@
 
 package org.apache.james.task;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 
 import javax.annotation.PreDestroy;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.WorkQueueProcessor;
 import reactor.core.scheduler.Schedulers;
 
 public class MemoryTaskManager implements TaskManager {
     public static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500);
     public static final Duration NOW = Duration.ZERO;
-    private final WorkQueueProcessor<TaskWithId> workQueue;
+    private final WorkQueue workQueue;
     private final TaskManagerWorker worker;
     private final ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails;
     private final ConcurrentHashMap<TaskId, Mono<Task.Result>> tasksResult;
-    private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
-    private final ExecutorService requestTaskExecutor = Executors.newSingleThreadExecutor();
 
     public MemoryTaskManager() {
-        workQueue = WorkQueueProcessor.<TaskWithId>builder()
-            .executor(taskExecutor)
-            .requestTaskExecutor(requestTaskExecutor)
-            .build();
+
         idToExecutionDetails = new ConcurrentHashMap<>();
         tasksResult = new ConcurrentHashMap<>();
         worker = new MemoryTaskManagerWorker();
-        workQueue
-            .subscribeOn(Schedulers.single())
-            .filter(isTaskWaiting().or(isTaskRequestedForCancellation()))
-            .subscribe(this::sendTaskToWorker);
+        workQueue = WorkQueue.builder()
+            .worker(this::treatTask)
+            .listener(this::listenToWorkQueueEvents);
     }
 
-    private void sendTaskToWorker(TaskWithId taskWithId) {
-        if (isTaskWaiting().test(taskWithId)) {
-            treatTask(taskWithId);
-        } else if (isTaskRequestedForCancellation().test(taskWithId)) {
-            updateDetails(taskWithId.getId()).accept(TaskExecutionDetails::cancelEffectively);
+    private void listenToWorkQueueEvents(WorkQueue.Event event) {
+        switch (event.status) {
+            case CANCELLED:
+                updateDetails(event.id).accept(TaskExecutionDetails::cancelEffectively);
+                break;
+            case STARTED:
+                break;
         }
     }
 
-    private Predicate<TaskWithId> isTaskWaiting() {
-        return task -> listIds(Status.WAITING).contains(task.getId());
-    }
-
-    private Predicate<TaskWithId> isTaskRequestedForCancellation() {
-        return task -> listIds(Status.CANCEL_REQUESTED).contains(task.getId());
-    }
-
     private void treatTask(TaskWithId task) {
         Mono<Task.Result> result = worker.executeTask(task, updateDetails(task.getId()));
         tasksResult.put(task.getId(), result);
         try {
-            result.block();
+            BiConsumer<Throwable, Object> ignoreException = (t, o) -> { };
+            result
+                .onErrorContinue(InterruptedException.class, ignoreException)
+                .block();
         } catch (CancellationException e) {
             // Do not throw CancellationException
         }
@@ -95,7 +82,7 @@ public class MemoryTaskManager implements TaskManager {
         TaskId taskId = TaskId.generateTaskId();
         TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId);
         idToExecutionDetails.put(taskId, executionDetails);
-        workQueue.onNext(new TaskWithId(taskId, task));
+        workQueue.submit(new TaskWithId(taskId, task));
         return taskId;
     }
 
@@ -115,10 +102,6 @@ public class MemoryTaskManager implements TaskManager {
         return ImmutableList.copyOf(tasksFiltered(status).values());
     }
 
-    public Set<TaskId> listIds(Status status) {
-        return tasksFiltered(status).keySet();
-    }
-
     public Map<TaskId, TaskExecutionDetails> tasksFiltered(Status status) {
         return idToExecutionDetails.entrySet()
             .stream()
@@ -132,6 +115,7 @@ public class MemoryTaskManager implements TaskManager {
                 if (details.getStatus().equals(Status.WAITING)) {
                     updateDetails(id).accept(TaskExecutionDetails::cancelRequested);
                 }
+                workQueue.cancel(id);
                 worker.cancelTask(id, updateDetails(id));
             }
         );
@@ -162,13 +146,17 @@ public class MemoryTaskManager implements TaskManager {
 
     @PreDestroy
     public void stop() {
-        taskExecutor.shutdown();
-        requestTaskExecutor.shutdown();
+        try {
+            workQueue.close();
+        } catch (IOException ignored) {
+            //avoid noise when closing the workqueue
+        }
     }
 
     private Consumer<TaskExecutionDetailsUpdater> updateDetails(TaskId taskId) {
         return updater -> {
-            TaskExecutionDetails newDetails = updater.update(idToExecutionDetails.get(taskId));
+            TaskExecutionDetails currentDetails = idToExecutionDetails.get(taskId);
+            TaskExecutionDetails newDetails = updater.update(currentDetails);
             idToExecutionDetails.replace(taskId, newDetails);
         };
     }
diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
index 06b9ecf..b34ac60 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
@@ -102,7 +102,7 @@ public class TaskExecutionDetails {
     }
 
     public TaskExecutionDetails start() {
-        Preconditions.checkState(status == TaskManager.Status.WAITING);
+        Preconditions.checkState(status == TaskManager.Status.WAITING, "expected WAITING actual status is " + status);
         return new TaskExecutionDetails(
             taskId,
             task,
diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
new file mode 100644
index 0000000..3484d2a
--- /dev/null
+++ b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
@@ -0,0 +1,117 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Sets;
+import reactor.core.Disposable;
+import reactor.core.publisher.WorkQueueProcessor;
+import reactor.core.scheduler.Schedulers;
+
+public class WorkQueue implements Closeable {
+
+    public enum Status {
+        STARTED,
+        CANCELLED
+    }
+
+    public static class Event {
+        public final TaskId id;
+        public final Status status;
+
+        private Event(TaskId id, Status status) {
+            this.id = id;
+            this.status = status;
+        }
+    }
+
+    public static RequireWorker builder() {
+        return worker -> listener -> new WorkQueue(worker, listener);
+    }
+
+    public interface RequireWorker {
+        RequireListener worker(Consumer<TaskWithId> worker);
+    }
+
+    public interface RequireListener {
+        WorkQueue listener(Consumer<Event> worker);
+    }
+
+    private final WorkQueueProcessor<TaskWithId> workQueue;
+    private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
+    private final ExecutorService requestTaskExecutor = Executors.newSingleThreadExecutor();
+    private final Set<TaskId> cancelledTasks;
+    private final Consumer<Event> listener;
+    private final Disposable subscription;
+
+    private WorkQueue(Consumer<TaskWithId> worker, Consumer<Event> listener) {
+        this.listener = listener;
+        cancelledTasks = Sets.newConcurrentHashSet();
+        workQueue = WorkQueueProcessor.<TaskWithId>builder()
+            .executor(taskExecutor)
+            .requestTaskExecutor(requestTaskExecutor)
+            .build();
+        subscription = workQueue
+            .subscribeOn(Schedulers.single())
+            .subscribe(dispatchNonCancelledTaskToWorker(worker));
+    }
+
+    private Consumer<TaskWithId> dispatchNonCancelledTaskToWorker(Consumer<TaskWithId> delegate) {
+        return taskWithId -> {
+            if (!cancelledTasks.remove(taskWithId.getId())) {
+                listener.accept(new Event(taskWithId.getId(), Status.STARTED));
+                delegate.accept(taskWithId);
+            } else {
+                listener.accept(new Event(taskWithId.getId(), Status.CANCELLED));
+            }
+        };
+    }
+
+    public void submit(TaskWithId taskWithId) {
+        workQueue.onNext(taskWithId);
+    }
+
+    public void cancel(TaskId taskId) {
+        cancelledTasks.add(taskId);
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            subscription.dispose();
+        } catch (Throwable ignore) {
+            //avoid failing during close
+        }
+        try {
+            workQueue.dispose();
+        } catch (Throwable ignore) {
+            //avoid failing during close
+        }
+        taskExecutor.shutdownNow();
+        requestTaskExecutor.shutdownNow();
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org