You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/08/28 07:28:32 UTC
[james-project] 01/09: JAMES-2813 extract WorkQueue interface
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0cd90e1752c0a384c9ff8e65590560454ee67e97
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Wed Jul 31 17:14:20 2019 +0200
JAMES-2813 extract WorkQueue interface
---
.../org/apache/james/task/MemoryTaskManager.java | 2 +-
.../task/{WorkQueue.java => MemoryWorkQueue.java} | 13 +----
.../main/java/org/apache/james/task/WorkQueue.java | 62 ++--------------------
.../eventsourcing/EventSourcingTaskManager.scala | 6 ++-
.../EventSourcingTaskManagerTest.java | 8 ++-
5 files changed, 17 insertions(+), 74 deletions(-)
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 2ac480e..ad7bd9d 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -81,7 +81,7 @@ public class MemoryTaskManager implements TaskManager {
idToExecutionDetails = new ConcurrentHashMap<>();
worker = new SerialTaskManagerWorker();
- workQueue = WorkQueue.builder().worker(worker);
+ workQueue = new MemoryWorkQueue(worker);
}
public TaskId submit(Task task) {
diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java
similarity index 90%
copy from server/task/src/main/java/org/apache/james/task/WorkQueue.java
copy to server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java
index e2e7035..8a720e6 100644
--- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java
@@ -19,7 +19,6 @@
package org.apache.james.task;
-import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
@@ -29,21 +28,13 @@ import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
-public class WorkQueue implements Closeable {
-
- public static RequireWorker builder() {
- return WorkQueue::new;
- }
-
- public interface RequireWorker {
- WorkQueue worker(TaskManagerWorker worker);
- }
+public class MemoryWorkQueue implements WorkQueue {
private final TaskManagerWorker worker;
private final Disposable subscription;
private final LinkedBlockingQueue<Tuple2<TaskWithId, TaskManagerWorker.Listener>> tasks;
- private WorkQueue(TaskManagerWorker worker) {
+ public MemoryWorkQueue(TaskManagerWorker worker) {
this.worker = worker;
this.tasks = new LinkedBlockingQueue<>();
this.subscription = Mono.fromCallable(tasks::take)
diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
index e2e7035..7c8c70a 100644
--- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java
+++ b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
@@ -16,69 +16,13 @@
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
-
package org.apache.james.task;
import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import reactor.core.Disposable;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
-
-public class WorkQueue implements Closeable {
-
- public static RequireWorker builder() {
- return WorkQueue::new;
- }
-
- public interface RequireWorker {
- WorkQueue worker(TaskManagerWorker worker);
- }
-
- private final TaskManagerWorker worker;
- private final Disposable subscription;
- private final LinkedBlockingQueue<Tuple2<TaskWithId, TaskManagerWorker.Listener>> tasks;
-
- private WorkQueue(TaskManagerWorker worker) {
- this.worker = worker;
- this.tasks = new LinkedBlockingQueue<>();
- this.subscription = Mono.fromCallable(tasks::take)
- .repeat()
- .subscribeOn(Schedulers.elastic())
- .flatMapSequential(this::dispatchTaskToWorker)
- .subscribe();
- }
-
- private Mono<?> dispatchTaskToWorker(Tuple2<TaskWithId, TaskManagerWorker.Listener> tuple) {
- TaskWithId taskWithId = tuple.getT1();
- TaskManagerWorker.Listener listener = tuple.getT2();
- return worker.executeTask(taskWithId, listener);
- }
-
- public void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener) {
- try {
- tasks.put(Tuples.of(taskWithId, listener));
- } catch (InterruptedException e) {
- listener.cancelled();
- }
- }
- public void cancel(TaskId taskId) {
- worker.cancelTask(taskId);
- }
+public interface WorkQueue extends Closeable {
- @Override
- public void close() throws IOException {
- try {
- subscription.dispose();
- } catch (Throwable ignore) {
- //avoid failing during close
- }
- worker.close();
- }
+ void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener);
+ void cancel(TaskId taskId);
}
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 17c9ebd..0569dcc 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -30,10 +30,12 @@ import org.apache.james.task.eventsourcing.TaskCommand._
import scala.annotation.tailrec
-class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](val eventStore: EventStore,
+class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](
+ val serialTaskManagerWorker: TaskManagerWorker,
+ val workQueue: WorkQueue,
+ val eventStore: EventStore,
val executionDetailsProjection: TaskExecutionDetailsProjection) extends TaskManager with Closeable {
- private val workQueue: WorkQueue = WorkQueue.builder().worker(new SerialTaskManagerWorker)
private val delayBetweenPollingInMs = 500
private def workDispatcher: Subscriber = {
diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index b017721..057462d 100644
--- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -22,8 +22,12 @@ package org.apache.james.task.eventsourcing;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.eventsourcing.eventstore.memory.InMemoryEventStore;
import org.apache.james.task.CountDownLatchExtension;
+import org.apache.james.task.MemoryWorkQueue;
+import org.apache.james.task.SerialTaskManagerWorker;
import org.apache.james.task.TaskManager;
import org.apache.james.task.TaskManagerContract;
+import org.apache.james.task.TaskManagerWorker;
+import org.apache.james.task.WorkQueue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -35,9 +39,11 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
@BeforeEach
void setUp() {
+ TaskManagerWorker worker = new SerialTaskManagerWorker();
+ WorkQueue workQueue = new MemoryWorkQueue(worker);
EventStore eventStore = new InMemoryEventStore();
TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection();
- taskManager = new EventSourcingTaskManager(eventStore, executionDetailsProjection);
+ taskManager = new EventSourcingTaskManager(worker, workQueue, eventStore, executionDetailsProjection);
}
@AfterEach
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org