You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/08/28 07:28:32 UTC

[james-project] 01/09: JAMES-2813 extract WorkQueue interface

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0cd90e1752c0a384c9ff8e65590560454ee67e97
Author: RĂ©mi Kowalski <rk...@linagora.com>
AuthorDate: Wed Jul 31 17:14:20 2019 +0200

    JAMES-2813 extract WorkQueue interface
---
 .../org/apache/james/task/MemoryTaskManager.java   |  2 +-
 .../task/{WorkQueue.java => MemoryWorkQueue.java}  | 13 +----
 .../main/java/org/apache/james/task/WorkQueue.java | 62 ++--------------------
 .../eventsourcing/EventSourcingTaskManager.scala   |  6 ++-
 .../EventSourcingTaskManagerTest.java              |  8 ++-
 5 files changed, 17 insertions(+), 74 deletions(-)

diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 2ac480e..ad7bd9d 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -81,7 +81,7 @@ public class MemoryTaskManager implements TaskManager {
 
         idToExecutionDetails = new ConcurrentHashMap<>();
         worker = new SerialTaskManagerWorker();
-        workQueue = WorkQueue.builder().worker(worker);
+        workQueue = new MemoryWorkQueue(worker);
     }
 
     public TaskId submit(Task task) {
diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java
similarity index 90%
copy from server/task/src/main/java/org/apache/james/task/WorkQueue.java
copy to server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java
index e2e7035..8a720e6 100644
--- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.task;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -29,21 +28,13 @@ import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
-public class WorkQueue implements Closeable {
-
-    public static RequireWorker builder() {
-        return WorkQueue::new;
-    }
-
-    public interface RequireWorker {
-        WorkQueue worker(TaskManagerWorker worker);
-    }
+public class MemoryWorkQueue implements WorkQueue {
 
     private final TaskManagerWorker worker;
     private final Disposable subscription;
     private final LinkedBlockingQueue<Tuple2<TaskWithId, TaskManagerWorker.Listener>> tasks;
 
-    private WorkQueue(TaskManagerWorker worker) {
+    public MemoryWorkQueue(TaskManagerWorker worker) {
         this.worker = worker;
         this.tasks = new LinkedBlockingQueue<>();
         this.subscription = Mono.fromCallable(tasks::take)
diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
index e2e7035..7c8c70a 100644
--- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java
+++ b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
@@ -16,69 +16,13 @@
  * specific language governing permissions and limitations      *
  * under the License.                                           *
  ****************************************************************/
-
 package org.apache.james.task;
 
 import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import reactor.core.Disposable;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
-
-public class WorkQueue implements Closeable {
-
-    public static RequireWorker builder() {
-        return WorkQueue::new;
-    }
-
-    public interface RequireWorker {
-        WorkQueue worker(TaskManagerWorker worker);
-    }
-
-    private final TaskManagerWorker worker;
-    private final Disposable subscription;
-    private final LinkedBlockingQueue<Tuple2<TaskWithId, TaskManagerWorker.Listener>> tasks;
-
-    private WorkQueue(TaskManagerWorker worker) {
-        this.worker = worker;
-        this.tasks = new LinkedBlockingQueue<>();
-        this.subscription = Mono.fromCallable(tasks::take)
-            .repeat()
-            .subscribeOn(Schedulers.elastic())
-            .flatMapSequential(this::dispatchTaskToWorker)
-            .subscribe();
-    }
-
-    private Mono<?> dispatchTaskToWorker(Tuple2<TaskWithId, TaskManagerWorker.Listener> tuple) {
-        TaskWithId taskWithId = tuple.getT1();
-        TaskManagerWorker.Listener listener = tuple.getT2();
-        return worker.executeTask(taskWithId, listener);
-    }
-
-    public void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener) {
-        try {
-            tasks.put(Tuples.of(taskWithId, listener));
-        } catch (InterruptedException e) {
-            listener.cancelled();
-        }
-    }
 
-    public void cancel(TaskId taskId) {
-        worker.cancelTask(taskId);
-    }
+public interface WorkQueue extends Closeable {
 
-    @Override
-    public void close() throws IOException {
-        try {
-            subscription.dispose();
-        } catch (Throwable ignore) {
-            //avoid failing during close
-        }
-        worker.close();
-    }
+    void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener);
 
+    void cancel(TaskId taskId);
 }
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 17c9ebd..0569dcc 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -30,10 +30,12 @@ import org.apache.james.task.eventsourcing.TaskCommand._
 
 import scala.annotation.tailrec
 
-class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](val eventStore: EventStore,
+class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](
+                                                                                 val serialTaskManagerWorker: TaskManagerWorker,
+                                                                                 val workQueue: WorkQueue,
+                                                                                 val eventStore: EventStore,
                                                                                  val executionDetailsProjection: TaskExecutionDetailsProjection) extends TaskManager with Closeable {
 
-  private val workQueue: WorkQueue = WorkQueue.builder().worker(new SerialTaskManagerWorker)
   private val delayBetweenPollingInMs = 500
 
   private def workDispatcher: Subscriber = {
diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index b017721..057462d 100644
--- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -22,8 +22,12 @@ package org.apache.james.task.eventsourcing;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.eventsourcing.eventstore.memory.InMemoryEventStore;
 import org.apache.james.task.CountDownLatchExtension;
+import org.apache.james.task.MemoryWorkQueue;
+import org.apache.james.task.SerialTaskManagerWorker;
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.TaskManagerContract;
+import org.apache.james.task.TaskManagerWorker;
+import org.apache.james.task.WorkQueue;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -35,9 +39,11 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
 
     @BeforeEach
     void setUp() {
+        TaskManagerWorker worker = new SerialTaskManagerWorker();
+        WorkQueue workQueue = new MemoryWorkQueue(worker);
         EventStore eventStore = new InMemoryEventStore();
         TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection();
-        taskManager = new EventSourcingTaskManager(eventStore, executionDetailsProjection);
+        taskManager = new EventSourcingTaskManager(worker, workQueue, eventStore, executionDetailsProjection);
     }
 
     @AfterEach


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org