You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/08/28 07:28:33 UTC

[james-project] 02/09: JAMES-2813 remove listener from submit task to the worker

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4289da532648f9145f585273607d58f09a4a2bbf
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Wed Jul 31 18:00:48 2019 +0200

    JAMES-2813 remove listener from submit task to the worker
---
 .../org/apache/james/CassandraTaskManagerTest.java | 28 ++++++++-----
 .../apache/james/DistributedTaskManagerModule.java | 14 +++++++
 .../org/apache/james/task/MemoryTaskManager.java   | 43 +++++++++++---------
 .../org/apache/james/task/MemoryWorkQueue.java     | 16 +++-----
 .../apache/james/task/SerialTaskManagerWorker.java | 24 ++++++-----
 .../org/apache/james/task/TaskManagerWorker.java   | 12 +++---
 .../main/java/org/apache/james/task/WorkQueue.java |  2 +-
 .../eventsourcing/EventSourcingTaskManager.scala   | 47 +++++++++++-----------
 .../task/eventsourcing/WorkQueueSupplier.scala}    | 23 +++++------
 .../task/eventsourcing/WorkerStatusListener.scala  | 46 ++++++++++-----------
 .../james/task/SerialTaskManagerWorkerTest.java    | 42 +++++++++----------
 .../EventSourcingTaskManagerTest.java              | 10 +++--
 12 files changed, 169 insertions(+), 138 deletions(-)

diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/CassandraTaskManagerTest.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/CassandraTaskManagerTest.java
index 82c3f2c..72cd9ff 100644
--- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/CassandraTaskManagerTest.java
+++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/CassandraTaskManagerTest.java
@@ -19,6 +19,11 @@
 
 package org.apache.james;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
@@ -31,27 +36,26 @@ import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.server.task.json.dto.TestTaskDTOModules;
 import org.apache.james.task.CompletedTask;
+import org.apache.james.task.MemoryWorkQueue;
+import org.apache.james.task.SerialTaskManagerWorker;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
+import org.apache.james.task.TaskManagerWorker;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.WorkQueueSupplier;
+import org.apache.james.task.eventsourcing.WorkerStatusListener;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionDAO;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
 import org.apache.james.task.eventsourcing.cassandra.TasksSerializationModule;
-
-import com.github.steveash.guavate.Guavate;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import static org.assertj.core.api.Assertions.assertThat;
+import com.github.steveash.guavate.Guavate;
 
 class CassandraTaskManagerTest {
     private static final JsonTaskSerializer TASK_SERIALIZER = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE);
@@ -73,8 +77,14 @@ class CassandraTaskManagerTest {
         CassandraCluster cassandra = cassandraCluster.getCassandraCluster();
         CassandraTaskExecutionDetailsProjectionDAO cassandraTaskExecutionDetailsProjectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider());
         TaskExecutionDetailsProjection executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO);
-        TaskManager taskManager1 = new EventSourcingTaskManager(eventStore, executionDetailsProjection);
-        TaskManager taskManager2 = new EventSourcingTaskManager(eventStore, executionDetailsProjection);
+
+        WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> {
+            WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem);
+            TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
+            return new MemoryWorkQueue(worker);
+        };
+        TaskManager taskManager1 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection);
+        TaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection);
 
         TaskId taskId = taskManager1.submit(new CompletedTask());
         Awaitility.await()
diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DistributedTaskManagerModule.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DistributedTaskManagerModule.java
index 7a6391f..6f1a294 100644
--- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DistributedTaskManagerModule.java
+++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DistributedTaskManagerModule.java
@@ -20,20 +20,34 @@
 
 package org.apache.james;
 
+import org.apache.james.task.MemoryWorkQueue;
+import org.apache.james.task.SerialTaskManagerWorker;
 import org.apache.james.task.TaskManager;
