Posted to notifications@james.apache.org by "ottoka (via GitHub)" <gi...@apache.org> on 2023/02/21 17:10:18 UTC

[GitHub] [james-project] ottoka opened a new pull request, #1452: JAMES-3890 Allow parallel execution of safe tasks

ottoka opened a new pull request, #1452:
URL: https://github.com/apache/james-project/pull/1452

   See JAMES-3890 for rationale. TL;DR: long-running tasks should not delay regular tasks.
   
   I introduce AsyncSafeTask for long-running tasks that are allowed to execute in parallel with other tasks, and convert ExpireMailboxTask as an example.
   
   I patched the respective functionality into SerialTaskManagerWorker (the name does not quite fit anymore). It gets a separate asyncTaskExecutor for such tasks, and uses a map instead of a single reference to keep track of running tasks, e.g. to cancel them. The "pollingMono" wrapper takes care of removing a task from the map once it finishes.
   
   While this achieves the goal, I am not quite sure this is the best possible solution. Any suggestions?
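To make the described design concrete, here is a minimal stdlib-only sketch of the two-lane idea. It assumes Reactor schedulers can be modeled as plain `ExecutorService`s and that a task simply returns a `String`; `SketchTask`, `SketchAsyncSafeTask` and `TwoLaneWorker` are hypothetical names, not James APIs. Map cleanup is done with `whenComplete`, standing in for the PR's "pollingMono" wrapper.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical stand-ins for Task / AsyncSafeTask: a task simply produces a String result.
interface SketchTask extends Supplier<String> {}

// Marker interface: no extra methods, it only flags a task as safe to run in parallel.
interface SketchAsyncSafeTask extends SketchTask {}

class TwoLaneWorker {
    // Regular tasks share a single thread, so they keep running strictly one after another.
    private final ExecutorService serialLane = Executors.newSingleThreadExecutor();
    // Async-safe tasks get threads from an unbounded cached pool and may overlap.
    private final ExecutorService parallelLane = Executors.newCachedThreadPool();
    // A map instead of a single reference, so every running task can be found by id.
    private final Map<String, CompletableFuture<String>> runningTasks = new ConcurrentHashMap<>();

    CompletableFuture<String> submit(String id, SketchTask task) {
        ExecutorService lane = task instanceof SketchAsyncSafeTask ? parallelLane : serialLane;
        CompletableFuture<String> future = CompletableFuture.supplyAsync(task, lane);
        runningTasks.put(id, future);
        // The returned stage completes only after the map entry has been removed,
        // whether the task succeeded, failed or was cancelled.
        return future.whenComplete((result, error) -> runningTasks.remove(id, future));
    }

    // Note: CompletableFuture.cancel completes the future with a CancellationException
    // but does not interrupt the thread that is executing the task.
    boolean cancel(String id) {
        CompletableFuture<String> future = runningTasks.get(id);
        return future != null && future.cancel(true);
    }

    int runningCount() {
        return runningTasks.size();
    }

    void shutdown() {
        serialLane.shutdown();
        parallelLane.shutdown();
    }
}
```

The single-thread lane preserves the old serial guarantee for regular tasks, while only tasks carrying the marker interface can overlap.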


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[GitHub] [james-project] chibenwa commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "chibenwa (via GitHub)" <gi...@apache.org>.
chibenwa commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1115327990


##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -40,50 +40,66 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 public class SerialTaskManagerWorker implements TaskManagerWorker {
     private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
     public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
 
     private final Scheduler taskExecutor;
+    private final Scheduler asyncTaskExecutor;
     private final Listener listener;
-    private final AtomicReference<Tuple2<TaskId, CompletableFuture>> runningTask;
+    private final Map<TaskId, CompletableFuture> runningTasks;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
     public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) {
         this.pollingInterval = pollingInterval;
         this.taskExecutor = Schedulers.fromExecutor(
             Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")));
+        this.asyncTaskExecutor = Schedulers.fromExecutor(
+            Executors.newCachedThreadPool(NamedThreadFactory.withName("async task executor")));

Review Comment:
   In my opinion, many tasks do not fundamentally alter the state of data in a way that could create incompatibilities, from a correctness perspective.
   
   E.g. redelivering dead letters can run in parallel with mail repository reprocessing.
   
   To me it is then more of a resource usage problem: maybe reindexing all of my ElasticSearch data at the same time as my deduplication is not a good idea?
   
   How would we prevent two expensive tasks from being executed at the same time?
   
   > Might be worth a followup ticket.
   
   +1
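One possible answer to the question of keeping two expensive tasks apart, sketched with plain `java.util.concurrent`: a semaphore gate that expensive tasks must pass, while cheap tasks (such as dead letter redelivery) bypass it. `ExpensiveTaskGate` and the `expensive` flag are hypothetical; nothing in this PR defines them.

```java
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical gate: a task flagged as expensive must hold a permit while it runs.
class ExpensiveTaskGate {
    private final Semaphore permits;

    ExpensiveTaskGate(int maxConcurrentExpensiveTasks) {
        this.permits = new Semaphore(maxConcurrentExpensiveTasks);
    }

    // Cheap tasks always run. An expensive task runs only if a permit is free;
    // otherwise Optional.empty() is returned so the caller can requeue it later.
    // The task must return a non-null value.
    <T> Optional<T> tryRun(boolean expensive, Supplier<T> task) {
        if (!expensive) {
            return Optional.of(task.get());
        }
        if (!permits.tryAcquire()) {
            return Optional.empty();
        }
        try {
            return Optional.of(task.get());
        } finally {
            permits.release();
        }
    }
}
```

Refusing instead of waiting (`tryAcquire`) keeps the worker thread free; whether a refused task is requeued or rejected is left open here.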





[GitHub] [james-project] ottoka commented on pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on PR #1452:
URL: https://github.com/apache/james-project/pull/1452#issuecomment-1441435154

   BTW, this should also fix a weird issue I ran into with RabbitMQ and messages that stay unacked for a long time. So once this is merged, I would like to backport it to 3.7.x, maybe even in time for a 3.7.4 release.




[GitHub] [james-project] chibenwa commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "chibenwa (via GitHub)" <gi...@apache.org>.
chibenwa commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1114369966


##########
server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java:
##########
@@ -247,6 +247,50 @@ void theWorkerShouldCancelAnInProgressTask() throws InterruptedException {
             .block()).isEmpty();
     }
 

Review Comment:
   With a latch you can block task 1, then synchronize on task 2 starting its execution.
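A sketch of that latch choreography, with plain executors standing in for the worker's two schedulers (an assumption; the real test would go through `SerialTaskManagerWorker`). `LatchOrderingCheck` is a hypothetical helper: task 1 blocks on a latch on the serial lane, and the check waits for task 2 to start.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchOrderingCheck {
    // Blocks task 1 on the serial lane with a latch, then reports whether task 2
    // starts within waitMillis. Task 2 goes to the parallel lane if secondIsAsync.
    static boolean secondTaskStartsWhileFirstIsBlocked(boolean secondIsAsync, long waitMillis) {
        ExecutorService serialLane = Executors.newSingleThreadExecutor();
        ExecutorService parallelLane = Executors.newCachedThreadPool();
        CountDownLatch task1Started = new CountDownLatch(1);
        CountDownLatch releaseTask1 = new CountDownLatch(1);
        CountDownLatch task2Started = new CountDownLatch(1);
        try {
            serialLane.submit(() -> {
                task1Started.countDown();
                releaseTask1.await(); // task 1 occupies the only serial thread
                return null;
            });
            // Synchronize on task 1 actually running before submitting task 2.
            if (!task1Started.await(5, TimeUnit.SECONDS)) {
                return false;
            }
            ExecutorService lane = secondIsAsync ? parallelLane : serialLane;
            lane.submit(task2Started::countDown);
            return task2Started.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            releaseTask1.countDown(); // always unblock task 1 so the executors can terminate
            serialLane.shutdown();
            parallelLane.shutdown();
        }
    }
}
```

With `secondIsAsync = true`, task 2 should start promptly; with `false`, it queues behind task 1 and the short wait expires, which makes a useful negative control.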





[GitHub] [james-project] ottoka commented on pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on PR #1452:
URL: https://github.com/apache/james-project/pull/1452#issuecomment-1439913748

   Latest push includes changes as discussed above: runAsync() moved to Task, defaults for both run() variants. This leaves AsyncSafeTask as a marker interface. Also, I believe we can move on with this, so not a draft anymore.




[GitHub] [james-project] chibenwa commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "chibenwa (via GitHub)" <gi...@apache.org>.
chibenwa commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1113737617


##########
server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java:
##########
@@ -0,0 +1,38 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * A Task that can safely run in parallel with other tasks.
+ * This means it will not likely interfere with the operations of other tasks, and be able to handle issues arising
+ * from parallel execution; e.g. if the task lists some messages and then tries to access them, it must gracefully
+ * handle the situation when they have been deleted in the meantime.
+ */
+public interface AsyncSafeTask extends Task {
+
+    Mono<Result> runAsync();

Review Comment:
   ```suggestion
       Publisher<Result> runAsync();
   ```
   
   Let's avoid leaking the actual reactive implementation in APIs...



##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -144,8 +160,16 @@ private Mono<Task.Result> cancelled(TaskWithId taskWithId, Listener listener) {
             .thenReturn(Task.Result.PARTIAL);
     }
 
+    private Mono<Task.Result> getTaskMono(Task task) {
+        if (task instanceof AsyncSafeTask) {
+            return ((AsyncSafeTask) task).runAsync();
+        } else {
+            return Mono.fromCallable(task::run);
+        }
+    }

Review Comment:
   Can be avoided with the previous comment.
   
   Also, `run` is very likely blocking and likely relies on Reactor itself; use `subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)`.



##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -40,50 +40,66 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 public class SerialTaskManagerWorker implements TaskManagerWorker {
     private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
     public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
 
     private final Scheduler taskExecutor;
+    private final Scheduler asyncTaskExecutor;
     private final Listener listener;
-    private final AtomicReference<Tuple2<TaskId, CompletableFuture>> runningTask;
+    private final Map<TaskId, CompletableFuture> runningTasks;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
     public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) {
         this.pollingInterval = pollingInterval;
         this.taskExecutor = Schedulers.fromExecutor(
             Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")));
+        this.asyncTaskExecutor = Schedulers.fromExecutor(
+            Executors.newCachedThreadPool(NamedThreadFactory.withName("async task executor")));
         this.listener = listener;
         this.cancelledTasks = Sets.newConcurrentHashSet();
-        this.runningTask = new AtomicReference<>();
+        this.runningTasks = Maps.newConcurrentMap();
     }
 
     @Override
     public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
         if (!cancelledTasks.remove(taskWithId.getId())) {
-            Mono<Task.Result> taskMono = runWithMdc(taskWithId, listener).subscribeOn(taskExecutor);
+            Scheduler scheduler = taskExecutor;
+            if (taskWithId.getTask() instanceof AsyncSafeTask) {
+                scheduler = asyncTaskExecutor;
+            }

Review Comment:
   Pro-tip: extract this into a method that, given a task, provides a scheduler, and avoid reassigning the variable.
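A sketch of that extraction, using `Executor` in place of Reactor's `Scheduler` and hypothetical `LaneTask` / `LaneAsyncSafeTask` stand-ins for the James interfaces:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-ins; the real code works with TaskWithId and Reactor Schedulers.
interface LaneTask { String run(); }
interface LaneAsyncSafeTask extends LaneTask {}

class SchedulerSelection {
    final ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
    final ExecutorService asyncTaskExecutor = Executors.newCachedThreadPool();

    // The choice is a single expression of the task: no local variable gets reassigned.
    Executor schedulerFor(LaneTask task) {
        return task instanceof LaneAsyncSafeTask ? asyncTaskExecutor : taskExecutor;
    }

    void shutdown() {
        taskExecutor.shutdown();
        asyncTaskExecutor.shutdown();
    }
}
```

In the PR, the call site would then presumably shrink to something like `runWithMdc(taskWithId, listener).subscribeOn(schedulerFor(taskWithId.getTask()))`, with the method returning a `Scheduler`.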



##########
server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java:
##########
@@ -0,0 +1,38 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * A Task that can safely run in parallel with other tasks.
+ * This means it will not likely interfere with the operations of other tasks, and be able to handle issues arising
+ * from parallel execution; e.g. if the task lists some messages and then tries to access them, it must gracefully
+ * handle the situation when they have been deleted in the meantime.
+ */
+public interface AsyncSafeTask extends Task {
+
+    Mono<Result> runAsync();

Review Comment:
   Also, I see no reason not to promote `runAsync` to the `Task` interface and just make `AsyncSafeTask` a marker interface...
   
   By the way, we could provide a default implementation for `runAsync` too, letting implementors choose whether they wish to implement `runAsync` or `run`.
   
   ```suggestion
       default Publisher<Result> runAsync() {
           return Mono.fromCallable(() -> run())
               .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
       }
   ```
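The proposed shape (a marker interface plus two mutually delegating defaults) can be sketched without Reactor by using `CompletionStage` in place of `Publisher`. That swap is an assumption made only to keep the sketch stdlib-only; `DualTask`, `DualAsyncSafeTask`, `SketchResult` and `BLOCKING_POOL` are hypothetical, the pool standing in for `ReactorUtils.BLOCKING_CALL_WRAPPER`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

enum SketchResult { COMPLETED, PARTIAL }

interface DualTask {
    // Hypothetical stand-in for ReactorUtils.BLOCKING_CALL_WRAPPER: daemon threads for blocking work.
    Executor BLOCKING_POOL = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable, "blocking-task");
        thread.setDaemon(true);
        return thread;
    });

    // Blocking entry point, bridged from the asynchronous variant by default.
    default SketchResult run() {
        return runAsync().toCompletableFuture().join();
    }

    // Asynchronous entry point; by default it runs the (likely blocking) run() on BLOCKING_POOL.
    default CompletionStage<SketchResult> runAsync() {
        return CompletableFuture.supplyAsync(this::run, BLOCKING_POOL);
    }
}

// Marker interface: adds nothing, only signals that parallel execution is safe.
interface DualAsyncSafeTask extends DualTask {}
```

An implementor overrides exactly one of the two methods. If neither is overridden, the defaults keep delegating to each other and never produce a result.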



##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -40,50 +40,66 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 public class SerialTaskManagerWorker implements TaskManagerWorker {
     private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
     public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
 
     private final Scheduler taskExecutor;
+    private final Scheduler asyncTaskExecutor;
     private final Listener listener;
-    private final AtomicReference<Tuple2<TaskId, CompletableFuture>> runningTask;
+    private final Map<TaskId, CompletableFuture> runningTasks;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
     public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) {
         this.pollingInterval = pollingInterval;
         this.taskExecutor = Schedulers.fromExecutor(
             Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")));
+        this.asyncTaskExecutor = Schedulers.fromExecutor(
+            Executors.newCachedThreadPool(NamedThreadFactory.withName("async task executor")));

Review Comment:
   Shouldn't we put a cap on threads? Otherwise we might end up with too many concurrent tasks.



##########
server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java:
##########
@@ -247,6 +247,50 @@ void theWorkerShouldCancelAnInProgressTask() throws InterruptedException {
             .block()).isEmpty();
     }
 

Review Comment:
   Additional test idea: submit 2 async tasks, and ensure that they both execute at the same time.
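That test idea can be sketched with a `CountDownLatch` of 2: each task counts down and then waits for the other, so both can only return `true` if they overlap in time. `ParallelismCheck` is a hypothetical helper, and a plain executor stands in for the worker's async lane.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ParallelismCheck {
    // Each task signals that it has started, then waits until the other has started too.
    // Both tasks return true only if they were running at the same time.
    static boolean bothRunConcurrently(ExecutorService executor, long timeoutMillis) {
        CountDownLatch bothStarted = new CountDownLatch(2);
        Callable<Boolean> task = () -> {
            bothStarted.countDown();
            return bothStarted.await(timeoutMillis, TimeUnit.MILLISECONDS);
        };
        try {
            Future<Boolean> first = executor.submit(task);
            Future<Boolean> second = executor.submit(task);
            return first.get() && second.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        } finally {
            executor.shutdown();
        }
    }
}
```

A single-thread executor makes a natural negative control: the first task times out waiting for a second task that cannot start yet.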





[GitHub] [james-project] ottoka commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1114142440


##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -40,50 +40,66 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 public class SerialTaskManagerWorker implements TaskManagerWorker {
     private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
     public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
 
     private final Scheduler taskExecutor;
+    private final Scheduler asyncTaskExecutor;
     private final Listener listener;
-    private final AtomicReference<Tuple2<TaskId, CompletableFuture>> runningTask;
+    private final Map<TaskId, CompletableFuture> runningTasks;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
     public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) {
         this.pollingInterval = pollingInterval;
         this.taskExecutor = Schedulers.fromExecutor(
             Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")));
+        this.asyncTaskExecutor = Schedulers.fromExecutor(
+            Executors.newCachedThreadPool(NamedThreadFactory.withName("async task executor")));
         this.listener = listener;
         this.cancelledTasks = Sets.newConcurrentHashSet();
-        this.runningTask = new AtomicReference<>();
+        this.runningTasks = Maps.newConcurrentMap();
     }
 
     @Override
     public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
         if (!cancelledTasks.remove(taskWithId.getId())) {
-            Mono<Task.Result> taskMono = runWithMdc(taskWithId, listener).subscribeOn(taskExecutor);
+            Scheduler scheduler = taskExecutor;
+            if (taskWithId.getTask() instanceof AsyncSafeTask) {
+                scheduler = asyncTaskExecutor;
+            }

Review Comment:
   Right. Once again, not enough functional thinking on my part.





[GitHub] [james-project] ottoka commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1115312972


##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -40,50 +40,66 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 public class SerialTaskManagerWorker implements TaskManagerWorker {
     private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
     public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
 
     private final Scheduler taskExecutor;
+    private final Scheduler asyncTaskExecutor;
     private final Listener listener;
-    private final AtomicReference<Tuple2<TaskId, CompletableFuture>> runningTask;
+    private final Map<TaskId, CompletableFuture> runningTasks;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
     public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) {
         this.pollingInterval = pollingInterval;
         this.taskExecutor = Schedulers.fromExecutor(
             Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")));
+        this.asyncTaskExecutor = Schedulers.fromExecutor(
+            Executors.newCachedThreadPool(NamedThreadFactory.withName("async task executor")));

Review Comment:
   Do you have any particular long-running task in mind? The tricky part will be to figure out if it is "safe" in the sense of AsyncSafeTask. Might be worth a followup ticket. 





[GitHub] [james-project] ottoka commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1114126209


##########
server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java:
##########
@@ -0,0 +1,38 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * A Task that can safely run in parallel with other tasks.
+ * This means it will not likely interfere with the operations of other tasks, and be able to handle issues arising
+ * from parallel execution; e.g. if the task lists some messages and then tries to access them, it must gracefully
+ * handle the situation when they have been deleted in the meantime.
+ */
+public interface AsyncSafeTask extends Task {
+
+    Mono<Result> runAsync();

Review Comment:
   I like the idea of having a choice. It bears a slight risk that someone forgets to override either method, which would lead to an infinite recursion of the two default implementations. But at least that case will be painfully obvious :)





[GitHub] [james-project] ottoka commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1114167825


##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -40,50 +40,66 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 public class SerialTaskManagerWorker implements TaskManagerWorker {
     private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
     public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
 
     private final Scheduler taskExecutor;
+    private final Scheduler asyncTaskExecutor;
     private final Listener listener;
-    private final AtomicReference<Tuple2<TaskId, CompletableFuture>> runningTask;
+    private final Map<TaskId, CompletableFuture> runningTasks;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
     public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) {
         this.pollingInterval = pollingInterval;
         this.taskExecutor = Schedulers.fromExecutor(
             Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")));
+        this.asyncTaskExecutor = Schedulers.fromExecutor(
+            Executors.newCachedThreadPool(NamedThreadFactory.withName("async task executor")));

Review Comment:
   I see your point, however I would prefer to keep it open. Once a limited pool is exhausted, it would block any further tasks, even though they are _supposed_ to run without blocking. And as always, what would be a sensible default limit?
   I do not expect this to be a problem in practice, since I assume async safe tasks to be quite rare, compared to regular tasks (Right now, there is just the one ExpireMailboxTask). And an admin starting too many async tasks should not complain about them needing a lot of resources.
   If it turns out to be a problem, we can still add a limit through a configuration property later.
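If a limit is added later, one way to keep today's behaviour as the default is to make the cap optional. A stdlib-only sketch, where the configuration value and its parsing rules are hypothetical (no such James property is defined in this PR):

```java
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class AsyncTaskExecutorFactory {
    // Parses a hypothetical configuration value; a missing or blank value means "no limit".
    static OptionalInt parseLimit(String raw) {
        if (raw == null || raw.isBlank()) {
            return OptionalInt.empty();
        }
        int value = Integer.parseInt(raw.trim());
        if (value < 1) {
            throw new IllegalArgumentException("async task thread limit must be positive: " + value);
        }
        return OptionalInt.of(value);
    }

    // No limit: cached pool, as in the PR, so every async task starts right away.
    // With a limit: fixed pool whose unbounded queue holds extra tasks; submitting never
    // blocks, but queued tasks only start once a thread frees up.
    static ExecutorService create(OptionalInt limit) {
        return limit.isPresent()
            ? Executors.newFixedThreadPool(limit.getAsInt())
            : Executors.newCachedThreadPool();
    }

    // Both factories above return a ThreadPoolExecutor in OpenJDK, so the cast is safe here.
    static int maxThreads(ExecutorService executor) {
        return ((ThreadPoolExecutor) executor).getMaximumPoolSize();
    }
}
```

This keeps the trade-off raised in the thread visible: with a cap, extra async tasks do not block the submitter, but they wait in the queue instead of starting immediately.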





[GitHub] [james-project] chibenwa commented on pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "chibenwa (via GitHub)" <gi...@apache.org>.
chibenwa commented on PR #1452:
URL: https://github.com/apache/james-project/pull/1452#issuecomment-1445134787

   > Have not seen this before, but I will count it as a success :)
   
   It does!




[GitHub] [james-project] chibenwa commented on pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "chibenwa (via GitHub)" <gi...@apache.org>.
chibenwa commented on PR #1452:
URL: https://github.com/apache/james-project/pull/1452#issuecomment-1439364314

   ```
   [ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.22.2:test (default-test) on project james-server-task-memory: There was a timeout or other error in the fork -> [Help 1]
   ```
   
   Likely some CountDownLatch-based tests went wrong...
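If the fork hung on a latch that was never counted down, one common way to keep such failures readable is to always await latches with a deadline, so the test fails quickly instead of blocking until surefire gives up on the fork. `Latches.awaitOrFail` is a hypothetical test helper, not an existing James utility:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class Latches {
    private Latches() {}

    // Waits with a deadline so a latch that is never counted down fails the test
    // instead of hanging the JVM until the surefire fork times out.
    static void awaitOrFail(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            if (!latch.await(timeout, unit)) {
                throw new AssertionError("latch not released within " + timeout + " " + unit
                    + " (count still " + latch.getCount() + ")");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting for latch", e);
        }
    }
}
```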




[GitHub] [james-project] chibenwa commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "chibenwa (via GitHub)" <gi...@apache.org>.
chibenwa commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1114372253


##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -40,50 +40,66 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 public class SerialTaskManagerWorker implements TaskManagerWorker {
     private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
     public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
 
     private final Scheduler taskExecutor;
+    private final Scheduler asyncTaskExecutor;
     private final Listener listener;
-    private final AtomicReference<Tuple2<TaskId, CompletableFuture>> runningTask;
+    private final Map<TaskId, CompletableFuture> runningTasks;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
     public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) {
         this.pollingInterval = pollingInterval;
         this.taskExecutor = Schedulers.fromExecutor(
             Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")));
+        this.asyncTaskExecutor = Schedulers.fromExecutor(
+            Executors.newCachedThreadPool(NamedThreadFactory.withName("async task executor")));

Review Comment:
   > I assume async safe tasks to be quite rare, compared to regular tasks (Right now, there is just the one ExpireMailboxTask)
   
   I rather expect that most of our tasks can safely be executed in parallel.
   
   > If it turns out to be a problem, we can still add a limit through a configuration property later.
   
   Ok
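   If the unbounded cached pool does turn out to be a problem, the configurable limit mentioned above could look roughly like the sketch below. It assumes a hypothetical setting named `asyncTaskParallelism` (not an existing James configuration property) and uses only `java.util.concurrent`; the resulting `ExecutorService` could then be wrapped with `Schedulers.fromExecutor(...)`, as the diff already does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical factory; "asyncTaskParallelism" is an illustrative setting,
// not an existing James configuration property.
final class AsyncExecutorFactory {
    private AsyncExecutorFactory() {
    }

    static ExecutorService create(String asyncTaskParallelism) {
        if (asyncTaskParallelism == null || asyncTaskParallelism.trim().isEmpty()) {
            // No limit configured: same unbounded cached pool as in the PR.
            return Executors.newCachedThreadPool();
        }
        int limit = Integer.parseInt(asyncTaskParallelism.trim());
        if (limit < 1) {
            throw new IllegalArgumentException("asyncTaskParallelism must be >= 1, got " + limit);
        }
        // At most `limit` async tasks run at once; further submissions wait in the queue.
        return Executors.newFixedThreadPool(limit);
    }
}
```

   Leaving the setting unset keeps the current unbounded behavior, so a limit like this could be added later without changing the default.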





[GitHub] [james-project] chibenwa commented on pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "chibenwa (via GitHub)" <gi...@apache.org>.
chibenwa commented on PR #1452:
URL: https://github.com/apache/james-project/pull/1452#issuecomment-1439314690

   Hello @ottoka 
   
   Thank you for the contribution. I think this is a good medium-term evolution proposal.
   
   My main remark regarding your design is that the execution of all tasks is kept on a single node, which, of course, puts a constraint on scalability. Longer term, I would see tasks executing in parallel at the cluster level, which means pushing the notion of AsyncSafeTask handling to the middleware layer (RabbitMQ) too. I guess this could be an evolution of this work!




[GitHub] [james-project] ottoka commented on pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on PR #1452:
URL: https://github.com/apache/james-project/pull/1452#issuecomment-1445129572

   > 15:13:47,387 [INFO] BUILD SUCCESS
   > Could not update commit status. Message: {"message":"API rate limit exceeded for user ID 40367165.","documentation_url":"https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}
   > Finished: SUCCESS
   
   Have not seen this before, but I will count it as a success :)




[GitHub] [james-project] ottoka commented on pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on PR #1452:
URL: https://github.com/apache/james-project/pull/1452#issuecomment-1442302333

   Sigh. It seems I need to adapt ALL the logic in SerialTaskManager.close(); there are lots of "interesting" side effects and races going on here. Let's see if the latest push fixes it.




[GitHub] [james-project] ottoka commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1115405925


##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -40,50 +40,66 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 public class SerialTaskManagerWorker implements TaskManagerWorker {
     private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
     public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
 
     private final Scheduler taskExecutor;
+    private final Scheduler asyncTaskExecutor;
     private final Listener listener;
-    private final AtomicReference<Tuple2<TaskId, CompletableFuture>> runningTask;
+    private final Map<TaskId, CompletableFuture> runningTasks;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
     public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) {
         this.pollingInterval = pollingInterval;
         this.taskExecutor = Schedulers.fromExecutor(
             Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")));
+        this.asyncTaskExecutor = Schedulers.fromExecutor(
+            Executors.newCachedThreadPool(NamedThreadFactory.withName("async task executor")));

Review Comment:
   > How would we prevent 2 expensive tasks to be executed at the same time?
   
   I would trust a server admin to be sensible enough not to start multiple expensive tasks during peak business hours :) 
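   For comparison, a guard that enforces this in code instead of relying on the admin might look like the following. This is a hypothetical sketch, not part of the PR: it keeps one `Semaphore` per task type (the type strings are illustrative) and rejects a second concurrent start instead of queueing it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical guard: at most one running task per task type.
final class ExclusiveTaskGuard {
    private final Map<String, Semaphore> permits = new ConcurrentHashMap<>();

    // Returns true if a task of this type may start now, false if one is already running.
    boolean tryStart(String taskType) {
        return permits.computeIfAbsent(taskType, type -> new Semaphore(1)).tryAcquire();
    }

    // Call exactly once after a successful tryStart, when the task ends
    // (success, failure or cancellation); extra calls would add permits.
    void finish(String taskType) {
        Semaphore semaphore = permits.get(taskType);
        if (semaphore != null) {
            semaphore.release();
        }
    }
}
```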





[GitHub] [james-project] ottoka commented on pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on PR #1452:
URL: https://github.com/apache/james-project/pull/1452#issuecomment-1438835202

   Out of curiosity: given the serialized nature of task execution, when would executeTask() ever reach its "else" branch, i.e. when would the taskId already be in the cancelledTasks set?
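   One plausible answer, illustrated with a simplified, hypothetical model rather than the actual `SerialTaskManagerWorker` code: the `cancelledTasks` check runs when a task is dequeued, not when it is submitted, so the "else" branch is reached when a cancel request arrives for a task that is still waiting in the queue behind another one. `SerialWorkerModel` and its methods are illustrative names.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified, hypothetical model of a serial worker; not the James implementation.
final class SerialWorkerModel {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Set<String> cancelledTasks = ConcurrentHashMap.newKeySet();
    private final List<String> log = new CopyOnWriteArrayList<>();

    void submit(String taskId, Runnable body) {
        executor.submit(() -> {
            // Checked when the single worker thread picks the task up.
            if (cancelledTasks.remove(taskId)) {
                log.add(taskId + ":skipped"); // the "else" branch
            } else {
                body.run();
                log.add(taskId + ":ran");
            }
        });
    }

    void cancel(String taskId) {
        cancelledTasks.add(taskId);
    }

    // Stops accepting work and waits (bounded) for queued tasks to drain.
    List<String> shutdownAndGetLog() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    // Task body that blocks until the latch is released (or a 5 s safety timeout).
    static Runnable blockUntil(CountDownLatch latch) {
        return () -> {
            try {
                latch.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }
}
```

   Submitting a blocking task `A`, then `B`, and cancelling `B` before releasing `A` yields the log `[A:ran, B:skipped]`.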




[GitHub] [james-project] ottoka commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1114180765


##########
server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java:
##########
@@ -247,6 +247,50 @@ void theWorkerShouldCancelAnInProgressTask() throws InterruptedException {
             .block()).isEmpty();
     }
 

Review Comment:
   I thought about that, but I fear such a test would be too unstable in general. The implementation only means that the tasks _can_ execute at the same time, it does not mean that they always have to _do_ so. If the machine is barely grinding along under heavy load, the tasks might very well run one after the other. Any test based on measurement of execution time overlap will fail in this case.





[GitHub] [james-project] ottoka commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1114140135


##########
server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java:
##########
@@ -144,8 +160,16 @@ private Mono<Task.Result> cancelled(TaskWithId taskWithId, Listener listener) {
             .thenReturn(Task.Result.PARTIAL);
     }
 
+    private Mono<Task.Result> getTaskMono(Task task) {
+        if (task instanceof AsyncSafeTask) {
+            return ((AsyncSafeTask) task).runAsync();
+        } else {
+            return Mono.fromCallable(task::run);
+        }
+    }

Review Comment:
   I am going with your default suggestion above, which will include the wrapper.
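   The suggestion referred to here is not quoted in this thread. Assuming it means a default method on the task interface, the `instanceof` dispatch in `getTaskMono` could collapse into a single call, roughly as in this Reactor-free sketch. `SketchTask`, `BlockingTask` and `NativeAsyncTask` are hypothetical names, `String` stands in for `Task.Result`, and `CompletableFuture` stands in for `Mono` (unlike the lazy `Mono.fromCallable`, `supplyAsync` starts the work immediately).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical task interface; CompletableFuture stands in for Reactor's Mono.
interface SketchTask {
    String run();

    // Default wrapper: every task gets the same async entry point, so callers
    // no longer need an instanceof check to pick between run() and runAsync().
    default CompletableFuture<String> runAsync(Executor executor) {
        return CompletableFuture.supplyAsync(this::run, executor);
    }
}

// A regular blocking task relies on the default wrapper.
final class BlockingTask implements SketchTask {
    @Override
    public String run() {
        return "COMPLETED";
    }
}

// An async-safe task overrides the default with its own non-blocking version.
final class NativeAsyncTask implements SketchTask {
    @Override
    public String run() {
        return runAsync(Runnable::run).join();
    }

    @Override
    public CompletableFuture<String> runAsync(Executor executor) {
        return CompletableFuture.completedFuture("COMPLETED_ASYNC");
    }
}
```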





[GitHub] [james-project] ottoka commented on a diff in pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on code in PR #1452:
URL: https://github.com/apache/james-project/pull/1452#discussion_r1115323555


##########
server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java:
##########
@@ -247,6 +247,50 @@ void theWorkerShouldCancelAnInProgressTask() throws InterruptedException {
             .block()).isEmpty();
     }
 

Review Comment:
   I see what you mean, that will be timing independent. I will add such a test then.
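   The exact test proposed is not shown here. One common timing-independent technique, sketched below with a hypothetical `ParallelismProbe` helper, is a shared `CountDownLatch` that each task counts down and then awaits: both tasks can only succeed if they were running at the same time. The timeout only bounds how long a failing (serial) run takes; a slow machine delays a passing run without changing its outcome, as long as the timeout is generous.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: returns true only if two tasks were running concurrently.
final class ParallelismProbe {
    private ParallelismProbe() {
    }

    static boolean runTogether(ExecutorService executor) {
        CountDownLatch bothStarted = new CountDownLatch(2);
        Callable<Boolean> task = () -> {
            bothStarted.countDown();
            // Reaches zero only if the other task has also started.
            return bothStarted.await(2, TimeUnit.SECONDS);
        };
        try {
            Future<Boolean> first = executor.submit(task);
            Future<Boolean> second = executor.submit(task);
            return first.get() && second.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

   With `Executors.newCachedThreadPool()` the probe returns `true`; with `Executors.newSingleThreadExecutor()` it returns `false` after the timeout, because the second task cannot start until the first one gives up waiting.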





[GitHub] [james-project] ottoka merged pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka merged PR #1452:
URL: https://github.com/apache/james-project/pull/1452




[GitHub] [james-project] ottoka commented on pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on PR #1452:
URL: https://github.com/apache/james-project/pull/1452#issuecomment-1441390353

   The latest push adds a test for async safe tasks running in parallel. It also fixes the failing latch tests by reverting SerialTaskManager.close() to its previous, more relaxed behavior: it again waits only for cancel events, instead of waiting for the running tasks map to become empty. It turns out the TaskManagerContract needs it this way. 
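   For context, the map-based bookkeeping from the diff (`Map<TaskId, CompletableFuture> runningTasks`) can be sketched as follows, with `String` standing in for `TaskId` and all names hypothetical. Entries remove themselves on completion, and `cancelAll()` only requests cancellation without waiting, which is closer to the relaxed `close()` behavior described above than to waiting for an empty map. Note that `CompletableFuture.cancel(true)` does not interrupt the thread doing the work; the `mayInterruptIfRunning` flag has no effect for this class.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker for concurrently running tasks; String stands in for TaskId.
final class RunningTasks {
    private final Map<String, CompletableFuture<?>> running = new ConcurrentHashMap<>();

    <T> void track(String taskId, CompletableFuture<T> future) {
        running.put(taskId, future);
        // Self-cleanup on success, failure or cancellation; the two-argument
        // remove avoids deleting a newer future registered under the same id.
        future.whenComplete((result, error) -> running.remove(taskId, future));
    }

    boolean cancel(String taskId) {
        CompletableFuture<?> future = running.get(taskId);
        return future != null && future.cancel(true);
    }

    // Requests cancellation of everything; does not wait for the work to stop.
    void cancelAll() {
        running.values().forEach(future -> future.cancel(true));
    }

    int size() {
        return running.size();
    }
}
```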




[GitHub] [james-project] ottoka commented on pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on PR #1452:
URL: https://github.com/apache/james-project/pull/1452#issuecomment-1443452786

   Rebased on the current green master to hopefully pass the CI build this time. With a change this fundamental, I want to be sure.




[GitHub] [james-project] ottoka commented on pull request #1452: JAMES-3890 Allow parallel execution of safe tasks

Posted by "ottoka (via GitHub)" <gi...@apache.org>.
ottoka commented on PR #1452:
URL: https://github.com/apache/james-project/pull/1452#issuecomment-1439849289

   I agree about scalability; it would be nice if a safe async task could run on a different cluster node than the serialized regular tasks. But anything more advanced than this solution would require a fundamental rethinking of the task system, regarding parallel execution and necessary exclusion, if there even _can be_ a general solution. And whatever we wind up with still has to work on non-cluster deployments anyway.

