Posted to notifications@james.apache.org by "chibenwa (via GitHub)" <gi...@apache.org> on 2023/02/22 01:46:15 UTC

[GitHub] [james-project] chibenwa commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

chibenwa commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1113737617


##########
server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java:
##########
@@ -0,0 +1,38 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * A Task that can safely run in parallel with other tasks.
+ * This means it will not likely interfere with the operations of other tasks, and be able to handle issues arising
+ * from parallel execution; e.g. if the task lists some messages and then tries to access them, it must gracefully
+ * handle the situation when they have been deleted in the meantime.
+ */
+public interface AsyncSafeTask extends Task {
+
+    Mono<Result> runAsync();

Review Comment:
   ```suggestion
       Publisher<Result> runAsync();
   ```
   
   Let's avoid leaking the concrete reactive implementation (Reactor's `Mono`) into the API; returning a `Publisher` keeps it implementation-neutral.



##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -144,8 +160,16 @@ private Mono<Task.Result> cancelled(TaskWithId taskWithId, Listener listener) {
             .thenReturn(Task.Result.PARTIAL);
     }
 
+    private Mono<Task.Result> getTaskMono(Task task) {
+        if (task instanceof AsyncSafeTask) {
+            return ((AsyncSafeTask) task).runAsync();
+        } else {
+            return Mono.fromCallable(task::run);
+        }
+    }

Review Comment:
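   This method can be avoided by applying my other comments (promoting `runAsync` to `Task` with a default implementation).
   
   Also, `run` is very likely blocking and likely relies on Reactor itself, so it should be subscribed on a scheduler meant for blocking calls: use `subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)`.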



##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -40,50 +40,66 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 public class SerialTaskManagerWorker implements TaskManagerWorker {
     private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
     public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
 
     private final Scheduler taskExecutor;
+    private final Scheduler asyncTaskExecutor;
     private final Listener listener;
-    private final AtomicReference<Tuple2<TaskId, CompletableFuture>> runningTask;
+    private final Map<TaskId, CompletableFuture> runningTasks;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
     public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) {
         this.pollingInterval = pollingInterval;
         this.taskExecutor = Schedulers.fromExecutor(
             Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")));
+        this.asyncTaskExecutor = Schedulers.fromExecutor(
+            Executors.newCachedThreadPool(NamedThreadFactory.withName("async task executor")));
         this.listener = listener;
         this.cancelledTasks = Sets.newConcurrentHashSet();
-        this.runningTask = new AtomicReference<>();
+        this.runningTasks = Maps.newConcurrentMap();
     }
 
     @Override
     public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
         if (!cancelledTasks.remove(taskWithId.getId())) {
-            Mono<Task.Result> taskMono = runWithMdc(taskWithId, listener).subscribeOn(taskExecutor);
+            Scheduler scheduler = taskExecutor;
+            if (taskWithId.getTask() instanceof AsyncSafeTask) {
+                scheduler = asyncTaskExecutor;
+            }

Review Comment:
   Pro-tip: extract this into a method that, given a task, returns the scheduler to use, and so avoid reassigning the variable.
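
A minimal sketch of that extraction, under stated assumptions: to stay self-contained it uses plain `java.util.concurrent.ExecutorService` instead of Reactor's `Scheduler`, and the nested `Task` and `AsyncSafeTask` interfaces as well as the `executorFor` name are hypothetical stand-ins, not the code from the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SchedulerSelectionSketch {
    // Hypothetical stand-ins for the James Task and AsyncSafeTask types.
    interface Task {}
    interface AsyncSafeTask extends Task {}

    // ExecutorService replaces Reactor's Scheduler here (an assumption made
    // only to keep the sketch free of external dependencies).
    final ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
    final ExecutorService asyncTaskExecutor = Executors.newCachedThreadPool();

    // Single decision point: each branch returns directly, so no local
    // variable is assigned and then overwritten.
    ExecutorService executorFor(Task task) {
        if (task instanceof AsyncSafeTask) {
            return asyncTaskExecutor;
        }
        return taskExecutor;
    }

    void shutdown() {
        taskExecutor.shutdown();
        asyncTaskExecutor.shutdown();
    }
}
```

With such a helper, the call site in `executeTask` could shrink to a single expression along the lines of `subscribeOn(schedulerFor(taskWithId.getTask()))`, where `schedulerFor` is again a hypothetical name.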



##########
server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java:
##########
@@ -0,0 +1,38 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * A Task that can safely run in parallel with other tasks.
+ * This means it will not likely interfere with the operations of other tasks, and be able to handle issues arising
+ * from parallel execution; e.g. if the task lists some messages and then tries to access them, it must gracefully
+ * handle the situation when they have been deleted in the meantime.
+ */
+public interface AsyncSafeTask extends Task {
+
+    Mono<Result> runAsync();

Review Comment:
   Also, I see no reason not to promote `runAsync` to the `Task` interface and make `AsyncSafeTask` a plain marker interface.
   
   By the way, we could provide a default implementation for `runAsync` too, letting implementors choose whether to implement `runAsync` or `run`:
   
   ```suggestion
       default Publisher<Result> runAsync() {
           return Mono.fromCallable(() -> run())
               .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
       }
   ```



##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -40,50 +40,66 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 public class SerialTaskManagerWorker implements TaskManagerWorker {
     private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
     public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
 
     private final Scheduler taskExecutor;
+    private final Scheduler asyncTaskExecutor;
     private final Listener listener;
-    private final AtomicReference<Tuple2<TaskId, CompletableFuture>> runningTask;
+    private final Map<TaskId, CompletableFuture> runningTasks;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
     public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) {
         this.pollingInterval = pollingInterval;
         this.taskExecutor = Schedulers.fromExecutor(
             Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")));
+        this.asyncTaskExecutor = Schedulers.fromExecutor(
+            Executors.newCachedThreadPool(NamedThreadFactory.withName("async task executor")));

Review Comment:
   Shouldn't we put a cap on the number of threads? Otherwise we might end up with too many concurrent tasks.
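
One possible way to cap concurrency, sketched with the JDK only: replace the cached pool with a fixed-size one. The cap of 4 and all names below are assumptions for illustration, not values from the PR. With Reactor, `Schedulers.newBoundedElastic(threadCap, queuedTaskCap, name)` would be a comparable option.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedAsyncExecutorSketch {
    // Hypothetical cap; the value is an assumption, not taken from the PR.
    static final int MAX_CONCURRENT_ASYNC_TASKS = 4;

    // Submits taskCount short tasks to a capped pool and returns the highest
    // number of tasks observed running at the same time.
    static int peakConcurrency(int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(MAX_CONCURRENT_ASYNC_TASKS);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(executor.submit(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(20); // simulate some work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for completion
            }
            return peak.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Tasks submitted beyond the cap wait in the pool's queue, so the peak never exceeds `MAX_CONCURRENT_ASYNC_TASKS` however many tasks are submitted.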



##########
server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java:
##########
@@ -247,6 +247,50 @@ void theWorkerShouldCancelAnInProgressTask() throws InterruptedException {
             .block()).isEmpty();
     }
 

Review Comment:
   Additional test idea: submit two async tasks and ensure that they both execute at the same time.
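
A sketch of how such a check might work, assuming a `CountDownLatch` rendezvous: each task counts down and then waits for the other, so both can succeed only if they overlap in time. It uses plain JDK executors rather than the worker under test, and every name here is hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ConcurrentExecutionSketch {
    // Returns true only if two tasks submitted to the executor were running
    // at the same time.
    static boolean bothRunConcurrently(ExecutorService executor) {
        CountDownLatch bothStarted = new CountDownLatch(2);
        Callable<Boolean> task = () -> {
            bothStarted.countDown();
            // Succeeds only if the other task also started before the timeout.
            return bothStarted.await(1, TimeUnit.SECONDS);
        };
        Future<Boolean> first = executor.submit(task);
        Future<Boolean> second = executor.submit(task);
        try {
            boolean firstSawPeer = first.get();
            boolean secondSawPeer = second.get();
            return firstSawPeer && secondSawPeer;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
    }
}
```

On a single-threaded executor the first task times out waiting for the second, so the method returns false. In the real test the two tasks would presumably be `AsyncSafeTask` instances passed to `executeTask`.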



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