+import org.apache.james.task.TaskManagerWorker;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.WorkQueueSupplier;
+import org.apache.james.task.eventsourcing.WorkerStatusListener;
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
 
 public class DistributedTaskManagerModule extends AbstractModule {
+
+    public static final WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> {
+        WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem);
+        TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
+        return new MemoryWorkQueue(worker);
+    };
+
     @Override
     protected void configure() {
         bind(TaskExecutionDetailsProjection.class).in(Scopes.SINGLETON);
         bind(TaskManager.class).in(Scopes.SINGLETON);
+        bind(WorkQueueSupplier.class).in(Scopes.SINGLETON);
         bind(TaskExecutionDetailsProjection.class).to(CassandraTaskExecutionDetailsProjection.class);
         bind(TaskManager.class).to(EventSourcingTaskManager.class);
+        bind(WorkQueueSupplier.class).toInstance(workQueueSupplier);
     }
 }
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index ad7bd9d..f972e39 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -36,38 +36,45 @@ import reactor.core.scheduler.Schedulers;
 
 public class MemoryTaskManager implements TaskManager {
 
+    @FunctionalInterface
+    private interface TaskExecutionDetailsUpdaterFactory {
+        Consumer<TaskExecutionDetailsUpdater> apply(TaskId taskId);
+    }
+
     private static class DetailsUpdater implements TaskManagerWorker.Listener {
 
-        private final Consumer<TaskExecutionDetailsUpdater> updater;
+        private final TaskExecutionDetailsUpdaterFactory updaterFactory;
 
-        DetailsUpdater(Consumer<TaskExecutionDetailsUpdater> updater) {
-            this.updater = updater;
+        DetailsUpdater(TaskExecutionDetailsUpdaterFactory updaterFactory) {
+            this.updaterFactory = updaterFactory;
         }
 
         @Override
-        public void started() {
-            updater.accept(TaskExecutionDetails::started);
+        public void started(TaskId taskId) {
+            updaterFactory.apply(taskId).accept(TaskExecutionDetails::started);
         }
 
         @Override
-        public void completed(Task.Result result) {
-            updater.accept(TaskExecutionDetails::completed);
-
+        public void completed(TaskId taskId, Task.Result result) {
+            updaterFactory.apply(taskId)
+                .accept(TaskExecutionDetails::completed);
         }
 
         @Override
-        public void failed(Throwable t) {
-            failed();
+        public void failed(TaskId taskId, Throwable t) {
+            failed(taskId);
         }
 
         @Override
-        public void failed() {
-            updater.accept(TaskExecutionDetails::failed);
+        public void failed(TaskId taskId) {
+            updaterFactory.apply(taskId)
+                .accept(TaskExecutionDetails::failed);
         }
 
         @Override
-        public void cancelled() {
-            updater.accept(TaskExecutionDetails::cancelEffectively);
+        public void cancelled(TaskId taskId) {
+            updaterFactory.apply(taskId)
+                .accept(TaskExecutionDetails::cancelEffectively);
         }
     }
 
@@ -80,7 +87,7 @@ public class MemoryTaskManager implements TaskManager {
     public MemoryTaskManager() {
 
         idToExecutionDetails = new ConcurrentHashMap<>();
-        worker = new SerialTaskManagerWorker();
+        worker = new SerialTaskManagerWorker(updater());
         workQueue = new MemoryWorkQueue(worker);
     }
 
@@ -88,7 +95,7 @@ public class MemoryTaskManager implements TaskManager {
         TaskId taskId = TaskId.generateTaskId();
         TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId);
         idToExecutionDetails.put(taskId, executionDetails);
-        workQueue.submit(new TaskWithId(taskId, task), updater(taskId));
+        workQueue.submit(new TaskWithId(taskId, task));
         return taskId;
     }
 
@@ -148,8 +155,8 @@ public class MemoryTaskManager implements TaskManager {
         }
     }
 
-    private DetailsUpdater updater(TaskId id) {
-        return new DetailsUpdater(updateDetails(id));
+    private DetailsUpdater updater() {
+        return new DetailsUpdater(this::updateDetails);
     }
 
     private Consumer<TaskExecutionDetailsUpdater> updateDetails(TaskId taskId) {
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java b/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java
index 8a720e6..cea8b04 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java
@@ -25,14 +25,12 @@ import java.util.concurrent.LinkedBlockingQueue;
 import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 public class MemoryWorkQueue implements WorkQueue {
 
     private final TaskManagerWorker worker;
     private final Disposable subscription;
-    private final LinkedBlockingQueue<Tuple2<TaskWithId, TaskManagerWorker.Listener>> tasks;
+    private final LinkedBlockingQueue<TaskWithId> tasks;
 
     public MemoryWorkQueue(TaskManagerWorker worker) {
         this.worker = worker;
@@ -44,17 +42,15 @@ public class MemoryWorkQueue implements WorkQueue {
             .subscribe();
     }
 
-    private Mono<?> dispatchTaskToWorker(Tuple2<TaskWithId, TaskManagerWorker.Listener> tuple) {
-        TaskWithId taskWithId = tuple.getT1();
-        TaskManagerWorker.Listener listener = tuple.getT2();
-        return worker.executeTask(taskWithId, listener);
+    private Mono<?> dispatchTaskToWorker(TaskWithId taskWithId) {
+        return worker.executeTask(taskWithId);
     }
 
-    public void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener) {
+    public void submit(TaskWithId taskWithId) {
         try {
-            tasks.put(Tuples.of(taskWithId, listener));
+            tasks.put(taskWithId);
         } catch (InterruptedException e) {
-            listener.cancelled();
+            worker.cancelTask(taskWithId.getId());
         }
     }
 
diff --git a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 4736372..afb7a39 100644
--- a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -45,19 +45,21 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
     private final ExecutorService taskExecutor;
+    private final Listener listener;
     private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask;
     private final Semaphore semaphore;
     private final Set<TaskId> cancelledTasks;
 
-    public SerialTaskManagerWorker() {
+    public SerialTaskManagerWorker(Listener listener) {
         this.taskExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor"));
+        this.listener = listener;
         this.cancelledTasks = Sets.newConcurrentHashSet();
         this.runningTask = new AtomicReference<>();
         this.semaphore = new Semaphore(1);
     }
 
     @Override
-    public Mono<Task.Result> executeTask(TaskWithId taskWithId, Listener listener) {
+    public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
             return Mono
                 .using(
                     acquireSemaphore(taskWithId, listener),
@@ -72,7 +74,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
                     semaphore.acquire();
                     return semaphore;
                 } catch (InterruptedException e) {
-                    listener.cancelled();
+                    listener.cancelled(taskWithId.getId());
                     throw e;
                 }
             };
@@ -87,14 +89,14 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
                 return Mono.fromFuture(future)
                         .doOnError(exception -> {
                             if (exception instanceof CancellationException) {
-                                listener.cancelled();
+                                listener.cancelled(taskWithId.getId());
                             } else {
-                                listener.failed(exception);
+                                listener.failed(taskWithId.getId(), exception);
                             }
                         })
                         .onErrorReturn(Task.Result.PARTIAL);
             } else {
-                listener.cancelled();
+                listener.cancelled(taskWithId.getId());
                 return Mono.empty();
             }
         };
@@ -110,21 +112,21 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
     }
 
     private Task.Result run(TaskWithId taskWithId, Listener listener) {
-        listener.started();
+        listener.started(taskWithId.getId());
         try {
             return taskWithId.getTask()
                 .run()
-                .onComplete(listener::completed)
+                .onComplete(result -> listener.completed(taskWithId.getId(), result))
                 .onFailure(() -> {
                     LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId());
-                    listener.failed();
+                    listener.failed(taskWithId.getId());
                 });
         } catch (InterruptedException e) {
-            listener.cancelled();
+            listener.cancelled(taskWithId.getId());
             return Task.Result.PARTIAL;
         } catch (Exception e) {
             LOGGER.error("Error while running task {}", taskWithId.getId(), e);
-            listener.failed(e);
+            listener.failed(taskWithId.getId(), e);
             return Task.Result.PARTIAL;
         }
     }
diff --git a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
index cacc6c8..4780cc7 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
@@ -25,18 +25,18 @@ import reactor.core.publisher.Mono;
 public interface TaskManagerWorker extends Closeable {
 
     interface Listener {
-        void started();
+        void started(TaskId taskId);
 
-        void completed(Task.Result result);
+        void completed(TaskId taskId, Task.Result result);
 
-        void failed(Throwable t);
+        void failed(TaskId taskId, Throwable t);
 
-        void failed();
+        void failed(TaskId taskId);
 
-        void cancelled();
+        void cancelled(TaskId taskId);
     }
 
-    Mono<Task.Result> executeTask(TaskWithId taskWithId, Listener listener);
+    Mono<Task.Result> executeTask(TaskWithId taskWithId);
 
     void cancelTask(TaskId taskId);
 }
diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
index 7c8c70a..ae363ff 100644
--- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java
+++ b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
@@ -22,7 +22,7 @@ import java.io.Closeable;
 
 public interface WorkQueue extends Closeable {
 
-    void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener);
+    void submit(TaskWithId taskWithId);
 
     void cancel(TaskId taskId);
 }
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 0569dcc..34f55cf 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -1,21 +1,21 @@
 /** **************************************************************
-  * Licensed to the Apache Software Foundation (ASF) under one   *
-  * or more contributor license agreements.  See the NOTICE file *
-  * distributed with this work for additional information        *
-  * regarding copyright ownership.  The ASF licenses this file   *
-  * to you under the Apache License, Version 2.0 (the            *
-  * "License"); you may not use this file except in compliance   *
-  * with the License.  You may obtain a copy of the License at   *
-  * *
-  * http://www.apache.org/licenses/LICENSE-2.0                 *
-  * *
-  * Unless required by applicable law or agreed to in writing,   *
-  * software distributed under the License is distributed on an  *
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
-  * KIND, either express or implied.  See the License for the    *
-  * specific language governing permissions and limitations      *
-  * under the License.                                           *
-  * ***************************************************************/
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                 *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ***************************************************************/
 package org.apache.james.task.eventsourcing
 
 import java.io.Closeable
@@ -23,25 +23,24 @@ import java.util
 
 import com.google.common.annotations.VisibleForTesting
 import javax.inject.Inject
-import org.apache.james.eventsourcing.{AggregateId, Subscriber}
 import org.apache.james.eventsourcing.eventstore.{EventStore, History}
+import org.apache.james.eventsourcing.{AggregateId, Subscriber}
 import org.apache.james.task._
 import org.apache.james.task.eventsourcing.TaskCommand._
 
 import scala.annotation.tailrec
 
 class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](
-                                                                                 val serialTaskManagerWorker: TaskManagerWorker,
-                                                                                 val workQueue: WorkQueue,
-                                                                                 val eventStore: EventStore,
-                                                                                 val executionDetailsProjection: TaskExecutionDetailsProjection) extends TaskManager with Closeable {
+                                                                                  workQueueSupplier: WorkQueueSupplier,
+                                                                                  val eventStore: EventStore,
+                                                                                  val executionDetailsProjection: TaskExecutionDetailsProjection) extends TaskManager with Closeable {
 
   private val delayBetweenPollingInMs = 500
 
   private def workDispatcher: Subscriber = {
     case Created(aggregateId, _, task) =>
       val taskWithId = new TaskWithId(aggregateId.taskId, task)
-      workQueue.submit(taskWithId, new WorkerStatusListener(taskWithId.getId, eventSourcingSystem))
+      workQueue.submit(taskWithId)
     case CancelRequested(aggregateId, _) =>
       workQueue.cancel(aggregateId.taskId)
     case _ =>
@@ -63,6 +62,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
       workDispatcher),
     eventStore = eventStore)
 
+  private val workQueue: WorkQueue = workQueueSupplier(eventSourcingSystem)
+
   override def submit(task: Task): TaskId = {
     val taskId = TaskId.generateTaskId
     val command = Create(taskId, task)
diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala
similarity index 61%
copy from server/task/src/main/java/org/apache/james/task/WorkQueue.java
copy to server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala
index 7c8c70a..0265359 100644
--- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala
@@ -1,4 +1,4 @@
-/****************************************************************
+/** **************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one   *
  * or more contributor license agreements.  See the NOTICE file *
  * distributed with this work for additional information        *
@@ -6,23 +6,22 @@
  * to you under the Apache License, Version 2.0 (the            *
  * "License"); you may not use this file except in compliance   *
  * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                 *
+ * *
  * Unless required by applicable law or agreed to in writing,   *
  * software distributed under the License is distributed on an  *
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
  * KIND, either express or implied.  See the License for the    *
  * specific language governing permissions and limitations      *
  * under the License.                                           *
- ****************************************************************/
-package org.apache.james.task;
+ * ***************************************************************/
+package org.apache.james.task.eventsourcing
 
-import java.io.Closeable;
+import org.apache.james.eventsourcing.EventSourcingSystem
+import org.apache.james.task.WorkQueue
 
-public interface WorkQueue extends Closeable {
-
-    void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener);
-
-    void cancel(TaskId taskId);
+@FunctionalInterface
+trait WorkQueueSupplier {
+  def apply(eventSourcingSystem: EventSourcingSystem): WorkQueue
 }
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
index 7e1ca87..2b08857 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
@@ -1,21 +1,21 @@
 /** **************************************************************
-  * Licensed to the Apache Software Foundation (ASF) under one   *
-  * or more contributor license agreements.  See the NOTICE file *
-  * distributed with this work for additional information        *
-  * regarding copyright ownership.  The ASF licenses this file   *
-  * to you under the Apache License, Version 2.0 (the            *
-  * "License"); you may not use this file except in compliance   *
-  * with the License.  You may obtain a copy of the License at   *
-  * *
-  * http://www.apache.org/licenses/LICENSE-2.0                 *
-  * *
-  * Unless required by applicable law or agreed to in writing,   *
-  * software distributed under the License is distributed on an  *
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
-  * KIND, either express or implied.  See the License for the    *
-  * specific language governing permissions and limitations      *
-  * under the License.                                           *
-  * ***************************************************************/
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                 *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ***************************************************************/
 
 package org.apache.james.task.eventsourcing
 
@@ -24,15 +24,15 @@ import org.apache.james.task.Task.Result
 import org.apache.james.task.eventsourcing.TaskCommand._
 import org.apache.james.task.{TaskId, TaskManagerWorker}
 
-class WorkerStatusListener(taskId: TaskId, eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener {
+case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener {
 
-  override def started(): Unit = eventSourcingSystem.dispatch(Start(taskId))
+  override def started(taskId: TaskId): Unit = eventSourcingSystem.dispatch(Start(taskId))
 
-  override def completed(result: Result): Unit = eventSourcingSystem.dispatch(Complete(taskId, result))
+  override def completed(taskId: TaskId, result: Result): Unit = eventSourcingSystem.dispatch(Complete(taskId, result))
 
-  override def failed(t: Throwable): Unit = eventSourcingSystem.dispatch(Fail(taskId))
+  override def failed(taskId: TaskId, t: Throwable): Unit = eventSourcingSystem.dispatch(Fail(taskId))
 
-  override def failed(): Unit = eventSourcingSystem.dispatch(Fail(taskId))
+  override def failed(taskId: TaskId): Unit = eventSourcingSystem.dispatch(Fail(taskId))
 
-  override def cancelled(): Unit = eventSourcingSystem.dispatch(Cancel(taskId))
+  override def cancelled(taskId: TaskId): Unit = eventSourcingSystem.dispatch(Cancel(taskId))
 }
\ No newline at end of file
diff --git a/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 2aa1dc0..522ed0c 100644
--- a/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -20,6 +20,7 @@ package org.apache.james.task;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -32,18 +33,25 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import reactor.core.publisher.Mono;
 
 class SerialTaskManagerWorkerTest {
-
-    private final SerialTaskManagerWorker worker = new SerialTaskManagerWorker();
+    private TaskManagerWorker.Listener listener;
+    private SerialTaskManagerWorker worker;
 
     private final Task successfulTask = new CompletedTask();
     private final Task failedTask = new FailedTask();
     private final Task throwingTask = new ThrowingTask();
 
+    @BeforeEach
+    void beforeEach() {
+        listener = mock(TaskManagerWorker.Listener.class);
+        worker = new SerialTaskManagerWorker(listener);
+    }
+
     @AfterEach
     void tearDown() throws IOException {
         worker.close();
@@ -53,35 +61,31 @@ class SerialTaskManagerWorkerTest {
     void aSuccessfullTaskShouldCompleteSuccessfully() {
         TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), this.successfulTask);
 
-        TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class);
-
-        Mono<Task.Result> result = worker.executeTask(taskWithId, listener);
+        Mono<Task.Result> result = worker.executeTask(taskWithId);
 
         assertThat(result.block()).isEqualTo(Task.Result.COMPLETED);
 
-        verify(listener, atLeastOnce()).completed(Task.Result.COMPLETED);
+        verify(listener, atLeastOnce()).completed(taskWithId.getId(), Task.Result.COMPLETED);
     }
 
     @Test
     void aFailedTaskShouldCompleteWithFailedStatus() {
         TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), failedTask);
-        TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class);
 
-        Mono<Task.Result> result = worker.executeTask(taskWithId, listener);
+        Mono<Task.Result> result = worker.executeTask(taskWithId);
 
         assertThat(result.block()).isEqualTo(Task.Result.PARTIAL);
-        verify(listener, atLeastOnce()).failed();
+        verify(listener, atLeastOnce()).failed(taskWithId.getId());
     }
 
     @Test
     void aThrowingTaskShouldCompleteWithFailedStatus() {
         TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), throwingTask);
-        TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class);
 
-        Mono<Task.Result> result = worker.executeTask(taskWithId, listener);
+        Mono<Task.Result> result = worker.executeTask(taskWithId);
 
         assertThat(result.block()).isEqualTo(Task.Result.PARTIAL);
-        verify(listener, atLeastOnce()).failed(any(RuntimeException.class));
+        verify(listener, atLeastOnce()).failed(eq(taskWithId.getId()), any(RuntimeException.class));
     }
 
     @Test
@@ -98,12 +102,10 @@ class SerialTaskManagerWorkerTest {
 
         TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
 
-        TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class);
-
-        worker.executeTask(taskWithId, listener).subscribe();
+        worker.executeTask(taskWithId).subscribe();
 
         await(taskLaunched);
-        verify(listener, atLeastOnce()).started();
+        verify(listener, atLeastOnce()).started(id);
         verifyNoMoreInteractions(listener);
         latch.countDown();
     }
@@ -122,19 +124,17 @@ class SerialTaskManagerWorkerTest {
 
         TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
 
-        TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class);
-
-        Mono<Task.Result> resultMono = worker.executeTask(taskWithId, listener).cache();
+        Mono<Task.Result> resultMono = worker.executeTask(taskWithId).cache();
         resultMono.subscribe();
 
         Awaitility.waitAtMost(org.awaitility.Duration.TEN_SECONDS)
-            .untilAsserted(() -> verify(listener, atLeastOnce()).started());
+            .untilAsserted(() -> verify(listener, atLeastOnce()).started(id));
 
         worker.cancelTask(id);
 
         resultMono.block(Duration.ofSeconds(10));
 
-        verify(listener, atLeastOnce()).cancelled();
+        verify(listener, atLeastOnce()).cancelled(id);
         verifyNoMoreInteractions(listener);
     }
 
diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index 057462d..63c62b0 100644
--- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -27,7 +27,6 @@ import org.apache.james.task.SerialTaskManagerWorker;
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.TaskManagerContract;
 import org.apache.james.task.TaskManagerWorker;
-import org.apache.james.task.WorkQueue;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -39,11 +38,14 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
 
     @BeforeEach
     void setUp() {
-        TaskManagerWorker worker = new SerialTaskManagerWorker();
-        WorkQueue workQueue = new MemoryWorkQueue(worker);
         EventStore eventStore = new InMemoryEventStore();
         TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection();
-        taskManager = new EventSourcingTaskManager(worker, workQueue, eventStore, executionDetailsProjection);
+        WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> {
+            WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem);
+            TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
+            return new MemoryWorkQueue(worker);
+        };
+        taskManager = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection);
     }
 
     @AfterEach


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org